package org.apache.doris.flink.tools.cdc;

import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.doris.flink.catalog.doris.DorisSystem;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.cfg.DorisConnectionOptions;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer;
import org.apache.doris.flink.sink.writer.LoadConstants;
import org.apache.doris.flink.table.DorisConfigOptions;
import org.apache.doris.flink.tools.cdc.mysql.ParsingProcessFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/doris/flink/tools/cdc/DatabaseSync.class */
public abstract class DatabaseSync {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) DatabaseSync.class);
    private static final String LIGHT_SCHEMA_CHANGE = "light_schema_change";
    protected Configuration config;
    protected String database;
    protected TableNameConverter converter;
    protected Pattern includingPattern;
    protected Pattern excludingPattern;
    protected Map<String, String> tableConfig;
    protected Configuration sinkConfig;
    public StreamExecutionEnvironment env;

    /* loaded from: input_file:org/apache/doris/flink/tools/cdc/DatabaseSync$TableNameConverter.class */
    public static class TableNameConverter implements Serializable {
        private static final long serialVersionUID = 1;
        private final String prefix;
        private final String suffix;

        TableNameConverter() {
            this("", "");
        }

        TableNameConverter(String str, String str2) {
            this.prefix = str == null ? "" : str;
            this.suffix = str2 == null ? "" : str2;
        }

        public String convert(String str) {
            return this.prefix + str + this.suffix;
        }
    }

    public abstract Connection getConnection() throws SQLException;

    public abstract List<SourceSchema> getSchemaList() throws Exception;

    public abstract DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment streamExecutionEnvironment);

    public void create(StreamExecutionEnvironment streamExecutionEnvironment, String str, Configuration configuration, String str2, String str3, String str4, String str5, Configuration configuration2, Map<String, String> map) {
        this.env = streamExecutionEnvironment;
        this.config = configuration;
        this.database = str;
        this.converter = new TableNameConverter(str2, str3);
        this.includingPattern = str4 == null ? null : Pattern.compile(str4);
        this.excludingPattern = str5 == null ? null : Pattern.compile(str5);
        this.sinkConfig = configuration2;
        this.tableConfig = map == null ? new HashMap<>() : map;
        if (this.tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)) {
            return;
        }
        this.tableConfig.put(LIGHT_SCHEMA_CHANGE, BooleanUtils.TRUE);
    }

    public void build() throws Exception {
        DorisSystem dorisSystem = new DorisSystem(getDorisConnectionOptions());
        List<SourceSchema> schemaList = getSchemaList();
        if (!dorisSystem.databaseExists(this.database)) {
            LOG.info("database {} not exist, created", this.database);
            dorisSystem.createDatabase(this.database);
        }
        ArrayList arrayList = new ArrayList();
        ArrayList<String> arrayList2 = new ArrayList();
        for (SourceSchema sourceSchema : schemaList) {
            arrayList.add(sourceSchema.getTableName());
            String convert = this.converter.convert(sourceSchema.getTableName());
            if (!dorisSystem.tableExists(this.database, convert)) {
                TableSchema convertTableSchema = sourceSchema.convertTableSchema(this.tableConfig);
                convertTableSchema.setDatabase(this.database);
                convertTableSchema.setTable(convert);
                dorisSystem.createTable(convertTableSchema);
            }
            arrayList2.add(convert);
        }
        Preconditions.checkState(!arrayList.isEmpty(), "No tables to be synchronized.");
        this.config.set(MySqlSourceOptions.TABLE_NAME, "(" + String.join("|", arrayList) + ")");
        SingleOutputStreamOperator process = buildCdcSource(this.env).process(new ParsingProcessFunction(this.converter));
        for (String str : arrayList2) {
            DataStream sideOutput = process.getSideOutput(ParsingProcessFunction.createRecordOutputTag(str));
            sideOutput.sinkTo(buildDorisSink(str)).setParallelism(this.sinkConfig.getInteger(DorisConfigOptions.SINK_PARALLELISM, sideOutput.getParallelism())).name(str);
        }
    }

    private DorisConnectionOptions getDorisConnectionOptions() {
        String string = this.sinkConfig.getString(DorisConfigOptions.FENODES);
        String string2 = this.sinkConfig.getString(DorisConfigOptions.USERNAME);
        String string3 = this.sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
        String string4 = this.sinkConfig.getString(DorisConfigOptions.JDBC_URL);
        Preconditions.checkNotNull(string, "fenodes is empty in sink-conf");
        Preconditions.checkNotNull(string2, "username is empty in sink-conf");
        Preconditions.checkNotNull(string4, "jdbcurl is empty in sink-conf");
        return new DorisConnectionOptions.DorisConnectionOptionsBuilder().withFenodes(string).withUsername(string2).withPassword(string3).withJdbcUrl(string4).build();
    }

    public DorisSink<String> buildDorisSink(String str) {
        String string = this.sinkConfig.getString(DorisConfigOptions.FENODES);
        String string2 = this.sinkConfig.getString(DorisConfigOptions.USERNAME);
        String string3 = this.sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
        String string4 = this.sinkConfig.getString(DorisConfigOptions.SINK_LABEL_PREFIX);
        DorisSink.Builder builder = DorisSink.builder();
        DorisOptions.Builder builder2 = DorisOptions.builder();
        builder2.setFenodes(string).setTableIdentifier(this.database + "." + str).setUsername(string2).setPassword(string3);
        Properties properties = new Properties();
        properties.setProperty(LoadConstants.FORMAT_KEY, LoadConstants.JSON);
        properties.setProperty("read_json_by_line", BooleanUtils.TRUE);
        properties.putAll(DorisConfigOptions.getStreamLoadProp(this.sinkConfig.toMap()));
        DorisExecutionOptions.Builder streamLoadProp = DorisExecutionOptions.builder().setLabelPrefix(String.join("-", string4, this.database, str)).setStreamLoadProp(properties);
        Optional optional = this.sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_DELETE);
        streamLoadProp.getClass();
        optional.ifPresent(streamLoadProp::setDeletable);
        Optional optional2 = this.sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_COUNT);
        streamLoadProp.getClass();
        optional2.ifPresent((v1) -> {
            r1.setBufferCount(v1);
        });
        Optional optional3 = this.sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_SIZE);
        streamLoadProp.getClass();
        optional3.ifPresent((v1) -> {
            r1.setBufferSize(v1);
        });
        Optional optional4 = this.sinkConfig.getOptional(DorisConfigOptions.SINK_CHECK_INTERVAL);
        streamLoadProp.getClass();
        optional4.ifPresent(streamLoadProp::setCheckInterval);
        Optional optional5 = this.sinkConfig.getOptional(DorisConfigOptions.SINK_MAX_RETRIES);
        streamLoadProp.getClass();
        optional5.ifPresent(streamLoadProp::setMaxRetries);
        if (!this.sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_2PC)) {
            streamLoadProp.disable2PC();
        }
        builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(streamLoadProp.build()).setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(builder2.build()).build()).setDorisOptions(builder2.build());
        return builder.build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isSyncNeeded(String str) {
        boolean z = true;
        if (this.includingPattern != null) {
            z = this.includingPattern.matcher(str).matches();
        }
        if (this.excludingPattern != null) {
            z = z && !this.excludingPattern.matcher(str).matches();
        }
        LOG.debug("table {} is synchronized? {}", str, Boolean.valueOf(z));
        return z;
    }
}
