package org.apache.doris.spark;

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.UUID;
import org.apache.doris.shaded.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.SparkSettings;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.StreamLoadException;
import org.apache.doris.spark.rest.RestService;
import org.apache.doris.spark.rest.models.RespContent;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.protocol.HTTP;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/doris/spark/DorisStreamLoad.class */
public class DorisStreamLoad implements Serializable {
    public static final String FIELD_DELIMITER = "\t";
    public static final String LINE_DELIMITER = "\n";
    public static final String NULL_VALUE = "\\N";
    private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
    private static final List<String> DORIS_SUCCESS_STATUS = new ArrayList(Arrays.asList("Success", "Publish Timeout"));
    private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";
    private String user;
    private String passwd;
    private String loadUrlStr;
    private String hostPort;
    private String db;
    private String tbl;
    private String authEncoding;
    private String columns;
    private String[] dfColumns;

    /* loaded from: input_file:org/apache/doris/spark/DorisStreamLoad$LoadResponse.class */
    public static class LoadResponse {
        public int status;
        public String respMsg;
        public String respContent;

        public LoadResponse(int i, String str, String str2) {
            this.status = i;
            this.respMsg = str;
            this.respContent = str2;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("status: ").append(this.status);
            sb.append(", resp msg: ").append(this.respMsg);
            sb.append(", resp content: ").append(this.respContent);
            return sb.toString();
        }
    }

    public DorisStreamLoad(String str, String str2, String str3, String str4, String str5) {
        this.hostPort = str;
        this.db = str2;
        this.tbl = str3;
        this.user = str4;
        this.passwd = str5;
        this.loadUrlStr = String.format(loadUrlPattern, str, str2, str3);
        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", str4, str5).getBytes(StandardCharsets.UTF_8));
    }

    public DorisStreamLoad(SparkSettings sparkSettings) throws IOException, DorisException {
        String randomBackendV2 = RestService.randomBackendV2(sparkSettings, LOG);
        this.hostPort = randomBackendV2;
        String[] split = sparkSettings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
        this.db = split[0];
        this.tbl = split[1];
        this.user = sparkSettings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER);
        this.passwd = sparkSettings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
        this.loadUrlStr = String.format(loadUrlPattern, randomBackendV2, this.db, this.tbl);
        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", this.user, this.passwd).getBytes(StandardCharsets.UTF_8));
        this.columns = sparkSettings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
    }

    public DorisStreamLoad(SparkSettings sparkSettings, String[] strArr) throws IOException, DorisException {
        String randomBackendV2 = RestService.randomBackendV2(sparkSettings, LOG);
        this.hostPort = randomBackendV2;
        String[] split = sparkSettings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
        this.db = split[0];
        this.tbl = split[1];
        this.user = sparkSettings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER);
        this.passwd = sparkSettings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
        this.loadUrlStr = String.format(loadUrlPattern, randomBackendV2, this.db, this.tbl);
        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", this.user, this.passwd).getBytes(StandardCharsets.UTF_8));
        this.columns = sparkSettings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
        this.dfColumns = strArr;
    }

    public String getLoadUrlStr() {
        return this.loadUrlStr;
    }

    public String getHostPort() {
        return this.hostPort;
    }

    public void setHostPort(String str) {
        this.hostPort = str;
        this.loadUrlStr = String.format(loadUrlPattern, str, this.db, this.tbl);
    }

    private HttpURLConnection getConnection(String str, String str2) throws IOException {
        HttpURLConnection httpURLConnection = (HttpURLConnection) new URL(str).openConnection();
        httpURLConnection.setInstanceFollowRedirects(false);
        httpURLConnection.setRequestMethod(HttpPut.METHOD_NAME);
        httpURLConnection.setRequestProperty("Authorization", "Basic " + this.authEncoding);
        httpURLConnection.addRequestProperty("Expect", HTTP.EXPECT_CONTINUE);
        httpURLConnection.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
        httpURLConnection.addRequestProperty("label", str2);
        if (this.columns != null && !this.columns.equals("")) {
            httpURLConnection.addRequestProperty("columns", this.columns);
        }
        httpURLConnection.setDoOutput(true);
        httpURLConnection.setDoInput(true);
        httpURLConnection.addRequestProperty("format", "json");
        httpURLConnection.addRequestProperty("strip_outer_array", "true");
        return httpURLConnection;
    }

    public String listToString(List<List<Object>> list) {
        StringJoiner stringJoiner = new StringJoiner(LINE_DELIMITER);
        for (List<Object> list2 : list) {
            StringJoiner stringJoiner2 = new StringJoiner(FIELD_DELIMITER);
            for (Object obj : list2) {
                if (obj == null) {
                    stringJoiner2.add(NULL_VALUE);
                } else {
                    stringJoiner2.add(obj.toString());
                }
            }
            stringJoiner.add(stringJoiner2.toString());
        }
        return stringJoiner.toString();
    }

    public void loadV2(List<List<Object>> list) throws StreamLoadException, JsonProcessingException {
        ArrayList arrayList = new ArrayList();
        try {
            for (List<Object> list2 : list) {
                HashMap hashMap = new HashMap();
                if (this.dfColumns.length == list2.size()) {
                    for (int i = 0; i < this.dfColumns.length; i++) {
                        hashMap.put(this.dfColumns[i], list2.get(i));
                    }
                }
                arrayList.add(hashMap);
            }
            load(new ObjectMapper().writeValueAsString(arrayList));
        } catch (Exception e) {
            throw new StreamLoadException("The number of configured columns does not match the number of data columns.");
        }
    }

    public void load(String str) throws StreamLoadException {
        LOG.debug("Streamload Request:{} ,Body:{}", this.loadUrlStr, str);
        LoadResponse loadBatch = loadBatch(str);
        if (loadBatch.status != 200) {
            throw new StreamLoadException("stream load error: " + loadBatch.respContent);
        }
        LOG.info("Streamload Response:{}", loadBatch);
        try {
            RespContent respContent = (RespContent) new ObjectMapper().readValue(loadBatch.respContent, RespContent.class);
            if (DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
            } else {
                throw new StreamLoadException("stream load error: " + respContent.getMessage());
            }
        } catch (IOException e) {
            throw new StreamLoadException(e);
        }
    }

    private LoadResponse loadBatch(String str) {
        Calendar calendar = Calendar.getInstance();
        String format = String.format("spark_streamload_%s%02d%02d_%02d%02d%02d_%s", Integer.valueOf(calendar.get(1)), Integer.valueOf(calendar.get(2) + 1), Integer.valueOf(calendar.get(5)), Integer.valueOf(calendar.get(11)), Integer.valueOf(calendar.get(12)), Integer.valueOf(calendar.get(13)), UUID.randomUUID().toString().replaceAll("-", ""));
        HttpURLConnection httpURLConnection = null;
        HttpURLConnection httpURLConnection2 = null;
        int i = -1;
        try {
            try {
                httpURLConnection2 = getConnection(this.loadUrlStr, format);
                BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(httpURLConnection2.getOutputStream());
                bufferedOutputStream.write(str.getBytes("UTF-8"));
                bufferedOutputStream.close();
                i = httpURLConnection2.getResponseCode();
                String responseMessage = httpURLConnection2.getResponseMessage();
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader((InputStream) httpURLConnection2.getContent()));
                StringBuilder sb = new StringBuilder();
                while (true) {
                    String readLine = bufferedReader.readLine();
                    if (readLine == null) {
                        break;
                    }
                    sb.append(readLine);
                }
                LoadResponse loadResponse = new LoadResponse(i, responseMessage, sb.toString());
                if (0 != 0) {
                    httpURLConnection.disconnect();
                }
                if (httpURLConnection2 != null) {
                    httpURLConnection2.disconnect();
                }
                return loadResponse;
            } catch (Exception e) {
                e.printStackTrace();
                String str2 = "http request exception,load url : " + this.loadUrlStr + ",failed to execute spark streamload with label: " + format;
                LOG.warn(str2, e);
                LoadResponse loadResponse2 = new LoadResponse(i, e.getMessage(), str2);
                if (0 != 0) {
                    httpURLConnection.disconnect();
                }
                if (httpURLConnection2 != null) {
                    httpURLConnection2.disconnect();
                }
                return loadResponse2;
            }
        } catch (Throwable th) {
            if (0 != 0) {
                httpURLConnection.disconnect();
            }
            if (httpURLConnection2 != null) {
                httpURLConnection2.disconnect();
            }
            throw th;
        }
    }
}
