package com.alibaba.ververica.connectors.kafka.catalog.factory;

import com.alibaba.ververica.connectors.kafka.catalog.KafkaJsonCatalog;
import com.alibaba.ververica.connectors.kafka.catalog.KafkaSchemaRegistryCatalog;
import com.alibaba.ververica.connectors.kafka.catalog.aliyun.AliyunKafkaClientParams;
import com.aliyun.core.http.BodyType;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.formats.json.JsonFormatOptionsUtil;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:com/alibaba/ververica/connectors/kafka/catalog/factory/KafkaCatalogFactory.class */
public class KafkaCatalogFactory implements CatalogFactory {
    public String factoryIdentifier() {
        return "kafka";
    }

    public Set<ConfigOption<?>> requiredOptions() {
        HashSet hashSet = new HashSet();
        hashSet.add(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS);
        hashSet.add(FactoryUtil.FORMAT);
        return hashSet;
    }

    public Set<ConfigOption<?>> optionalOptions() {
        HashSet hashSet = new HashSet();
        hashSet.add(AvroConfluentFormatOptions.URL);
        hashSet.add(KafkaCatalogOptions.SCHEMA_CAPACITY);
        hashSet.add(KafkaCatalogOptions.DEFAULT_DATABASE);
        hashSet.add(KafkaCatalogOptions.MAX_FETCH_RECORDS);
        hashSet.add(KafkaCatalogOptions.COMPACTED_TOPIC_AS_UPSERT_TABLE);
        hashSet.add(KafkaConnectorOptions.KEY_FIELDS_PREFIX);
        hashSet.add(KafkaConnectorOptions.VALUE_FIELDS_PREFIX);
        hashSet.add(KafkaCatalogOptions.PARSE_KEY_ERROR_FIELD_NAME);
        hashSet.add(JsonFormatOptions.TIMESTAMP_FORMAT);
        hashSet.add(JsonFormatOptions.INFER_SCHEMA_FLATTEN_NECOLUMNS_ENABLE);
        hashSet.add(JsonFormatOptions.INFER_SCHEMA_PRIMITIVE_AS_STRING);
        hashSet.add(KafkaCatalogOptions.ALIYUN_KAFKA_AK);
        hashSet.add(KafkaCatalogOptions.ALIYUN_KAFKA_SK);
        hashSet.add(KafkaCatalogOptions.ALIYUN_KAFKA_INSTANCE_ID);
        hashSet.add(KafkaCatalogOptions.ALIYUN_KAFKA_ENDPOINT);
        hashSet.add(KafkaCatalogOptions.ALIYUN_KAFKA_REGION_ID);
        return hashSet;
    }

    public Catalog createCatalog(CatalogFactory.Context context) {
        FactoryUtil.CatalogFactoryHelper createCatalogFactoryHelper = FactoryUtil.createCatalogFactoryHelper(this, context);
        createCatalogFactoryHelper.validateExcept(new String[]{"properties."});
        ReadableConfig options = createCatalogFactoryHelper.getOptions();
        checkOptions(options);
        String str = (String) options.get(FactoryUtil.FORMAT);
        TimestampFormat timestampFormat = JsonFormatOptionsUtil.getTimestampFormat(options);
        String str2 = (String) options.getOptional(AvroConfluentFormatOptions.URL).orElse(null);
        return str2 == null ? new KafkaJsonCatalog(context.getName(), (String) options.get(KafkaCatalogOptions.DEFAULT_DATABASE), (String) options.get(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS), KafkaCatalogOptions.DEFAULT_GROUP_ID + ThreadLocalRandom.current().nextLong(), str, (String) options.getOptional(KafkaConnectorOptions.KEY_FIELDS_PREFIX).orElse(KafkaCatalogOptions.DEFAULT_KEY_PREFIX), (String) options.getOptional(KafkaConnectorOptions.VALUE_FIELDS_PREFIX).orElse(KafkaCatalogOptions.DEFAULT_VALUE_PREFIX), ((Integer) options.get(KafkaCatalogOptions.MAX_FETCH_RECORDS)).intValue(), ((Boolean) options.get(KafkaCatalogOptions.COMPACTED_TOPIC_AS_UPSERT_TABLE)).booleanValue(), KafkaCatalogOptions.getOtherProperties(context.getOptions(), "properties."), ((Boolean) options.get(JsonFormatOptions.INFER_SCHEMA_FLATTEN_NECOLUMNS_ENABLE)).booleanValue(), ((Boolean) options.get(JsonFormatOptions.INFER_SCHEMA_PRIMITIVE_AS_STRING)).booleanValue(), timestampFormat, (String) options.get(KafkaCatalogOptions.PARSE_KEY_ERROR_FIELD_NAME), AliyunKafkaClientParams.createAliyunKafkaClientParams(options)) : new KafkaSchemaRegistryCatalog(context.getName(), (String) options.get(KafkaCatalogOptions.DEFAULT_DATABASE), (String) options.get(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS), str2, ((Integer) options.get(KafkaCatalogOptions.SCHEMA_CAPACITY)).intValue(), KafkaCatalogOptions.DEFAULT_GROUP_ID + ThreadLocalRandom.current().nextLong(), str, (String) options.getOptional(KafkaConnectorOptions.KEY_FIELDS_PREFIX).orElse(KafkaCatalogOptions.DEFAULT_KEY_PREFIX), (String) options.getOptional(KafkaConnectorOptions.VALUE_FIELDS_PREFIX).orElse(KafkaCatalogOptions.DEFAULT_VALUE_PREFIX), KafkaCatalogOptions.getOtherProperties(context.getOptions(), "properties."));
    }

    private void checkOptions(ReadableConfig readableConfig) {
        String str = (String) readableConfig.getOptional(AvroConfluentFormatOptions.URL).orElse(null);
        String str2 = (String) readableConfig.get(FactoryUtil.FORMAT);
        if (str == null) {
            Preconditions.checkState(BodyType.JSON.equals(str2), String.format("Now only support parsing %s format to get schema, actual is %s.", BodyType.JSON, str2));
        } else {
            Preconditions.checkState("avro-confluent".equals(str2), String.format("Catalog using schema registry now only support %s format, actual is %s.", "avro-confluent", str2));
        }
    }
}
