package org.apache.doris.flink.table;

import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import org.apache.doris.flink.cfg.ConfigurationOptions;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;

/* loaded from: input_file:org/apache/doris/flink/table/DorisDynamicTableFactory.class */
public final class DorisDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
    public static final ConfigOption<String> FENODES = ConfigOptions.key(ConfigurationOptions.DORIS_FENODES).stringType().noDefaultValue().withDescription("doris fe http address.");
    public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions.key(ConfigurationOptions.TABLE_IDENTIFIER).stringType().noDefaultValue().withDescription("the jdbc table name.");
    public static final ConfigOption<String> USERNAME = ConfigOptions.key(ConfigurationOptions.DORIS_USER).stringType().noDefaultValue().withDescription("the jdbc user name.");
    public static final ConfigOption<String> PASSWORD = ConfigOptions.key(ConfigurationOptions.DORIS_PASSWORD).stringType().noDefaultValue().withDescription("the jdbc password.");
    private static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions.key(ConfigurationOptions.DORIS_READ_FIELD).stringType().noDefaultValue().withDescription("List of column names in the Doris table, separated by commas");
    private static final ConfigOption<String> DORIS_FILTER_QUERY = ConfigOptions.key(ConfigurationOptions.DORIS_FILTER_QUERY).stringType().noDefaultValue().withDescription("Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering");
    private static final ConfigOption<Integer> DORIS_TABLET_SIZE = ConfigOptions.key(ConfigurationOptions.DORIS_TABLET_SIZE).intType().defaultValue(ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT).withDescription("");
    private static final ConfigOption<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS = ConfigOptions.key(ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS).intType().defaultValue(ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT).withDescription("");
    private static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS = ConfigOptions.key(ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS).intType().defaultValue(ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT).withDescription("");
    private static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S = ConfigOptions.key(ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S).intType().defaultValue(ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT).withDescription("");
    private static final ConfigOption<Integer> DORIS_REQUEST_RETRIES = ConfigOptions.key(ConfigurationOptions.DORIS_REQUEST_RETRIES).intType().defaultValue(ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT).withDescription("");
    private static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC = ConfigOptions.key(ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC).booleanType().defaultValue(ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT).withDescription("");
    private static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions.key("doris.request.retriesdoris.deserialize.queue.size").intType().defaultValue(ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT).withDescription("");
    private static final ConfigOption<Integer> DORIS_BATCH_SIZE = ConfigOptions.key(ConfigurationOptions.DORIS_BATCH_SIZE).intType().defaultValue(ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT).withDescription("");
    private static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT = ConfigOptions.key(ConfigurationOptions.DORIS_EXEC_MEM_LIMIT).longType().defaultValue(ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT).withDescription("");
    private static final ConfigOption<Boolean> SINK_ENABLE_2PC = ConfigOptions.key("sink.enable-2pc").booleanType().defaultValue(true).withDescription("enable 2PC while loading");
    private static final ConfigOption<Integer> SINK_CHECK_INTERVAL = ConfigOptions.key("sink.check-interval").intType().defaultValue(10000).withDescription("check exception with the interval while loading");
    private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions.key("sink.max-retries").intType().defaultValue(3).withDescription("the max retry times if writing records to database failed.");
    private static final ConfigOption<Integer> SINK_BUFFER_SIZE = ConfigOptions.key("sink.buffer-size").intType().defaultValue(262144).withDescription("the buffer size to cache data for stream load.");
    private static final ConfigOption<Integer> SINK_BUFFER_COUNT = ConfigOptions.key("sink.buffer-count").intType().defaultValue(3).withDescription("the buffer count to cache data for stream load.");
    private static final ConfigOption<String> SINK_LABEL_PREFIX = ConfigOptions.key("sink.label-prefix").stringType().defaultValue("").withDescription("the unique label prefix.");
    private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions.key("sink.batch.interval").durationType().defaultValue(Duration.ofSeconds(1)).withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The default value is 1s.");
    private static final ConfigOption<Boolean> SINK_ENABLE_DELETE = ConfigOptions.key("sink.enable-delete").booleanType().defaultValue(true).withDescription("whether to enable the delete function");
    private static final ConfigOption<Boolean> SOURCE_USE_OLD_API = ConfigOptions.key("source.use-old-api").booleanType().defaultValue(false).withDescription("Whether to read data using the new interface defined according to the FLIP-27 specification,default false");

    public String factoryIdentifier() {
        return "doris";
    }

    public Set<ConfigOption<?>> requiredOptions() {
        HashSet hashSet = new HashSet();
        hashSet.add(FENODES);
        hashSet.add(TABLE_IDENTIFIER);
        return hashSet;
    }

    public Set<ConfigOption<?>> optionalOptions() {
        HashSet hashSet = new HashSet();
        hashSet.add(FENODES);
        hashSet.add(TABLE_IDENTIFIER);
        hashSet.add(USERNAME);
        hashSet.add(PASSWORD);
        hashSet.add(DORIS_READ_FIELD);
        hashSet.add(DORIS_FILTER_QUERY);
        hashSet.add(DORIS_TABLET_SIZE);
        hashSet.add(DORIS_REQUEST_CONNECT_TIMEOUT_MS);
        hashSet.add(DORIS_REQUEST_READ_TIMEOUT_MS);
        hashSet.add(DORIS_REQUEST_QUERY_TIMEOUT_S);
        hashSet.add(DORIS_REQUEST_RETRIES);
        hashSet.add(DORIS_DESERIALIZE_ARROW_ASYNC);
        hashSet.add(DORIS_DESERIALIZE_QUEUE_SIZE);
        hashSet.add(DORIS_BATCH_SIZE);
        hashSet.add(DORIS_EXEC_MEM_LIMIT);
        hashSet.add(SINK_CHECK_INTERVAL);
        hashSet.add(SINK_ENABLE_2PC);
        hashSet.add(SINK_MAX_RETRIES);
        hashSet.add(SINK_BUFFER_FLUSH_INTERVAL);
        hashSet.add(SINK_ENABLE_DELETE);
        hashSet.add(SINK_LABEL_PREFIX);
        hashSet.add(SINK_BUFFER_SIZE);
        hashSet.add(SINK_BUFFER_COUNT);
        hashSet.add(SOURCE_USE_OLD_API);
        return hashSet;
    }

    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper createTableFactoryHelper = FactoryUtil.createTableFactoryHelper(this, context);
        createTableFactoryHelper.validateExcept(new String[]{STREAM_LOAD_PROP_PREFIX});
        createTableFactoryHelper.getOptions();
        context.getCatalogTable().getSchema().toPhysicalRowDataType();
        return new DorisDynamicTableSource(getDorisOptions(createTableFactoryHelper.getOptions()), getDorisReadOptions(createTableFactoryHelper.getOptions()), TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()));
    }

    private DorisOptions getDorisOptions(ReadableConfig readableConfig) {
        DorisOptions.Builder tableIdentifier = DorisOptions.builder().setFenodes((String) readableConfig.get(FENODES)).setTableIdentifier((String) readableConfig.get(TABLE_IDENTIFIER));
        Optional optional = readableConfig.getOptional(USERNAME);
        tableIdentifier.getClass();
        optional.ifPresent(tableIdentifier::setUsername);
        Optional optional2 = readableConfig.getOptional(PASSWORD);
        tableIdentifier.getClass();
        optional2.ifPresent(tableIdentifier::setPassword);
        return tableIdentifier.build();
    }

    private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) {
        DorisReadOptions.Builder builder = DorisReadOptions.builder();
        builder.setDeserializeArrowAsync((Boolean) readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC)).setDeserializeQueueSize((Integer) readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE)).setExecMemLimit((Long) readableConfig.get(DORIS_EXEC_MEM_LIMIT)).setFilterQuery((String) readableConfig.get(DORIS_FILTER_QUERY)).setReadFields((String) readableConfig.get(DORIS_READ_FIELD)).setRequestQueryTimeoutS((Integer) readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S)).setRequestBatchSize((Integer) readableConfig.get(DORIS_BATCH_SIZE)).setRequestConnectTimeoutMs((Integer) readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS)).setRequestReadTimeoutMs((Integer) readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS)).setRequestRetries((Integer) readableConfig.get(DORIS_REQUEST_RETRIES)).setRequestTabletSize((Integer) readableConfig.get(DORIS_TABLET_SIZE)).setUseOldApi(((Boolean) readableConfig.get(SOURCE_USE_OLD_API)).booleanValue());
        return builder.build();
    }

    private DorisExecutionOptions getDorisExecutionOptions(ReadableConfig readableConfig, Properties properties) {
        DorisExecutionOptions.Builder builder = DorisExecutionOptions.builder();
        builder.setCheckInterval((Integer) readableConfig.get(SINK_CHECK_INTERVAL));
        builder.setMaxRetries((Integer) readableConfig.get(SINK_MAX_RETRIES));
        builder.setBufferSize(((Integer) readableConfig.get(SINK_BUFFER_SIZE)).intValue());
        builder.setBufferCount(((Integer) readableConfig.get(SINK_BUFFER_COUNT)).intValue());
        builder.setLabelPrefix((String) readableConfig.get(SINK_LABEL_PREFIX));
        builder.setStreamLoadProp(properties);
        builder.setDeletable((Boolean) readableConfig.get(SINK_ENABLE_DELETE));
        if (!((Boolean) readableConfig.get(SINK_ENABLE_2PC)).booleanValue()) {
            builder.disable2PC();
        }
        return builder.build();
    }

    private Properties getStreamLoadProp(Map<String, String> map) {
        Properties properties = new Properties();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            if (entry.getKey().startsWith(STREAM_LOAD_PROP_PREFIX)) {
                properties.put(entry.getKey().substring(STREAM_LOAD_PROP_PREFIX.length()), entry.getValue());
            }
        }
        return properties;
    }

    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper createTableFactoryHelper = FactoryUtil.createTableFactoryHelper(this, context);
        createTableFactoryHelper.validateExcept(new String[]{STREAM_LOAD_PROP_PREFIX});
        return new DorisDynamicTableSink(getDorisOptions(createTableFactoryHelper.getOptions()), getDorisReadOptions(createTableFactoryHelper.getOptions()), getDorisExecutionOptions(createTableFactoryHelper.getOptions(), getStreamLoadProp(context.getCatalogTable().getOptions())), TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()));
    }
}
