package org.apache.flink.streaming.connectors.kafka.table;

import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.data.ColumnSpec;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.SchemaSpec;
import org.apache.flink.table.data.SourceRecord;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/DynamicEvolvingKafkaDeserializationSchema.class */
/**
 * A {@link KafkaDeserializationSchema} that turns a Kafka {@link ConsumerRecord} into a
 * {@link SourceRecord} whose schema is carried alongside every row.
 *
 * <p>The value bytes are always deserialized with the value deserializer. When a key
 * deserializer and/or metadata converters are configured, the emitted row is the
 * concatenation {@code key ++ value ++ metadata} and the emitted schema is assembled in
 * the same order. Otherwise the value record is forwarded as is, with its column names
 * optionally prefixed by {@code valuePrefix}.
 */
public class DynamicEvolvingKafkaDeserializationSchema implements KafkaDeserializationSchema<SourceRecord> {
    private static final long serialVersionUID = 1L;

    /** Deserializer for the record key; {@code null} when no key is projected. */
    @Nullable
    private final DeserializationSchema<RowData> keyDeserializer;

    /** Deserializer for the record value. */
    private final DeserializationSchema<SourceRecord> valueDeserializer;

    /** Type information handed back from {@link #getProducedType()}. */
    private final TypeInformation<SourceRecord> outputTypeInfo;

    /** Row type of the key columns; also used to size the placeholder key row. */
    private final RowType keyType;

    /** Prefix prepended to every value column name; {@code null} disables prefixing. */
    @Nullable
    private final String valuePrefix;

    /** {@code true} when at least one metadata converter was supplied. */
    private final boolean hasMetadata;

    /** Reusable collector that joins key, value and metadata before emitting. */
    private final ConcatCollector collector;

    /** Placeholder key row used when no key was deserialized; created in {@link #open}. */
    private transient RowData emptyKey;

    /**
     * Collector that joins the buffered key, the incoming value record and the metadata
     * extracted from the current Kafka message, then forwards the result downstream.
     *
     * <p>The per-record state ({@code originMessage}, {@code key}, {@code outputCollector})
     * is assigned by the enclosing {@code deserialize} method before {@link #collect} is
     * invoked.
     */
    private class ConcatCollector implements Collector<SourceRecord>, Serializable {
        private static final long serialVersionUID = 1L;
        private final ObjectPath tablePath;
        private final RowType keyType;
        private final RowType metadataType;
        private final DynamicKafkaDeserializationSchema.MetadataConverter[] converters;

        /** Schema (key columns followed by metadata columns) used when there is no value record. */
        private final SchemaSpec schemaWithoutValue;

        private transient ConsumerRecord<byte[], byte[]> originMessage;
        private transient RowData key;
        private transient Collector<SourceRecord> outputCollector;

        /**
         * @param objectPath table path stamped on every emitted record
         * @param rowType row type of the key columns
         * @param rowType2 row type of the metadata columns
         * @param metadataConverterArr converters, one per metadata column, read positionally
         */
        public ConcatCollector(ObjectPath objectPath, RowType rowType, RowType rowType2, DynamicKafkaDeserializationSchema.MetadataConverter[] metadataConverterArr) {
            this.tablePath = objectPath;
            this.keyType = rowType;
            this.metadataType = rowType2;
            this.converters = metadataConverterArr;
            this.schemaWithoutValue = SchemaSpec.newBuilder().fromRowType(rowType).fromRowType(rowType2).build();
        }

        /**
         * Emits the joined record unless there is nothing to emit, i.e. no key was
         * deserialized, the value record is {@code null} or has no columns, and no
         * metadata is configured.
         *
         * @param sourceRecord deserialized value record, may be {@code null}
         */
        @Override
        public void collect(SourceRecord sourceRecord) {
            boolean shouldEmit = this.key != null
                    || (sourceRecord != null && sourceRecord.getSchema().getColumnCount() != 0)
                    || DynamicEvolvingKafkaDeserializationSchema.this.hasMetadata;
            if (!shouldEmit) {
                return;
            }
            // Fall back to the placeholder key row so the joined row keeps its key arity.
            RowData effectiveKey = this.key != null ? this.key : DynamicEvolvingKafkaDeserializationSchema.this.emptyKey;
            this.outputCollector.collect(concatAll(effectiveKey, sourceRecord));
        }

        /** No-op: this collector owns no resources. */
        @Override
        public void close() {
        }

        /**
         * Builds the output record: {@code key ++ metadata} when there is no value record,
         * otherwise {@code key ++ value ++ metadata}.
         */
        private SourceRecord concatAll(RowData rowData, @Nullable SourceRecord sourceRecord) {
            RowData metadataRow = extractMetadata();
            if (sourceRecord == null) {
                return new SourceRecord(this.tablePath, this.schemaWithoutValue, new JoinedRowData(rowData, metadataRow));
            }
            RowData keyAndValue = new JoinedRowData(rowData, sourceRecord.getRow());
            return new SourceRecord(this.tablePath, buildOutputSchema(sourceRecord), new JoinedRowData(keyAndValue, metadataRow));
        }

        /** Reads one metadata field per converter from the current Kafka message. */
        private RowData extractMetadata() {
            GenericRowData metadataRow = new GenericRowData(this.metadataType.getFieldCount());
            for (int i = 0; i < metadataRow.getArity(); i++) {
                metadataRow.setField(i, this.converters[i].read(this.originMessage));
            }
            return metadataRow;
        }

        /**
         * Assembles the schema matching {@link #concatAll}: key columns, then the value
         * record's columns (prefixed when a value prefix is configured), then metadata columns.
         */
        private SchemaSpec buildOutputSchema(SourceRecord sourceRecord) {
            SchemaSpec.Builder schemaBuilder = SchemaSpec.newBuilder();
            schemaBuilder.fromRowType(this.keyType);
            String prefix = DynamicEvolvingKafkaDeserializationSchema.this.valuePrefix;
            if (prefix == null) {
                schemaBuilder.fromSchema(sourceRecord.getSchema());
            } else {
                DynamicEvolvingKafkaDeserializationSchema.this.addPrefixForValueRecordColumn(schemaBuilder, sourceRecord.getSchema(), prefix);
            }
            schemaBuilder.fromRowType(this.metadataType);
            return schemaBuilder.build();
        }
    }

    /**
     * @param deserializationSchema key deserializer, or {@code null} when no key is projected
     * @param deserializationSchema2 value deserializer
     * @param metadataConverterArr converters for the metadata columns; an empty array disables metadata
     * @param objectPath table path stamped on joined records
     * @param rowType row type of the key columns
     * @param rowType2 row type of the metadata columns
     * @param str prefix for value column names, or {@code null} for none
     * @param typeInformation type information of the produced records
     */
    public DynamicEvolvingKafkaDeserializationSchema(@Nullable DeserializationSchema<RowData> deserializationSchema, DeserializationSchema<SourceRecord> deserializationSchema2, DynamicKafkaDeserializationSchema.MetadataConverter[] metadataConverterArr, ObjectPath objectPath, RowType rowType, RowType rowType2, @Nullable String str, TypeInformation<SourceRecord> typeInformation) {
        this.keyDeserializer = deserializationSchema;
        this.valueDeserializer = deserializationSchema2;
        this.outputTypeInfo = typeInformation;
        this.keyType = rowType;
        this.valuePrefix = str;
        this.hasMetadata = metadataConverterArr.length > 0;
        this.collector = new ConcatCollector(objectPath, rowType, rowType2, metadataConverterArr);
    }

    /**
     * Creates the placeholder key row and opens the key (if any) and value deserializers.
     */
    @Override // org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
    public void open(DeserializationSchema.InitializationContext initializationContext) throws Exception {
        this.emptyKey = new GenericRowData(this.keyType.getFieldCount());
        if (this.keyDeserializer != null) {
            this.keyDeserializer.open(initializationContext);
        }
        this.valueDeserializer.open(initializationContext);
    }

    /** Always returns {@code false}. */
    @Override // org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
    public boolean isEndOfStream(SourceRecord sourceRecord) {
        return false;
    }

    /**
     * Deserializes one Kafka message and emits at most one {@link SourceRecord}.
     *
     * <p>With a key deserializer or metadata configured, the value is routed through the
     * {@link ConcatCollector}. Otherwise a {@code null} value record is dropped and a
     * non-null one is forwarded, with prefixed column names when a value prefix is set.
     *
     * @param consumerRecord the Kafka message
     * @param collector downstream collector receiving the result
     */
    @Override // org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
    public void deserialize(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<SourceRecord> collector) throws Exception {
        SourceRecord sourceRecord = this.valueDeserializer.deserialize(consumerRecord.value());

        if (this.keyDeserializer != null || this.hasMetadata) {
            if (this.keyDeserializer != null) {
                this.collector.key = this.keyDeserializer.deserialize(consumerRecord.key());
            }
            this.collector.outputCollector = collector;
            this.collector.originMessage = consumerRecord;
            this.collector.collect(sourceRecord);
            return;
        }

        // Value-only path: nothing to join, so the value record is forwarded directly.
        if (sourceRecord == null) {
            return;
        }
        if (this.valuePrefix == null) {
            collector.collect(sourceRecord);
            return;
        }
        SchemaSpec.Builder schemaBuilder = SchemaSpec.newBuilder();
        addPrefixForValueRecordColumn(schemaBuilder, sourceRecord.getSchema(), this.valuePrefix);
        collector.collect(new SourceRecord(sourceRecord.getTablePath(), schemaBuilder.build(), sourceRecord.getRow()));
    }

    /**
     * Unsupported single-record variant.
     *
     * <p>Note: the decompiled source also spelled out the compiler-generated bridge method
     * {@code deserialize(ConsumerRecord)}; it has the same erasure as this method and must
     * not be declared in source, so it has been removed.
     *
     * @throws IllegalStateException always; use the collector-based overload instead
     */
    @Override // org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
    public SourceRecord deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
        throw new IllegalStateException("A collector is required for deserializing.");
    }

    /** Returns the type information supplied at construction time. */
    public TypeInformation<SourceRecord> getProducedType() {
        return this.outputTypeInfo;
    }

    /**
     * Adds every column of {@code schemaSpec} to {@code builder}, with {@code str}
     * prepended to the column name and the data type left unchanged.
     *
     * @param builder schema builder to append to
     * @param schemaSpec schema whose columns are copied
     * @param str prefix for each column name
     */
    public void addPrefixForValueRecordColumn(SchemaSpec.Builder builder, SchemaSpec schemaSpec, String str) {
        for (ColumnSpec columnSpec : schemaSpec.getColumns()) {
            builder.column(str + columnSpec.getName(), columnSpec.getDataType());
        }
    }
}
