package org.apache.doris.spark.rest;

import com.google.common.annotations.VisibleForTesting;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.shaded.org.apache.thrift.protocol.TMultiplexedProtocol;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.Settings;
import org.apache.doris.spark.cfg.SparkSettings;
import org.apache.doris.spark.exception.ConnectedFailedException;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.IllegalArgumentException;
import org.apache.doris.spark.exception.ShouldNeverHappenException;
import org.apache.doris.spark.rest.models.Backend;
import org.apache.doris.spark.rest.models.BackendRow;
import org.apache.doris.spark.rest.models.BackendV2;
import org.apache.doris.spark.rest.models.QueryPlan;
import org.apache.doris.spark.rest.models.Schema;
import org.apache.doris.spark.rest.models.Tablet;
import org.apache.doris.spark.util.ErrorMessages;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.entity.StringEntity;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;

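/**
 * Static helpers for the Doris frontend (FE) REST API: fetching a table schema, requesting a
 * query plan and turning it into partition definitions, and picking a backend (BE) node.
 *
 * <p>Provenance: loaded from input_file:org/apache/doris/spark/rest/RestService.class
 */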
public class RestService implements Serializable {
    /**
     * Status value of a successful response. NOTE(review): the status checks in this class
     * compare against the literal 200 rather than this constant (presumably inlined) - confirm.
     */
    public static final int REST_RESPONSE_STATUS_OK = 200;
    /** Path prefix appended after the FE endpoint when building table-level URIs. */
    private static final String API_PREFIX = "/api";
    /** Final path element of the table schema request. */
    private static final String SCHEMA = "_schema";
    /** Final path element of the query plan request. */
    private static final String QUERY_PLAN = "_query_plan";

    /**
     * Path of the legacy backend listing; the same path appears as a literal in the
     * deprecated {@code randomBackend}.
     */
    @Deprecated
    private static final String BACKENDS = "/rest/v1/system?path=//backends";
    /** Path of the backend listing used by {@code randomBackendV2} (same path, as a literal). */
    private static final String BACKENDS_V2 = "/api/backends?is_alive=true";

    /**
     * Sends a request to the Doris FE, retrying on I/O failure, and returns the response body.
     *
     * <p>{@link HttpGet} requests go through {@code getConnectionGet}; every other request type
     * goes through {@code getConnectionPost}. The response is parsed as a JSON object: when it
     * carries both a {@code code} and a {@code msg} key, only the JSON serialization of its
     * {@code data} member is returned; otherwise the raw response is returned unchanged.
     *
     * <p>NOTE(review): the connect/read timeouts are stored on the request's
     * {@link RequestConfig}, but the two helpers open their own {@link java.net.HttpURLConnection}
     * from the URI and never read that config, so the timeouts do not appear to take effect.
     *
     * @param settings        source of timeout, retry and credential options
     * @param httpRequestBase request to send
     * @param logger          logger used for progress and failure messages
     * @return the response body, unwrapped from its envelope when one is present
     * @throws ConnectedFailedException when no attempt produced a usable response; carries the
     *                                  last {@link IOException} seen, if any
     */
    private static String send(Settings settings, HttpRequestBase httpRequestBase, Logger logger) throws ConnectedFailedException {
        int connectTimeout = settings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS, 30000);
        int socketTimeout = settings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS, 30000);
        int retries = settings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES, 3);
        logger.trace("connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.", new Object[]{Integer.valueOf(connectTimeout), Integer.valueOf(socketTimeout), Integer.valueOf(retries)});
        httpRequestBase.setConfig(RequestConfig.custom().setConnectTimeout(connectTimeout).setSocketTimeout(socketTimeout).build());
        String user = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER, "");
        String password = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD, "");
        logger.info("Send request to Doris FE '{}' with user '{}'.", httpRequestBase.getURI(), user);
        IOException lastFailure = null;
        for (int attempt = 0; attempt < retries; attempt++) {
            logger.debug("Attempt {} to request {}.", Integer.valueOf(attempt), httpRequestBase.getURI());
            try {
                String response = httpRequestBase instanceof HttpGet
                        ? getConnectionGet(httpRequestBase.getURI().toString(), user, password, logger)
                        : getConnectionPost(httpRequestBase, user, password, logger);
                if (response == null) {
                    logger.warn("Failed to get response from Doris FE {}, http code is {}", httpRequestBase.getURI(), -1);
                    continue;
                }
                logger.trace("Success get response from Doris FE: {}, response is: {}.", httpRequestBase.getURI(), response);
                // Parsing stays inside the try: a body that is not a JSON object raises an
                // IOException subtype, which is treated like any other failed attempt.
                ObjectMapper mapper = new ObjectMapper();
                Map<?, ?> envelope = mapper.readValue(response, Map.class);
                // A JSON "null" payload maps to a null Map; hand it back untouched.
                if (envelope != null && envelope.containsKey("code") && envelope.containsKey("msg")) {
                    return mapper.writeValueAsString(envelope.get("data"));
                }
                return response;
            } catch (IOException e) {
                lastFailure = e;
                logger.warn(ErrorMessages.CONNECT_FAILED_MESSAGE, httpRequestBase.getURI(), e);
            }
        }
        logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, httpRequestBase.getURI(), lastFailure);
        throw new ConnectedFailedException(httpRequestBase.getURI().toString(), -1, lastFailure);
    }

    /**
     * Issues a GET with HTTP Basic authentication and returns the parsed response body.
     *
     * @param str    URL to request
     * @param str2   user name for the Basic credentials
     * @param str3   password for the Basic credentials
     * @param logger logger handed on to {@code parseResponse}
     * @return the response body as produced by {@code parseResponse}
     * @throws IOException when the connection or the response handling fails
     */
    private static String getConnectionGet(String str, String str2, String str3, Logger logger) throws IOException {
        URL target = new URL(str);
        HttpURLConnection connection = (HttpURLConnection) target.openConnection();
        byte[] credentials = String.format("%s:%s", str2, str3).getBytes(StandardCharsets.UTF_8);
        String authorization = "Basic " + Base64.getEncoder().encodeToString(credentials);
        connection.setRequestProperty("Authorization", authorization);
        connection.connect();
        return parseResponse(connection, logger);
    }

    /**
     * Reads the body of a completed HTTP exchange as UTF-8 text.
     *
     * <p>Lines are concatenated without their line terminators, so the result is a single line.
     *
     * @param httpURLConnection connection whose response is read
     * @param logger            logger used to report a non-200 status
     * @return the response body with line terminators removed
     * @throws IOException when the status code is not 200 or reading the body fails
     */
    private static String parseResponse(HttpURLConnection httpURLConnection, Logger logger) throws IOException {
        int status = httpURLConnection.getResponseCode();
        if (status != 200) {
            logger.warn("Failed to get response from Doris  {}, http code is {}", httpURLConnection.getURL(), Integer.valueOf(status));
            throw new IOException("Failed to get response from Doris");
        }
        StringBuilder body = new StringBuilder();
        // try-with-resources closes the reader (and the underlying stream) even if a read fails.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(httpURLConnection.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
        }
        return body.toString();
    }

    /**
     * Sends the entity of an {@link HttpPost} with HTTP Basic authentication and returns the
     * parsed response body.
     *
     * <p>The entity is streamed to the connection byte for byte. The request is cast to
     * {@link HttpPost}, so any other request type fails with a {@link ClassCastException}.
     * Redirects are not followed.
     *
     * @param httpRequestBase request supplying the URI, the HTTP method and the entity
     * @param str             user name for the Basic credentials
     * @param str2            password for the Basic credentials
     * @param logger          logger handed on to {@code parseResponse}
     * @return the response body as produced by {@code parseResponse}
     * @throws IOException when the connection, the upload or the response handling fails
     */
    private static String getConnectionPost(HttpRequestBase httpRequestBase, String str, String str2, Logger logger) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(httpRequestBase.getURI().toString()).openConnection();
        connection.setInstanceFollowRedirects(false);
        connection.setRequestMethod(httpRequestBase.getMethod());
        byte[] credentials = String.format("%s:%s", str, str2).getBytes(StandardCharsets.UTF_8);
        connection.setRequestProperty("Authorization", "Basic " + Base64.getEncoder().encodeToString(credentials));
        // Cast before any network activity so an unsupported request type fails early.
        HttpPost post = (HttpPost) httpRequestBase;
        connection.setDoOutput(true);
        connection.setDoInput(true);
        // Copy the entity bytes as-is: decoding and re-encoding them with the platform default
        // charset would corrupt non-ASCII content on JVMs whose default is not UTF-8.
        try (java.io.OutputStream out = connection.getOutputStream()) {
            post.getEntity().writeTo(out);
        }
        return parseResponse(connection, logger);
    }

    /**
     * Splits a table identifier on {@code '.'} into exactly two parts.
     *
     * @param str    identifier to split
     * @param logger logger used for trace and error messages
     * @return a two-element array holding the parts before and after the dot
     * @throws IllegalArgumentException when the identifier is empty or does not split into
     *                                  exactly two parts
     */
    @VisibleForTesting
    static String[] parseIdentifier(String str, Logger logger) throws IllegalArgumentException {
        logger.trace("Parse identifier '{}'.", str);
        String[] parts = StringUtils.isEmpty(str) ? null : str.split("\\.");
        if (parts == null || parts.length != 2) {
            logger.error(ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE, ConfigurationOptions.TABLE_IDENTIFIER, str);
            throw new IllegalArgumentException(ConfigurationOptions.TABLE_IDENTIFIER, str);
        }
        return parts;
    }

    /**
     * Picks one entry at random from a comma-separated list of FE nodes.
     *
     * @param str    comma-separated node list
     * @param logger logger used for trace and error messages
     * @return the chosen entry with surrounding whitespace trimmed
     * @throws IllegalArgumentException when the list is empty
     */
    @VisibleForTesting
    static String randomEndpoint(String str, Logger logger) throws IllegalArgumentException {
        logger.trace("Parse fenodes '{}'.", str);
        if (StringUtils.isEmpty(str)) {
            logger.error(ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE, "fenodes", str);
            throw new IllegalArgumentException("fenodes", str);
        }
        String[] nodes = str.split(",");
        // Arrays.asList is a fixed-size view; shuffling only swaps elements, which it supports.
        List<String> shuffled = Arrays.asList(nodes);
        Collections.shuffle(shuffled);
        return shuffled.get(0).trim();
    }

    /**
     * Builds the base URI for table-level requests:
     * {@code http://<fe endpoint>/api/<first part>/<second part>/}, where the two parts come from
     * the configured table identifier and the endpoint is chosen by {@code randomEndpoint}.
     *
     * @param settings source of the table identifier and the FE node list
     * @param logger   logger handed on to the parsing helpers
     * @return the base URI, ending with a slash
     * @throws IllegalArgumentException when the identifier or the node list is invalid
     */
    @VisibleForTesting
    static String getUriStr(Settings settings, Logger logger) throws IllegalArgumentException {
        String identifier = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER);
        String[] parts = parseIdentifier(identifier, logger);
        String feNodes = settings.getProperty(ConfigurationOptions.DORIS_FENODES);
        String endpoint = randomEndpoint(feNodes, logger);
        return "http://" + endpoint + API_PREFIX + "/" + parts[0] + "/" + parts[1] + "/";
    }

    /**
     * Requests the schema of the configured table from the Doris FE and parses the response.
     *
     * @param settings source of connection and table options
     * @param logger   logger used for trace and debug messages
     * @return the parsed schema
     * @throws DorisException when the request fails or the response cannot be parsed
     */
    public static Schema getSchema(Settings settings, Logger logger) throws DorisException {
        logger.trace("Finding schema.");
        HttpGet request = new HttpGet(getUriStr(settings, logger) + SCHEMA);
        String response = send(settings, request, logger);
        logger.debug("Find schema response is '{}'.", response);
        return parseSchema(response, logger);
    }

    /**
     * Parses an FE response into a {@link Schema} and verifies that its status is 200.
     *
     * @param str    JSON response text
     * @param logger logger used for trace, debug and error messages
     * @return the parsed schema
     * @throws DorisException when the text is not JSON, cannot be mapped to {@link Schema},
     *                        cannot be read, maps to {@code null}, or reports a status other
     *                        than 200
     */
    @VisibleForTesting
    public static Schema parseSchema(String str, Logger logger) throws DorisException {
        logger.trace("Parse response '{}' to schema.", str);
        Schema schema;
        // The Jackson exceptions are IOException subtypes, so they must be caught before it.
        try {
            schema = new ObjectMapper().readValue(str, Schema.class);
        } catch (JsonParseException e) {
            String message = "Doris FE's response is not a json. res: " + str;
            logger.error(message, e);
            throw new DorisException(message, e);
        } catch (JsonMappingException e) {
            String message = "Doris FE's response cannot map to schema. res: " + str;
            logger.error(message, e);
            throw new DorisException(message, e);
        } catch (IOException e) {
            String message = "Parse Doris FE's response to json failed. res: " + str;
            logger.error(message, e);
            throw new DorisException(message, e);
        }
        if (schema == null) {
            logger.error(ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE);
            throw new ShouldNeverHappenException();
        }
        if (schema.getStatus() != 200) {
            String message = "Doris FE's response is not OK, status is " + schema.getStatus();
            logger.error(message);
            throw new DorisException(message);
        }
        logger.debug("Parsing schema result is '{}'.", schema);
        return schema;
    }

    /**
     * Asks the Doris FE for the query plan of the configured read and converts it into
     * partition definitions.
     *
     * <p>The SQL is {@code select <fields> from `<db>`.`<table>`}, with
     * {@code where <filter>} appended when a filter query is configured. It is posted as the
     * {@code sql} member of a JSON object.
     *
     * @param settings source of table, field, filter and connection options
     * @param logger   logger used for debug and error messages
     * @return the partition definitions built from the query plan
     * @throws DorisException when the identifier is invalid, the request fails, or the response
     *                        cannot be parsed
     */
    public static List<PartitionDefinition> findPartitions(Settings settings, Logger logger) throws DorisException {
        String[] tableIdentifier = parseIdentifier(settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER), logger);
        String sql = "select " + settings.getProperty(ConfigurationOptions.DORIS_READ_FIELD, "*") + " from `" + tableIdentifier[0] + "`.`" + tableIdentifier[1] + "`";
        if (!StringUtils.isEmpty(settings.getProperty(ConfigurationOptions.DORIS_FILTER_QUERY))) {
            sql = sql + " where " + settings.getProperty(ConfigurationOptions.DORIS_FILTER_QUERY);
        }
        logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);
        HttpPost httpPost = new HttpPost(getUriStr(settings, logger) + QUERY_PLAN);
        String body;
        try {
            // Let the JSON library escape the SQL: quotes, backslashes or control characters in
            // the field list or filter would otherwise produce a malformed request body.
            body = new ObjectMapper().writeValueAsString(Collections.singletonMap("sql", sql));
        } catch (IOException e) {
            String message = "Failed to build the query plan request body for sql: " + sql;
            logger.error(message, e);
            throw new DorisException(message, e);
        }
        logger.debug("Post body Sending to Doris FE is: '{}'.", body);
        StringEntity stringEntity = new StringEntity(body, StandardCharsets.UTF_8);
        stringEntity.setContentEncoding("UTF-8");
        stringEntity.setContentType("application/json");
        httpPost.setEntity(stringEntity);
        String response = send(settings, httpPost, logger);
        logger.debug("Find partition response is '{}'.", response);
        QueryPlan queryPlan = getQueryPlan(response, logger);
        return tabletsMapToPartition(settings, selectBeForTablet(queryPlan, logger), queryPlan.getOpaqued_query_plan(), tableIdentifier[0], tableIdentifier[1], logger);
    }

    /**
     * Parses an FE response into a {@link QueryPlan} and verifies that its status is 200.
     *
     * @param str    JSON response text
     * @param logger logger used for debug and error messages
     * @return the parsed query plan
     * @throws DorisException when the text is not JSON, cannot be mapped to {@link QueryPlan},
     *                        cannot be read, maps to {@code null}, or reports a status other
     *                        than 200
     */
    @VisibleForTesting
    static QueryPlan getQueryPlan(String str, Logger logger) throws DorisException {
        QueryPlan queryPlan;
        // The Jackson exceptions are IOException subtypes, so they must be caught before it.
        try {
            queryPlan = new ObjectMapper().readValue(str, QueryPlan.class);
        } catch (JsonParseException e) {
            String message = "Doris FE's response is not a json. res: " + str;
            logger.error(message, e);
            throw new DorisException(message, e);
        } catch (JsonMappingException e) {
            String message = "Doris FE's response cannot map to schema. res: " + str;
            logger.error(message, e);
            throw new DorisException(message, e);
        } catch (IOException e) {
            String message = "Parse Doris FE's response to json failed. res: " + str;
            logger.error(message, e);
            throw new DorisException(message, e);
        }
        if (queryPlan == null) {
            logger.error(ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE);
            throw new ShouldNeverHappenException();
        }
        if (queryPlan.getStatus() != 200) {
            String message = "Doris FE's response is not OK, status is " + queryPlan.getStatus();
            logger.error(message);
            throw new DorisException(message);
        }
        logger.debug("Parsing partition result is '{}'.", queryPlan);
        return queryPlan;
    }

    /**
     * Assigns every tablet of the query plan to one of the backends listed in its routings.
     *
     * <p>Routings are examined in order. The first backend that has no entry in the result yet
     * is chosen immediately; otherwise the backend with the fewest tablets assigned so far wins,
     * the earliest one on ties.
     *
     * @param queryPlan query plan whose partitions map tablet ids to tablets
     * @param logger    logger used for trace, debug and error messages
     * @return a map from backend to the tablet ids assigned to it
     * @throws DorisException when a tablet id is not a valid long or a tablet has no routing
     */
    @VisibleForTesting
    static Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan, Logger logger) throws DorisException {
        Map<String, List<Long>> tabletsByBackend = new HashMap<>();
        for (Map.Entry<String, Tablet> partition : queryPlan.getPartitions().entrySet()) {
            logger.debug("Parse tablet info: '{}'.", partition);
            try {
                long tabletId = Long.parseLong(partition.getKey());
                String chosen = null;
                int fewest = Integer.MAX_VALUE;
                for (String candidate : partition.getValue().getRoutings()) {
                    logger.trace("Evaluate Doris BE '{}' to tablet '{}'.", candidate, Long.valueOf(tabletId));
                    List<Long> assigned = tabletsByBackend.get(candidate);
                    if (assigned == null) {
                        // An unused backend always wins; stop looking.
                        logger.debug("Choice a new Doris BE '{}' for tablet '{}'.", candidate, Long.valueOf(tabletId));
                        tabletsByBackend.put(candidate, new ArrayList<>());
                        chosen = candidate;
                        break;
                    }
                    if (assigned.size() < fewest) {
                        chosen = candidate;
                        fewest = assigned.size();
                        logger.debug("Current candidate Doris BE to tablet '{}' is '{}' with tablet count {}.", new Object[]{Long.valueOf(tabletId), chosen, Integer.valueOf(fewest)});
                    }
                }
                if (chosen == null) {
                    String message = "Cannot choice Doris BE for tablet " + tabletId;
                    logger.error(message);
                    throw new DorisException(message);
                }
                logger.debug("Choice Doris BE '{}' for tablet '{}'.", chosen, Long.valueOf(tabletId));
                tabletsByBackend.get(chosen).add(Long.valueOf(tabletId));
            } catch (NumberFormatException e) {
                String message = "Parse tablet id '" + partition.getKey() + "' to long failed.";
                logger.error(message, e);
                throw new DorisException(message, e);
            }
        }
        return tabletsByBackend;
    }

    /**
     * Resolves how many tablets a single partition may hold.
     *
     * <p>Defaults to {@link Integer#MAX_VALUE} when the option is absent or not a valid integer
     * (the latter is logged as a warning). Values below 1 are raised to 1.
     *
     * @param settings source of the tablet size option
     * @param logger   logger used for warnings and the final debug message
     * @return the tablet limit, always at least 1
     */
    @VisibleForTesting
    static int tabletCountLimitForOnePartition(Settings settings, Logger logger) {
        String configured = settings.getProperty(ConfigurationOptions.DORIS_TABLET_SIZE);
        int limit = Integer.MAX_VALUE;
        if (configured != null) {
            try {
                limit = Integer.parseInt(configured);
            } catch (NumberFormatException ignored) {
                // Unparsable value: keep the default and report it.
                logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, ConfigurationOptions.DORIS_TABLET_SIZE, configured);
            }
        }
        if (limit < 1) {
            logger.warn("{} is less than {}, set to default value {}.", new Object[]{ConfigurationOptions.DORIS_TABLET_SIZE, 1, 1});
            limit = 1;
        }
        logger.debug("Tablet size is set to {}.", Integer.valueOf(limit));
        return limit;
    }

    /**
     * Fetches the backend list through the legacy system endpoint and returns one alive backend
     * chosen at random.
     *
     * @param sparkSettings source of the FE node list and connection options
     * @param logger        logger used for info, trace and error messages
     * @return the backend's IP and HTTP port joined by {@link TMultiplexedProtocol#SEPARATOR}
     * @throws DorisException when the request fails, the response cannot be parsed, or no alive
     *                        backend is returned
     * @throws IOException    declared by {@code parseBackend}
     */
    @VisibleForTesting
    @Deprecated
    public static String randomBackend(SparkSettings sparkSettings, Logger logger) throws DorisException, IOException {
        String feNode = randomEndpoint(sparkSettings.getProperty(ConfigurationOptions.DORIS_FENODES), logger);
        HttpGet request = new HttpGet(String.format("http://%s/rest/v1/system?path=//backends", feNode));
        String response = send(sparkSettings, request, logger);
        logger.info("Backend Info:{}", response);
        List<BackendRow> aliveBackends = parseBackend(response, logger);
        logger.trace("Parse beNodes '{}'.", aliveBackends);
        boolean noneAvailable = aliveBackends == null || aliveBackends.isEmpty();
        if (noneAvailable) {
            logger.error(ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE, "beNodes", aliveBackends);
            throw new IllegalArgumentException("beNodes", String.valueOf(aliveBackends));
        }
        Collections.shuffle(aliveBackends);
        BackendRow picked = aliveBackends.get(0);
        return picked.getIP() + TMultiplexedProtocol.SEPARATOR + picked.getHttpPort();
    }

    /**
     * Parses the legacy backend listing and keeps only the rows flagged as alive.
     *
     * @param str    JSON response text
     * @param logger logger used for debug and error messages
     * @return the alive backend rows
     * @throws DorisException when the text is not JSON, cannot be mapped to {@link Backend},
     *                        cannot be read, or maps to {@code null}
     * @throws IOException    declared for callers; the visible code wraps I/O failures in
     *                        {@link DorisException}
     */
    @VisibleForTesting
    @Deprecated
    static List<BackendRow> parseBackend(String str, Logger logger) throws DorisException, IOException {
        Backend parsed;
        try {
            parsed = new org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper().readValue(str, Backend.class);
        } catch (org.apache.doris.shaded.com.fasterxml.jackson.core.JsonParseException e) {
            String message = "Doris BE's response is not a json. res: " + str;
            logger.error(message, e);
            throw new DorisException(message, e);
        } catch (org.apache.doris.shaded.com.fasterxml.jackson.databind.JsonMappingException e) {
            String message = "Doris BE's response cannot map to schema. res: " + str;
            logger.error(message, e);
            throw new DorisException(message, e);
        } catch (IOException e) {
            String message = "Parse Doris BE's response to json failed. res: " + str;
            logger.error(message, e);
            throw new DorisException(message, e);
        }
        if (parsed == null) {
            logger.error(ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE);
            throw new ShouldNeverHappenException();
        }
        List<BackendRow> alive = parsed.getRows().stream()
                .filter(BackendRow::getAlive)
                .collect(Collectors.toList());
        logger.debug("Parsing schema result is '{}'.", alive);
        return alive;
    }

    /**
     * Fetches the backend list from {@code /api/backends?is_alive=true} on a random FE node and
     * returns one of the listed backends chosen at random.
     *
     * @param sparkSettings source of the FE node list and connection options
     * @param logger        logger used for info, trace and error messages
     * @return the backend's IP and HTTP port joined by {@link TMultiplexedProtocol#SEPARATOR}
     * @throws DorisException when the request fails, the response cannot be parsed, or no
     *                        backend is returned
     */
    @VisibleForTesting
    public static String randomBackendV2(SparkSettings sparkSettings, Logger logger) throws DorisException {
        String feNode = randomEndpoint(sparkSettings.getProperty(ConfigurationOptions.DORIS_FENODES), logger);
        HttpGet request = new HttpGet(String.format("http://%s/api/backends?is_alive=true", feNode));
        String response = send(sparkSettings, request, logger);
        logger.info("Backend Info:{}", response);
        List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
        logger.trace("Parse beNodes '{}'.", backends);
        boolean noneAvailable = backends == null || backends.isEmpty();
        if (noneAvailable) {
            logger.error(ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
            throw new IllegalArgumentException("beNodes", String.valueOf(backends));
        }
        Collections.shuffle(backends);
        BackendV2.BackendRowV2 picked = backends.get(0);
        return picked.getIp() + TMultiplexedProtocol.SEPARATOR + picked.getHttpPort();
    }

    /**
     * Parses the backend listing returned by the V2 endpoint.
     *
     * @param str    JSON response text
     * @param logger logger used for debug and error messages
     * @return the backends contained in the response, as returned by
     *         {@code BackendV2.getBackends()}
     * @throws DorisException when the text is not JSON, cannot be mapped to {@link BackendV2},
     *                        cannot be read, or maps to {@code null}
     */
    static List<BackendV2.BackendRowV2> parseBackendV2(String str, Logger logger) throws DorisException {
        BackendV2 parsed;
        try {
            parsed = new org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper().readValue(str, BackendV2.class);
        } catch (org.apache.doris.shaded.com.fasterxml.jackson.core.JsonParseException e) {
            String message = "Doris BE's response is not a json. res: " + str;
            logger.error(message, e);
            throw new DorisException(message, e);
        } catch (org.apache.doris.shaded.com.fasterxml.jackson.databind.JsonMappingException e) {
            String message = "Doris BE's response cannot map to schema. res: " + str;
            logger.error(message, e);
            throw new DorisException(message, e);
        } catch (IOException e) {
            String message = "Parse Doris BE's response to json failed. res: " + str;
            logger.error(message, e);
            throw new DorisException(message, e);
        }
        if (parsed == null) {
            logger.error(ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE);
            throw new ShouldNeverHappenException();
        }
        List<BackendV2.BackendRowV2> rows = parsed.getBackends();
        logger.debug("Parsing schema result is '{}'.", rows);
        return rows;
    }

    /**
     * Turns a backend-to-tablets assignment into partition definitions, splitting each backend's
     * tablets into chunks no larger than the configured tablet limit.
     *
     * <p>Side effect: each tablet list in {@code map} is rewritten in place with its duplicates
     * removed before it is chunked.
     *
     * @param settings source of the tablet limit; also passed to every partition definition
     * @param map      tablets assigned to each backend, keyed by backend
     * @param str      passed as the last constructor argument of each partition definition
     *                 (the caller in this class supplies the opaqued query plan)
     * @param str2     passed as the first constructor argument (the caller supplies the first
     *                 part of the table identifier)
     * @param str3     passed as the second constructor argument (the caller supplies the second
     *                 part of the table identifier)
     * @param logger   logger used for debug messages
     * @return one partition definition per chunk of tablets
     * @throws IllegalArgumentException declared for callers
     */
    @VisibleForTesting
    static List<PartitionDefinition> tabletsMapToPartition(Settings settings, Map<String, List<Long>> map, String str, String str2, String str3, Logger logger) throws IllegalArgumentException {
        int chunkSize = tabletCountLimitForOnePartition(settings, logger);
        List<PartitionDefinition> partitions = new ArrayList<>();
        for (Map.Entry<String, List<Long>> beInfo : map.entrySet()) {
            logger.debug("Generate partition with beInfo: '{}'.", beInfo);
            List<Long> tablets = beInfo.getValue();
            // De-duplicate in place; the resulting order is whatever the HashSet yields.
            HashSet<Long> distinct = new HashSet<>(tablets);
            tablets.clear();
            tablets.addAll(distinct);
            for (int from = 0; from < tablets.size(); from += chunkSize) {
                int to = Math.min(tablets.size(), from + chunkSize);
                HashSet<Long> chunk = new HashSet<>(tablets.subList(from, to));
                PartitionDefinition definition = new PartitionDefinition(str2, str3, settings, beInfo.getKey(), chunk, str);
                logger.debug("Generate one PartitionDefinition '{}'.", definition);
                partitions.add(definition);
            }
        }
        return partitions;
    }
}
