package com.alibaba.ververica.connectors.kafka.catalog;

import com.alibaba.ververica.connectors.kafka.catalog.schema.KafkaSchema;
import com.alibaba.ververica.connectors.kafka.catalog.shaded.io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import com.alibaba.ververica.connectors.kafka.catalog.shaded.io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import com.alibaba.ververica.connectors.kafka.catalog.shaded.io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/ververica/connectors/kafka/catalog/KafkaSchemaRegistryCatalog.class */
public class KafkaSchemaRegistryCatalog extends KafkaCatalogBase {
    private static final String CONFLUENT_VALUE_SCHEMA_NAME_SUFFIX = "-value";
    private static final String CONFLUENT_KEY_SCHEMA_NAME_SUFFIX = "-key";
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSchemaRegistryCatalog.class);
    private final String schemaRegistryUrl;
    private final SchemaRegistryClient schemaRegistryClient;

    @VisibleForTesting
    Map<String, String> kafkaProps;

    public KafkaSchemaRegistryCatalog(String str, @Nullable String str2, String str3, String str4, int i, String str5, String str6, String str7, String str8, Properties properties) {
        this(str, str2, str3, str4, new CachedSchemaRegistryClient(str4, i), str5, str6, str7, str8, properties);
    }

    @VisibleForTesting
    KafkaSchemaRegistryCatalog(String str, @Nullable String str2, String str3, String str4, SchemaRegistryClient schemaRegistryClient, String str5, String str6, String str7, String str8, Properties properties) {
        super(str, str2 == null ? "kafka" : str2, str3, str5, str6, str7, str8, properties);
        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(str4), "Schema Registry URL can not be null or empty.");
        this.schemaRegistryUrl = str4;
        this.schemaRegistryClient = schemaRegistryClient;
        this.kafkaProps = new HashMap();
        LOG.info("Created SchemaRegistryCatalog '{}'", str);
    }

    public Optional<Factory> getFactory() {
        return Optional.of(new KafkaDynamicTableFactory());
    }

    public CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException {
        String objectName = objectPath.getObjectName();
        if (!tableExists(objectPath)) {
            throw new TableNotExistException(getName(), objectPath);
        }
        KafkaSchema kafkaSchemaBySchemaRegistry = getKafkaSchemaBySchemaRegistry(objectName);
        Map<String, String> createTableProperties = createTableProperties(objectName, kafkaSchemaBySchemaRegistry.getKeyFieldNames(), KafkaConnectorType.KAFKA);
        createTableProperties.put(String.format("value.%s.%s", "avro-confluent", AvroConfluentFormatOptions.URL.key()), this.schemaRegistryUrl);
        if (!kafkaSchemaBySchemaRegistry.getKeyFieldNames().isEmpty()) {
            createTableProperties.put(String.format("key.%s.%s", "avro-confluent", AvroConfluentFormatOptions.URL.key()), this.schemaRegistryUrl);
        }
        return CatalogTable.of(kafkaSchemaBySchemaRegistry.getSchema(), (String) null, Collections.emptyList(), createTableProperties);
    }

    private Schema getSchemaForAvroSchema(String str, String str2) throws RestClientException, IOException {
        return fromRowDataType(AvroSchemaConverter.convertToDataType(this.schemaRegistryClient.getLatestSchemaMetadata(str).getSchema()), str2);
    }

    private KafkaSchema getKafkaSchemaBySchemaRegistry(String str) {
        try {
            String str2 = str + CONFLUENT_VALUE_SCHEMA_NAME_SUFFIX;
            String str3 = str + CONFLUENT_KEY_SCHEMA_NAME_SUFFIX;
            LinkedHashSet linkedHashSet = new LinkedHashSet();
            Schema.Builder newBuilder = Schema.newBuilder();
            newBuilder.fromColumns(getSchemaForAvroSchema(str2, this.valuePrefix).getColumns());
            if (this.schemaRegistryClient.getAllSubjects().contains(str3)) {
                Schema schemaForAvroSchema = getSchemaForAvroSchema(str3, this.keyPrefix);
                linkedHashSet.addAll((Collection) schemaForAvroSchema.getColumns().stream().map((v0) -> {
                    return v0.getName();
                }).collect(Collectors.toList()));
                newBuilder.fromColumns(schemaForAvroSchema.getColumns());
            }
            return new KafkaSchema(appendKafkaMetadataAndPK(newBuilder.build(), PK), linkedHashSet, false, false);
        } catch (RestClientException | IOException e) {
            throw new CatalogException(String.format("Failed getting table %s from schema registry %s", str, this.schemaRegistryUrl), e);
        }
    }

    @Override // com.alibaba.ververica.connectors.kafka.catalog.KafkaCatalogBase
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof KafkaSchemaRegistryCatalog)) {
            return false;
        }
        KafkaSchemaRegistryCatalog kafkaSchemaRegistryCatalog = (KafkaSchemaRegistryCatalog) obj;
        return super.equals(obj) && this.schemaRegistryUrl.equals(kafkaSchemaRegistryCatalog.schemaRegistryUrl) && this.kafkaProps.equals(kafkaSchemaRegistryCatalog.kafkaProps);
    }

    @Override // com.alibaba.ververica.connectors.kafka.catalog.KafkaCatalogBase
    public int hashCode() {
        return Objects.hash(this.catalogProperties, this.bootstrapServers, this.groupId, this.keyPrefix, this.valuePrefix, this.format, this.schemaRegistryUrl, this.kafkaProps);
    }
}
