package com.alibaba.ververica.connectors.kafka.catalog;

import com.alibaba.ververica.connectors.kafka.catalog.aliyun.AliyunKafkaClient;
import com.alibaba.ververica.connectors.kafka.catalog.aliyun.AliyunKafkaClientParams;
import com.alibaba.ververica.connectors.kafka.catalog.factory.KafkaCatalogOptions;
import com.alibaba.ververica.connectors.kafka.catalog.schema.JsonSchemaParser;
import com.alibaba.ververica.connectors.kafka.catalog.schema.KafkaSchema;
import com.alibaba.ververica.connectors.kafka.utils.KafkaSchemaUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.Config;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.NewTopic;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.Node;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.config.ConfigResource;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.config.TopicConfig;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.WakeupException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableProvider;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/ververica/connectors/kafka/catalog/KafkaJsonCatalog.class */
public class KafkaJsonCatalog extends KafkaCatalogBase implements CatalogTableProvider {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaJsonCatalog.class);
    /** Timeout, in milliseconds, of a single consumer poll while sampling records for schema inference. */
    private static final long POLL_TIMEOUT = 10000;
    // NOTE(review): presumably the cap on poll attempts during schema inference; the sampling
    // loop in this (decompiled) file compares against the literal 10 — confirm.
    private static final int RETRY_LIMIT = 10;
    /** Broker config key read by inferDefaultTopicProps() to derive the partition count of new topics. */
    private static final String NUM_PARTITIONS_PROP = "num.partitions";
    /** Broker config key read by inferDefaultTopicProps() to derive the replication factor of new topics. */
    private static final String REPLICATION_FACTOR_PROP = "default.replication.factor";
    /** Upper bound on the number of records sampled from a topic when inferring its schema. */
    private final int maxFetchRecords;
    /**
     * When true, a keyed table on a compacted topic is exposed as an upsert table, and tables
     * created with a primary key get a compacted topic.
     */
    private final boolean compactedTopicAsUpsertTable;
    /** Handed to the JSON schema parser and emitted as a {@code value.json.*} table option. */
    private final boolean flattenNestedColumns;
    /** Handed to the JSON schema parser and emitted as a {@code value.json.*} table option. */
    private final boolean primitiveAsString;
    /** Parses the schema of individual sampled records. */
    private final JsonSchemaParser jsonSchemaResolver;
    // NOTE(review): passed to JsonSchemaParser; presumably the column name used when a record
    // key cannot be parsed — confirm against JsonSchemaParser.
    private final String parseKeyErrorFieldName;

    /** Non-null when the catalog targets Aliyun Kafka (see isAliyunKafka()). */
    @Nullable
    private final AliyunKafkaClientParams aliyunKafkaClientParams;
    /** Partition count for created topics; starts at 1 and may be replaced by the broker default in open(). */
    protected int numPartitions;
    /** Replication factor for created topics; starts at 1 and may be replaced by the broker default in open(). */
    protected short replicationFactor;
    /** Consumer used to sample records; created in open(). */
    protected KafkaConsumer<byte[], byte[]> consumer;
    /** Client for Aliyun Kafka; created in open() only when Aliyun parameters were supplied. */
    protected AliyunKafkaClient aliyunKafkaClient;

    /**
     * Builds a JSON catalog backed by Kafka topics.
     *
     * <p>The first seven strings and {@code properties} are forwarded unchanged to the base
     * catalog. {@code str6} and {@code str7} are additionally handed to the JSON schema parser
     * (NOTE(review): presumably the key and value column prefixes — confirm against
     * KafkaCatalogBase).
     *
     * @param i maximum number of records sampled per topic for schema inference
     * @param z whether compacted topics with keys are treated as upsert tables
     * @param z2 whether nested columns are flattened during schema inference
     * @param z3 whether primitive JSON values are inferred as strings
     * @param timestampFormat timestamp format handed to the JSON schema parser
     * @param str8 field name handed to the JSON schema parser for key parse errors
     * @param aliyunKafkaClientParams Aliyun client settings, or {@code null} for plain Kafka
     */
    public KafkaJsonCatalog(String str, String str2, String str3, String str4, String str5, String str6, String str7, int i, boolean z, Properties properties, boolean z2, boolean z3, TimestampFormat timestampFormat, String str8, @Nullable AliyunKafkaClientParams aliyunKafkaClientParams) {
        super(str, str2, str3, str4, str5, str6, str7, properties);

        // Schema-inference settings.
        this.maxFetchRecords = i;
        this.flattenNestedColumns = z2;
        this.primitiveAsString = z3;
        this.parseKeyErrorFieldName = str8;
        this.jsonSchemaResolver = new JsonSchemaParser(str6, str7, z2, z3, timestampFormat, str8);

        // Topic handling.
        this.compactedTopicAsUpsertTable = z;
        this.aliyunKafkaClientParams = aliyunKafkaClientParams;

        // Conservative defaults; open() may replace them with the broker's configured values.
        this.numPartitions = 1;
        this.replicationFactor = (short) 1;
    }

    /**
     * Opens the base catalog and the sampling consumer, then prepares topic creation: on Aliyun
     * Kafka the dedicated client is opened, otherwise the broker defaults for partition count
     * and replication factor are looked up.
     */
    @Override
    public void open() throws CatalogException {
        super.open();
        this.consumer = getKafkaConsumer();
        if (isAliyunKafka()) {
            this.aliyunKafkaClient = new AliyunKafkaClient(this.aliyunKafkaClientParams);
            this.aliyunKafkaClient.open();
            return;
        }
        inferDefaultTopicProps();
    }

    /**
     * Closes the base catalog, the sampling consumer and, when present, the Aliyun client.
     *
     * <p>Before the Aliyun client is closed, the catalog's consumer group is deleted on a
     * best-effort basis (up to three attempts). A failure to delete the group is logged and
     * never prevents the client itself from being closed.
     */
    @Override
    public void close() throws CatalogException {
        super.close();
        if (this.consumer != null) {
            closeResource(this.consumer);
        }
        if (this.aliyunKafkaClient == null) {
            return;
        }
        final int maxDeleteAttempts = 3;
        try {
            boolean deleted = false;
            for (int attempt = 0; attempt < maxDeleteAttempts && !deleted; attempt++) {
                deleted = this.aliyunKafkaClient.deleteConsumerGroup(this.groupId, ADMIN_CLIENT_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
            }
            if (!deleted) {
                LOG.warn("Fail to delete the consumer group {}, skip deleting.", this.groupId);
            }
        } catch (Exception e) {
            // Previously an exception object was created here and discarded, and the method
            // returned without closing the client. Group deletion is best effort, so report
            // the failure and fall through to the cleanup below.
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            LOG.warn("Fail to delete the consumer group {} while closing aliyun kafka client.", this.groupId, e);
        } finally {
            // Always release the client, whatever happened to the consumer group.
            this.aliyunKafkaClient.close();
        }
    }

    /**
     * Database creation is not supported. The only call accepted (as a no-op) is one that asks
     * to ignore an existing database and names the default database.
     *
     * @throws UnsupportedOperationException for every other request
     */
    @Override
    public void createDatabase(String str, CatalogDatabase catalogDatabase, boolean z) throws DatabaseAlreadyExistException, CatalogException {
        boolean toleratedNoOp = z && getDefaultDatabase().equals(str);
        if (toleratedNoOp) {
            return;
        }
        throw new UnsupportedOperationException();
    }

    /**
     * Creates the Kafka topic backing the given table.
     *
     * <p>The topic is created as a compacted topic when the table declares a primary key and
     * compacted topics are treated as upsert tables. Creation is attempted up to five times;
     * the last failure is attached as the cause of the resulting {@link CatalogException}.
     *
     * @param objectPath database and table (topic) name
     * @param catalogBaseTable table definition; only its primary key is inspected here
     * @param z when true, an already existing table is silently accepted
     * @throws DatabaseNotExistException if the database of {@code objectPath} does not exist
     * @throws TableAlreadyExistException if the table exists and {@code z} is false
     * @throws CatalogException if the topic could not be created
     */
    @Override
    public void createTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, boolean z) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
        String databaseName = objectPath.getDatabaseName();
        if (!databaseExists(databaseName)) {
            throw new DatabaseNotExistException(getName(), databaseName);
        }
        if (tableExists(objectPath)) {
            if (!z) {
                throw new TableAlreadyExistException(getName(), objectPath);
            }
            return;
        }
        String topicName = objectPath.getObjectName();
        boolean compacted = catalogBaseTable.getUnresolvedSchema().getPrimaryKey().isPresent() && this.compactedTopicAsUpsertTable;

        final int maxAttempts = 5;
        Exception lastFailure = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                if (isAliyunKafka()) {
                    this.aliyunKafkaClient.createTopic(topicName, compacted, ADMIN_CLIENT_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
                } else {
                    NewTopic newTopic = new NewTopic(topicName, this.numPartitions, this.replicationFactor);
                    if (compacted) {
                        Map<String, String> topicConfigs = new HashMap<>();
                        topicConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
                        newTopic = newTopic.configs(topicConfigs);
                    }
                    this.adminClient.createTopics(Collections.singleton(newTopic)).values().get(topicName).get(ADMIN_CLIENT_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
                }
                return;
            } catch (Exception e) {
                lastFailure = e;
                if (e instanceof InterruptedException) {
                    // Retrying after an interrupt would ignore the cancellation request.
                    Thread.currentThread().interrupt();
                    break;
                }
                LOG.warn("Fail to create topic [{}] in attempt {} of {}.", topicName, attempt + 1, maxAttempts, e);
            }
        }
        throw new CatalogException(String.format("Fail to create topic [%s] in kafka json catalog [%s].", topicName, getName()), lastFailure);
    }

    /**
     * Returns the table for a topic, with its schema inferred by sampling records.
     *
     * <p>The connector type is upsert-kafka when the inferred schema says so, plain kafka
     * otherwise; the JSON inference options are appended to the table options.
     *
     * @throws TableNotExistException if the topic is not known to the catalog
     */
    public CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException {
        if (!tableExists(objectPath)) {
            throw new TableNotExistException(getName(), objectPath);
        }
        final String topic = objectPath.getObjectName();
        final KafkaSchema inferred = getKafkaSchemaByParsingRecord(topic);

        final KafkaConnectorType connectorType;
        if (inferred.isUpsertKafka()) {
            connectorType = KafkaConnectorType.UPSERT_KAFKA;
        } else {
            connectorType = KafkaConnectorType.KAFKA;
        }

        final Map<String, String> options = createTableProperties(topic, inferred.getKeyFieldNames(), connectorType, inferred.getTreatKeyAsRaw());
        addJsonFormatOptions(options);
        return CatalogTable.of(inferred.getSchema(), (String) null, Collections.emptyList(), options);
    }

    /**
     * Tells whether the cleanup policy of the given topic includes compaction.
     *
     * @param str topic name
     * @throws CatalogException if the topic configuration cannot be read
     */
    private boolean isCompactCleanupPolicyEnabled(String str) {
        final ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, str);
        try {
            Map<ConfigResource, Config> described = this.adminClient.describeConfigs(Collections.singleton(topicResource)).all().get();
            String cleanupPolicy = described.get(topicResource).get(TopicConfig.CLEANUP_POLICY_CONFIG).value();
            return cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT);
        } catch (Throwable th) {
            throw new CatalogException(String.format("Fail to get table %s in kafka json catalog [%s]", str, getName()), th);
        }
    }

    /**
     * Adds the value-format JSON inference switches of this catalog to the given table options.
     *
     * @param map table options to extend in place
     */
    private void addJsonFormatOptions(Map<String, String> map) {
        final String valueJsonKey = "value.json.%s";
        final String primitiveAsStringKey = String.format(valueJsonKey, JsonFormatOptions.INFER_SCHEMA_PRIMITIVE_AS_STRING.key());
        final String flattenNestedKey = String.format(valueJsonKey, JsonFormatOptions.INFER_SCHEMA_FLATTEN_NECOLUMNS_ENABLE.key());
        map.put(primitiveAsStringKey, Boolean.toString(this.primitiveAsString));
        map.put(flattenNestedKey, Boolean.toString(this.flattenNestedColumns));
    }

    /**
     * Infers the schema of a topic by sampling its most recent records.
     *
     * <p>The consumer is positioned so that up to {@code maxFetchRecords} records are read,
     * spread over the partitions of the topic. Records are polled until the expected number
     * has been seen or {@code RETRY_LIMIT} polls were made. The schema of every parsable record
     * is merged into the result; records that fail to parse are logged and skipped.
     *
     * @param str topic name
     * @return the merged schema, including key columns and whether the table is an upsert table
     * @throws CatalogException if offsets cannot be resolved, polling fails, or schemas conflict
     */
    private KafkaSchema getKafkaSchemaByParsingRecord(String str) {
        KafkaSchema mergedSchema;
        int fetchedRecords = 0;
        try {
            List<TopicPartition> topicPartitions = getTopicPartitions(str);
            Map<TopicPartition, Long> beginningOffsets = this.consumer.beginningOffsets(topicPartitions);
            Map<TopicPartition, Long> endOffsets = this.consumer.endOffsets(topicPartitions);
            long expectedFetchSize = getExpectedFetchSize(beginningOffsets, endOffsets);
            Map<TopicPartition, Long> startOffsets = getStartOffsets(beginningOffsets, endOffsets, expectedFetchSize);
            this.consumer.assign(new ArrayList<>(startOffsets.keySet()));
            startOffsets.forEach(this.consumer::seek);

            mergedSchema = new KafkaSchema(Schema.newBuilder().build(), new LinkedHashSet<String>(), false, false);
            for (int attempt = 1; attempt <= RETRY_LIMIT && fetchedRecords < expectedFetchSize; attempt++) {
                try {
                    ConsumerRecords<byte[], byte[]> records = this.consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
                    fetchedRecords += records.count();
                    for (ConsumerRecord<byte[], byte[]> record : records) {
                        Optional<KafkaSchema> recordSchema = Optional.empty();
                        try {
                            recordSchema = this.jsonSchemaResolver.parseRecordSchema(record);
                        } catch (Exception parseFailure) {
                            // A single malformed record must not fail the whole inference;
                            // keep the cause in the log so it can be diagnosed.
                            LOG.warn(String.format("Fail to parse schema from %s.", record.toString()), parseFailure);
                        }
                        if (recordSchema.isPresent()) {
                            mergedSchema = mergeKafkaSchema(mergedSchema, recordSchema.get());
                        }
                    }
                } catch (WakeupException wakeup) {
                    // A wake-up only costs one attempt; the next iteration polls again.
                    LOG.warn(String.format("Fail to get records of topic [%s] in the retry time %d.", str, Integer.valueOf(attempt)));
                }
            }
        } catch (Exception e) {
            throw new CatalogException(String.format("Failed getting table %s by parsing the json record.", str), e);
        }

        if (fetchedRecords < this.maxFetchRecords) {
            LOG.warn(String.format("Except to get %d records of topic [%s], actual get %d records.", Integer.valueOf(this.maxFetchRecords), str, Integer.valueOf(fetchedRecords)));
        }
        boolean isUpsertKafkaTable = isUpsertKafkaTable(str, mergedSchema.getKeyFieldNames());
        return new KafkaSchema(KafkaSchemaUtils.generateKafkaSchemaForCatalog(mergedSchema.getSchema(), mergedSchema.getKeyFieldNames(), getPrimaryKeys(isUpsertKafkaTable, mergedSchema.getKeyFieldNames())), mergedSchema.getKeyFieldNames(), mergedSchema.getTreatKeyAsRaw(), isUpsertKafkaTable);
    }

    /**
     * Decides whether a topic is exposed as an upsert table: the feature must be enabled, the
     * inferred schema must have key columns, and the topic must use a compacting cleanup policy.
     *
     * @param str topic name
     * @param linkedHashSet key column names inferred for the topic
     */
    private boolean isUpsertKafkaTable(String str, LinkedHashSet<String> linkedHashSet) {
        if (!this.compactedTopicAsUpsertTable || linkedHashSet.isEmpty()) {
            return false;
        }
        // Only consult the broker once the cheap checks have passed.
        return isCompactCleanupPolicyEnabled(str);
    }

    /**
     * Picks the primary key columns for a table: the key column names (in insertion order) for
     * an upsert table, the shared {@code PK} constant otherwise.
     *
     * @param z whether the table is an upsert table
     * @param linkedHashSet key column names
     */
    private String[] getPrimaryKeys(boolean z, LinkedHashSet<String> linkedHashSet) {
        if (!z) {
            return PK;
        }
        return linkedHashSet.toArray(new String[0]);
    }

    /**
     * Merges the schema of one more record into the schema accumulated so far.
     *
     * <p>Once either side treats the key as raw bytes, the key columns of the other side are
     * replaced by the single raw key column before the column sets are merged. Otherwise the
     * key column names of the new record are added to the accumulated ones (in place).
     *
     * @param kafkaSchema schema accumulated so far
     * @param kafkaSchema2 schema of the newly parsed record
     * @return the merged schema, carrying the key handling and upsert flag of the accumulated side
     */
    private KafkaSchema mergeKafkaSchema(KafkaSchema kafkaSchema, KafkaSchema kafkaSchema2) {
        final String rawKeyField = this.jsonSchemaResolver.getFullKeyFieldNameWhenParseError();
        KafkaSchemaUtils.checkFieldConflict(kafkaSchema2.getSchema());

        KafkaSchema accumulated = kafkaSchema;
        KafkaSchema incoming = kafkaSchema2;
        if (accumulated.getTreatKeyAsRaw()) {
            // Align the new record with the raw-key layout already in use.
            Schema rewritten = modifyKeyFieldsWhenTreatKeyAsRaw(incoming.getSchema(), incoming.getKeyFieldNames(), rawKeyField);
            incoming = new KafkaSchema(rewritten, incoming.getKeyFieldNames(), incoming.getTreatKeyAsRaw(), incoming.isUpsertKafka());
        } else if (incoming.getTreatKeyAsRaw()) {
            // The new record forces the raw-key layout onto what was accumulated so far.
            Schema rewritten = modifyKeyFieldsWhenTreatKeyAsRaw(accumulated.getSchema(), accumulated.getKeyFieldNames(), rawKeyField);
            accumulated = new KafkaSchema(rewritten, new LinkedHashSet<String>(Collections.singletonList(rawKeyField)), true, accumulated.isUpsertKafka());
        } else {
            accumulated.getKeyFieldNames().addAll(incoming.getKeyFieldNames());
        }

        return new KafkaSchema(KafkaSchemaUtils.mergeSchema(accumulated.getSchema(), incoming.getSchema()), accumulated.getKeyFieldNames(), accumulated.getTreatKeyAsRaw(), accumulated.isUpsertKafka());
    }

    /**
     * Rebuilds a schema with its key columns replaced by one raw bytes column.
     *
     * @param schema schema to rewrite
     * @param linkedHashSet names of the key columns to drop
     * @param str name of the BYTES column appended after the remaining columns
     */
    private Schema modifyKeyFieldsWhenTreatKeyAsRaw(Schema schema, LinkedHashSet<String> linkedHashSet, String str) {
        List<Schema.UnresolvedColumn> nonKeyColumns = schema.getColumns().stream()
                .filter(column -> !linkedHashSet.contains(column.getName()))
                .collect(Collectors.toList());
        return Schema.newBuilder()
                .fromColumns(nonKeyColumns)
                .column(str, DataTypes.BYTES())
                .build();
    }

    /**
     * Computes, per partition, the offset to start reading from so that {@code j} records are
     * read in total, taken from the tail of each partition.
     *
     * <p>Partitions are visited from the one holding the fewest records to the one holding the
     * most. Each takes the smaller of what it holds and an even share (rounded up) of what is
     * still needed, so the share a small partition cannot supply is passed on to larger ones.
     *
     * @param map beginning offset per partition
     * @param map2 end offset per partition
     * @param j total number of records to read
     * @return start offset per partition; partitions present only in {@code map2} keep their end offset
     */
    @VisibleForTesting
    Map<TopicPartition, Long> getStartOffsets(Map<TopicPartition, Long> map, Map<TopicPartition, Long> map2, long j) {
        Map<TopicPartition, Long> startOffsets = new HashMap<>(map2);
        List<TopicPartition> partitions = new ArrayList<>(map.keySet());
        // Compare the record counts as longs: casting their difference to int can truncate or
        // flip the sign for large partitions and break the ordering.
        partitions.sort((left, right) -> Long.compare(
                map2.get(left) - map.get(left),
                map2.get(right) - map.get(right)));

        long remaining = j;
        int partitionsLeft = partitions.size();
        for (TopicPartition partition : partitions) {
            long available = map2.get(partition) - map.get(partition);
            // Ceiling division, so the remainder is not lost to rounding.
            long evenShare = (remaining + partitionsLeft - 1) / partitionsLeft;
            long take = Math.min(available, evenShare);
            startOffsets.put(partition, map2.get(partition) - take);
            remaining -= take;
            partitionsLeft--;
        }
        return startOffsets;
    }

    /**
     * Returns how many records can be sampled: the configured maximum when the partitions hold
     * at least that many records, otherwise the number of records they hold in total.
     *
     * @param map beginning offset per partition
     * @param map2 end offset per partition
     */
    @VisibleForTesting
    long getExpectedFetchSize(Map<TopicPartition, Long> map, Map<TopicPartition, Long> map2) {
        long shortfall = this.maxFetchRecords;
        for (Map.Entry<TopicPartition, Long> end : map2.entrySet()) {
            long available = end.getValue().longValue() - map.get(end.getKey()).longValue();
            shortfall -= available;
            if (shortfall <= 0) {
                // Enough records exist; no need to look at further partitions.
                return this.maxFetchRecords;
            }
        }
        return this.maxFetchRecords - shortfall;
    }

    /**
     * Creates the byte-array consumer used for sampling records. It starts from a copy of the
     * catalog properties and then pins the connection, identity, deserializer and offset
     * settings the catalog relies on.
     */
    private KafkaConsumer<byte[], byte[]> getKafkaConsumer() {
        final String byteArrayDeserializer = ByteArrayDeserializer.class.getName();
        final Properties consumerProps = new Properties();
        deepCopyProperties(this.catalogProperties, consumerProps);

        // Connection and identity.
        consumerProps.setProperty("bootstrap.servers", this.bootstrapServers);
        consumerProps.setProperty("group.id", this.groupId);
        consumerProps.setProperty("client.id", this.groupId + "-catalog-consumer");

        // Records are handed to the schema parser as raw bytes.
        consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, byteArrayDeserializer);
        consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, byteArrayDeserializer);

        consumerProps.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
        consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return new KafkaConsumer<>(consumerProps);
    }

    /**
     * Reads the default partition count and replication factor from the configuration of one
     * broker and stores them for later topic creation. A value that the broker does not report
     * leaves the current setting untouched.
     *
     * @throws CatalogException if the cluster or broker configuration cannot be read, or the
     *     cluster reports no brokers
     */
    private void inferDefaultTopicProps() {
        try {
            final long timeoutSeconds = ADMIN_CLIENT_TIMEOUT.getSeconds();
            Collection<Node> brokers = this.adminClient.describeCluster().nodes().get(timeoutSeconds, TimeUnit.SECONDS);
            if (brokers == null || brokers.isEmpty()) {
                throw new IllegalArgumentException("The number of available brokers is 0.");
            }
            Node anyBroker = brokers.stream().findAny().get();
            ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, anyBroker.idString());
            Config brokerConfig = this.adminClient.describeConfigs(Collections.singleton(brokerResource)).all().get(timeoutSeconds, TimeUnit.SECONDS).get(brokerResource);

            if (brokerConfig.get(NUM_PARTITIONS_PROP) != null) {
                this.numPartitions = Integer.parseInt(brokerConfig.get(NUM_PARTITIONS_PROP).value());
            }
            if (brokerConfig.get(REPLICATION_FACTOR_PROP) != null) {
                this.replicationFactor = Short.parseShort(brokerConfig.get(REPLICATION_FACTOR_PROP).value());
            }
            LOG.info(String.format("The inferred partition number is %d, replication factor is %d for kafka cluster %s.", Integer.valueOf(this.numPartitions), Short.valueOf(this.replicationFactor), getDefaultDatabase()));
        } catch (Throwable th) {
            throw new CatalogException("Fail to get the broker config.", th);
        }
    }

    /**
     * Two JSON catalogs are equal when the base catalog state matches and they agree on the
     * sampling size, the JSON inference switches, the key-error field name, the topic defaults
     * and the Aliyun client parameters.
     */
    // NOTE(review): compactedTopicAsUpsertTable takes part in neither equals nor hashCode —
    // confirm that this is intended.
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof KafkaJsonCatalog)) {
            return false;
        }
        if (!super.equals(obj)) {
            return false;
        }
        final KafkaJsonCatalog that = (KafkaJsonCatalog) obj;
        return this.maxFetchRecords == that.maxFetchRecords
                && this.flattenNestedColumns == that.flattenNestedColumns
                && this.primitiveAsString == that.primitiveAsString
                && Objects.equals(this.parseKeyErrorFieldName, that.parseKeyErrorFieldName)
                && this.numPartitions == that.numPartitions
                && this.replicationFactor == that.replicationFactor
                && Objects.equals(this.aliyunKafkaClientParams, that.aliyunKafkaClientParams);
    }

    /** Hashes the base catalog hash together with the same fields that equals compares. */
    @Override
    public int hashCode() {
        return Objects.hash(
                super.hashCode(),
                this.maxFetchRecords,
                this.flattenNestedColumns,
                this.primitiveAsString,
                this.parseKeyErrorFieldName,
                this.numPartitions,
                this.replicationFactor,
                this.aliyunKafkaClientParams);
    }

    /**
     * Derives the table to create in this catalog from a user supplied table definition.
     *
     * <p>Only upsert-kafka sinks with a primary key are accepted. The resulting options are
     * layered: the defaults of this catalog, then the database properties that do not start
     * with {@code cdas.}, then the options of the supplied table.
     *
     * @throws UnsupportedOperationException if another connector is requested or the table has
     *     no primary key
     */
    public CatalogTable inferTable(ObjectPath objectPath, CatalogDatabase catalogDatabase, CatalogTable catalogTable) throws CatalogException {
        final String connector = catalogTable.getOptions().get(FactoryUtil.CONNECTOR.key());
        final boolean unsupportedConnector = connector != null && !UpsertKafkaDynamicTableFactory.IDENTIFIER.equals(connector);
        if (unsupportedConnector) {
            throw new UnsupportedOperationException(String.format("Kafka json catalog only supports 'upsert-kafka' table as sink in CTAS, current connector identifier is '%s'.", connector));
        }

        final Optional<Schema.UnresolvedPrimaryKey> declaredKey = catalogTable.getUnresolvedSchema().getPrimaryKey();
        if (!declaredKey.isPresent()) {
            throw new UnsupportedOperationException(String.format("The upsert kafka table [%s] must have primary keys, but actual is empty.", objectPath.getFullName()));
        }

        // Key columns are stored under the key prefix of this catalog.
        final LinkedHashSet<String> prefixedKeyColumns = declaredKey.get().getColumnNames().stream()
                .map(columnName -> this.keyPrefix + columnName)
                .collect(Collectors.toCollection(LinkedHashSet::new));
        final Map<String, String> catalogDefaults = createTableProperties(objectPath.getObjectName(), prefixedKeyColumns, KafkaConnectorType.UPSERT_KAFKA);
        addJsonFormatOptions(catalogDefaults);

        final Map<String, String> databaseOverrides = catalogDatabase.getProperties().entrySet().stream()
                .filter(property -> !property.getKey().startsWith("cdas."))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        // Later layers win: catalog defaults < database properties < table options.
        final Map<String, String> options = new HashMap<>(catalogDefaults);
        options.putAll(databaseOverrides);
        options.putAll(catalogTable.getOptions());
        return CatalogTable.of(inferTableSchema(catalogTable.getUnresolvedSchema()), catalogTable.getComment(), Collections.emptyList(), options);
    }

    /**
     * Maps a table identifier to the topic it should be written to. When the database defines
     * a non-blank topic pattern, the table placeholder in it is replaced by the table name;
     * otherwise the identifier is returned unchanged.
     */
    public ObjectIdentifier inferTableIdentifier(ObjectIdentifier objectIdentifier, CatalogDatabase catalogDatabase, CatalogTable catalogTable) {
        final Map<String, String> databaseProperties = catalogDatabase.getProperties();
        if (CollectionUtil.isNullOrEmpty(databaseProperties)) {
            return objectIdentifier;
        }
        final String topicPattern = databaseProperties.get(KafkaCatalogOptions.CDAS_TOPIC_PATTERN.key());
        if (StringUtils.isNullOrWhitespaceOnly(topicPattern)) {
            return objectIdentifier;
        }
        final String topicName = topicPattern.replace(KafkaCatalogOptions.TABLE_PLACEHOLDER, objectIdentifier.getObjectName());
        return ObjectIdentifier.of(objectIdentifier.getCatalogName(), objectIdentifier.getDatabaseName(), topicName);
    }

    /**
     * Rewrites a schema for storage in Kafka: physical columns get the key prefix when they
     * belong to the primary key and the value prefix otherwise, all other columns are copied
     * as they are, and the primary key is re-declared on the prefixed key columns.
     */
    private Schema inferTableSchema(Schema schema) {
        final List<String> declaredKeyColumns = schema.getPrimaryKey()
                .map(Schema.UnresolvedPrimaryKey::getColumnNames)
                .orElse(Collections.emptyList());
        final LinkedHashSet<String> keyColumns = new LinkedHashSet<>(declaredKeyColumns);

        final Schema.Builder builder = Schema.newBuilder();
        for (Schema.UnresolvedColumn column : schema.getColumns()) {
            if (column instanceof Schema.UnresolvedPhysicalColumn) {
                Schema.UnresolvedPhysicalColumn physical = (Schema.UnresolvedPhysicalColumn) column;
                String prefix = keyColumns.contains(column.getName()) ? this.keyPrefix : this.valuePrefix;
                builder.column(prefix + physical.getName(), physical.getDataType());
            } else {
                builder.fromColumns(Collections.singletonList(column));
            }
        }

        final List<String> prefixedKeyColumns = keyColumns.stream()
                .map(columnName -> this.keyPrefix + columnName)
                .collect(Collectors.toList());
        builder.primaryKey(prefixedKeyColumns);
        return builder.build();
    }

    /** Tells whether Aliyun client parameters were supplied, i.e. the catalog targets Aliyun Kafka. */
    private boolean isAliyunKafka() {
        return Objects.nonNull(this.aliyunKafkaClientParams);
    }

    /** Returns the partition count currently used for topics created by this catalog. */
    @VisibleForTesting
    public int getNumPartitions() {
        return this.numPartitions;
    }

    /** Returns the replication factor currently used for topics created by this catalog. */
    @VisibleForTesting
    public short getReplicationFactor() {
        return this.replicationFactor;
    }

    /** Returns the Aliyun client, which is only assigned by open() when Aliyun parameters were supplied. */
    @VisibleForTesting
    public AliyunKafkaClient getAliyunKafkaClient() {
        return this.aliyunKafkaClient;
    }
}
