package org.apache.doris.flink.sink.writer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.sink.HttpGetWithEntity;
import org.apache.doris.shaded.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.doris.shaded.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.doris.shaded.com.fasterxml.jackson.databind.JsonNode;
import org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.shaded.org.apache.commons.codec.binary.Base64;
import org.apache.doris.shaded.org.apache.thrift.protocol.TMultiplexedProtocol;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.StringUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.class */
public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<String> {
    private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumSchemaSerializer.class);
    private static final String CHECK_SCHEMA_CHANGE_API = "http://%s/api/enable_light_schema_change/%s/%s";
    private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s";
    private static final String OP_READ = "r";
    private static final String OP_CREATE = "c";
    private static final String OP_UPDATE = "u";
    private static final String OP_DELETE = "d";
    public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s";
    private static final String addDropDDLRegex = "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
    private final Pattern addDropDDLPattern;
    private DorisOptions dorisOptions;
    private ObjectMapper objectMapper = new ObjectMapper();
    private String database;
    private String table;
    private String sourceTableName;

    /* loaded from: input_file:org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer$Builder.class */
    public static class Builder {
        private DorisOptions dorisOptions;
        private Pattern addDropDDLPattern;
        private String sourceTableName;

        public Builder setDorisOptions(DorisOptions dorisOptions) {
            this.dorisOptions = dorisOptions;
            return this;
        }

        public Builder setPattern(Pattern pattern) {
            this.addDropDDLPattern = pattern;
            return this;
        }

        public Builder setSourceTableName(String str) {
            this.sourceTableName = str;
            return this;
        }

        public JsonDebeziumSchemaSerializer build() {
            return new JsonDebeziumSchemaSerializer(this.dorisOptions, this.addDropDDLPattern, this.sourceTableName);
        }
    }

    public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern pattern, String str) {
        this.dorisOptions = dorisOptions;
        this.addDropDDLPattern = pattern == null ? Pattern.compile(addDropDDLRegex, 2) : pattern;
        String[] split = dorisOptions.getTableIdentifier().split("\\.");
        this.database = split[0];
        this.table = split[1];
        this.sourceTableName = str;
    }

    @Override // org.apache.doris.flink.sink.writer.DorisRecordSerializer
    public byte[] serialize(String str) throws IOException {
        Map<String, String> extractAfterRow;
        LOG.debug("received debezium json data {} :", str);
        JsonNode readTree = this.objectMapper.readTree(str);
        String extractJsonNode = extractJsonNode(readTree, "op");
        if (Objects.isNull(extractJsonNode)) {
            schemaChange(readTree);
            return null;
        }
        if (OP_READ.equals(extractJsonNode) || OP_CREATE.equals(extractJsonNode)) {
            extractAfterRow = extractAfterRow(readTree);
            addDeleteSign(extractAfterRow, false);
        } else if (OP_UPDATE.equals(extractJsonNode)) {
            extractAfterRow = extractAfterRow(readTree);
            addDeleteSign(extractAfterRow, false);
        } else {
            if (!OP_DELETE.equals(extractJsonNode)) {
                LOG.error("parse record fail, unknown op {} in {}", extractJsonNode, str);
                return null;
            }
            extractAfterRow = extractBeforeRow(readTree);
            addDeleteSign(extractAfterRow, true);
        }
        return this.objectMapper.writeValueAsString(extractAfterRow).getBytes(StandardCharsets.UTF_8);
    }

    @VisibleForTesting
    public boolean schemaChange(JsonNode jsonNode) {
        boolean z = false;
        try {
        } catch (Exception e) {
            LOG.warn("schema change error :", e);
        }
        if (!StringUtils.isNullOrWhitespaceOnly(this.sourceTableName) && !checkTable(jsonNode)) {
            return false;
        }
        String extractDDL = extractDDL(jsonNode);
        if (StringUtils.isNullOrWhitespaceOnly(extractDDL)) {
            LOG.info("ddl can not do schema change:{}", jsonNode);
            return false;
        }
        z = checkSchemaChange(extractDDL) && execSchemaChange(extractDDL);
        LOG.info("schema change status:{}", Boolean.valueOf(z));
        return z;
    }

    protected boolean checkTable(JsonNode jsonNode) {
        return this.sourceTableName.equals(extractDatabase(jsonNode) + "." + extractTable(jsonNode));
    }

    private void addDeleteSign(Map<String, String> map, boolean z) {
        if (z) {
            map.put(LoadConstants.DORIS_DELETE_SIGN, "1");
        } else {
            map.put(LoadConstants.DORIS_DELETE_SIGN, "0");
        }
    }

    private boolean checkSchemaChange(String str) throws IOException {
        String format = String.format(CHECK_SCHEMA_CHANGE_API, this.dorisOptions.getFenodes(), this.database, this.table);
        Map<String, Object> buildRequestParam = buildRequestParam(str);
        if (buildRequestParam.size() != 2) {
            return false;
        }
        HttpGetWithEntity httpGetWithEntity = new HttpGetWithEntity(format);
        httpGetWithEntity.setHeader("Authorization", authHeader());
        httpGetWithEntity.setEntity(new StringEntity(this.objectMapper.writeValueAsString(buildRequestParam)));
        boolean handleResponse = handleResponse(httpGetWithEntity);
        if (!handleResponse) {
            LOG.warn("schema change can not do table {}.{}", this.database, this.table);
        }
        return handleResponse;
    }

    protected Map<String, Object> buildRequestParam(String str) {
        HashMap hashMap = new HashMap();
        Matcher matcher = this.addDropDDLPattern.matcher(str);
        if (matcher.find()) {
            String group = matcher.group(1);
            String group2 = matcher.group(3);
            hashMap.put("isDropColumn", Boolean.valueOf(group.equalsIgnoreCase("DROP")));
            hashMap.put("columnName", group2);
        }
        return hashMap;
    }

    private boolean execSchemaChange(String str) throws IOException {
        HashMap hashMap = new HashMap();
        hashMap.put("stmt", str);
        HttpPost httpPost = new HttpPost(String.format(SCHEMA_CHANGE_API, this.dorisOptions.getFenodes(), this.database));
        httpPost.setHeader("Authorization", authHeader());
        httpPost.setHeader("Content-Type", "application/json");
        httpPost.setEntity(new StringEntity(this.objectMapper.writeValueAsString(hashMap)));
        return handleResponse(httpPost);
    }

    protected String extractDatabase(JsonNode jsonNode) {
        return jsonNode.get("source").has("schema") ? extractJsonNode(jsonNode.get("source"), "schema") : extractJsonNode(jsonNode.get("source"), "db");
    }

    protected String extractTable(JsonNode jsonNode) {
        return extractJsonNode(jsonNode.get("source"), "table");
    }

    private boolean handleResponse(HttpUriRequest httpUriRequest) {
        try {
            CloseableHttpClient createDefault = HttpClients.createDefault();
            Throwable th = null;
            try {
                try {
                    CloseableHttpResponse execute = createDefault.execute(httpUriRequest);
                    if (execute.getStatusLine().getStatusCode() == 200 && execute.getEntity() != null) {
                        String entityUtils = EntityUtils.toString(execute.getEntity());
                        if (((Map) this.objectMapper.readValue(entityUtils, Map.class)).getOrDefault("code", "-1").toString().equals("0")) {
                            if (createDefault != null) {
                                if (0 != 0) {
                                    try {
                                        createDefault.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    createDefault.close();
                                }
                            }
                            return true;
                        }
                        LOG.error("schema change response:{}", entityUtils);
                    }
                    if (createDefault != null) {
                        if (0 != 0) {
                            try {
                                createDefault.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            createDefault.close();
                        }
                    }
                    return false;
                } finally {
                }
            } catch (Throwable th4) {
                th = th4;
                throw th4;
            }
        } catch (Exception e) {
            LOG.error("http request error,", e);
            return false;
        }
    }

    private String extractJsonNode(JsonNode jsonNode, String str) {
        if (jsonNode == null || jsonNode.get(str) == null) {
            return null;
        }
        return jsonNode.get(str).asText();
    }

    private Map<String, String> extractBeforeRow(JsonNode jsonNode) {
        return extractRow(jsonNode.get("before"));
    }

    private Map<String, String> extractAfterRow(JsonNode jsonNode) {
        return extractRow(jsonNode.get("after"));
    }

    private Map<String, String> extractRow(JsonNode jsonNode) {
        Map<String, String> map = (Map) this.objectMapper.convertValue(jsonNode, new TypeReference<Map<String, String>>() { // from class: org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer.1
        });
        return map != null ? map : new HashMap();
    }

    public String extractDDL(JsonNode jsonNode) throws JsonProcessingException {
        String extractJsonNode = extractJsonNode(jsonNode, "historyRecord");
        if (Objects.isNull(extractJsonNode)) {
            return null;
        }
        String extractJsonNode2 = extractJsonNode(this.objectMapper.readTree(extractJsonNode), "ddl");
        LOG.debug("received debezium ddl :{}", extractJsonNode2);
        if (Objects.isNull(extractJsonNode2)) {
            return null;
        }
        Matcher matcher = this.addDropDDLPattern.matcher(extractJsonNode2);
        if (!matcher.find()) {
            return null;
        }
        String group = matcher.group(1);
        String group2 = matcher.group(3);
        String group3 = matcher.group(5);
        String format = String.format(EXECUTE_DDL, this.dorisOptions.getTableIdentifier(), group, group2, group3 == null ? "" : group3);
        LOG.info("parse ddl:{}", format);
        return format;
    }

    private String authHeader() {
        return "Basic " + new String(Base64.encodeBase64((this.dorisOptions.getUsername() + TMultiplexedProtocol.SEPARATOR + this.dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8)));
    }

    public static Builder builder() {
        return new Builder();
    }
}
