package org.apache.flink.streaming.connectors.kafka.table;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.SchemaAwareSinkRecord;
import org.apache.flink.table.data.SchemaSpec;
import org.apache.flink.table.data.SinkRecord;
import org.apache.flink.table.evolution.SchemaClient;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/DynamicEvolvingKafkaRecordSerializationSchema.class */
public class DynamicEvolvingKafkaRecordSerializationSchema implements KafkaRecordSerializationSchema<SinkRecord> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(DynamicEvolvingKafkaRecordSerializationSchema.class);
    private final String topic;
    private final FlinkKafkaPartitioner<RowData> partitioner;
    private final boolean hasMetadata;
    private final boolean upsertMode;
    private SchemaClient schemaClient;
    private final List<String> metadataKeys;
    private final SerializationSchema<RowData> keySerializationSchema;
    private final SerializationSchema<SchemaAwareSinkRecord> valueSerializationSchema;
    private final KafkaConnectorOptions.ValueFieldsStrategy valueFieldsStrategy;
    private final List<String> keyFields;

    @Nullable
    private final String valuePrefix;

    @Nullable
    private final String keyPrefix;
    private final EvolvingSerializationSchemaCache evolvingSerializationSchemaCache;
    protected static final int SCHEMA_CACHE_SIZE = 50;
    private static final int METADATA_NOT_FOUND = -1;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/DynamicEvolvingKafkaRecordSerializationSchema$EvolvingSerializationSchemaCache.class */
    public static class EvolvingSerializationSchemaCache implements Serializable {
        private static final long serialVersionUID = 3737757423249510964L;
        private final HashMap<Integer, RowData.FieldGetter[]> keyFieldGetterMap = new HashMap<>(DynamicEvolvingKafkaRecordSerializationSchema.SCHEMA_CACHE_SIZE);
        private final HashMap<Integer, RowData.FieldGetter[]> valueFieldGetterMap = new HashMap<>(DynamicEvolvingKafkaRecordSerializationSchema.SCHEMA_CACHE_SIZE);
        private final HashMap<Integer, int[]> metadataPositionMap = new HashMap<>(DynamicEvolvingKafkaRecordSerializationSchema.SCHEMA_CACHE_SIZE);
        private final LRUMap<Integer, DataType> valueDataTypeMap = new LRUMap<>(DynamicEvolvingKafkaRecordSerializationSchema.SCHEMA_CACHE_SIZE, entry -> {
            this.keyFieldGetterMap.remove(entry.getKey());
            this.valueFieldGetterMap.remove(entry.getKey());
            this.metadataPositionMap.remove(entry.getKey());
        });

        EvolvingSerializationSchemaCache() {
        }

        public boolean contains(int i) {
            return this.valueDataTypeMap.containsKey(Integer.valueOf(i));
        }

        public void addCache(int i, DataType dataType, RowData.FieldGetter[] fieldGetterArr, @Nullable RowData.FieldGetter[] fieldGetterArr2, @Nullable int[] iArr) {
            this.valueDataTypeMap.put(Integer.valueOf(i), dataType);
            this.valueFieldGetterMap.put(Integer.valueOf(i), fieldGetterArr);
            if (fieldGetterArr2 != null) {
                this.keyFieldGetterMap.put(Integer.valueOf(i), fieldGetterArr2);
            }
            if (iArr != null) {
                this.metadataPositionMap.put(Integer.valueOf(i), iArr);
            }
        }

        public DataType getValueDataType(int i) {
            return this.valueDataTypeMap.get(Integer.valueOf(i));
        }

        public RowData.FieldGetter[] getValueFieldGetters(int i) {
            return this.valueFieldGetterMap.get(Integer.valueOf(i));
        }

        @Nullable
        public RowData.FieldGetter[] getKeyFieldGetters(int i) {
            return this.keyFieldGetterMap.get(Integer.valueOf(i));
        }

        @Nullable
        public int[] getMetadataPosition(int i) {
            return this.metadataPositionMap.get(Integer.valueOf(i));
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case 685471575:
                    if (implMethodName.equals("lambda$new$422452e1$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/streaming/connectors/kafka/table/DynamicEvolvingKafkaRecordSerializationSchema$LRUMap$RemovalListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("onRemoval") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/util/Map$Entry;)V") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/connectors/kafka/table/DynamicEvolvingKafkaRecordSerializationSchema$EvolvingSerializationSchemaCache") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Map$Entry;)V")) {
                        EvolvingSerializationSchemaCache evolvingSerializationSchemaCache = (EvolvingSerializationSchemaCache) serializedLambda.getCapturedArg(0);
                        return entry -> {
                            this.keyFieldGetterMap.remove(entry.getKey());
                            this.valueFieldGetterMap.remove(entry.getKey());
                            this.metadataPositionMap.remove(entry.getKey());
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/DynamicEvolvingKafkaRecordSerializationSchema$LRUMap.class */
    public static final class LRUMap<K, V> extends LinkedHashMap<K, V> {
        private static final long serialVersionUID = 1;
        protected final int cacheSize;
        private final RemovalListener<K, V> removalListener;

        /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/DynamicEvolvingKafkaRecordSerializationSchema$LRUMap$RemovalListener.class */
        public interface RemovalListener<K, V> extends Serializable {
            void onRemoval(Map.Entry<K, V> entry);
        }

        public LRUMap(int i) {
            this(i, null);
        }

        public LRUMap(int i, RemovalListener<K, V> removalListener) {
            super(((int) Math.ceil(i / 0.75d)) + 1, 0.75f, true);
            this.cacheSize = i;
            this.removalListener = removalListener;
        }

        @Override // java.util.LinkedHashMap
        protected boolean removeEldestEntry(Map.Entry<K, V> entry) {
            if (size() <= this.cacheSize) {
                return false;
            }
            if (this.removalListener == null) {
                return true;
            }
            this.removalListener.onRemoval(entry);
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DynamicEvolvingKafkaRecordSerializationSchema(String str, KafkaConnectorOptions.ValueFieldsStrategy valueFieldsStrategy, @Nullable String str2, @Nullable String str3, List<String> list, @Nullable FlinkKafkaPartitioner<RowData> flinkKafkaPartitioner, @Nullable SerializationSchema<RowData> serializationSchema, SerializationSchema<SchemaAwareSinkRecord> serializationSchema2, boolean z, List<String> list2, boolean z2) {
        if (z2) {
            Preconditions.checkArgument((serializationSchema == null || list.isEmpty()) ? false : true, "Key must be set in upsert mode for serialization schema.");
        }
        this.topic = (String) Preconditions.checkNotNull(str);
        this.partitioner = flinkKafkaPartitioner;
        this.hasMetadata = z;
        this.upsertMode = z2;
        this.metadataKeys = list2;
        this.keySerializationSchema = serializationSchema;
        this.valueSerializationSchema = serializationSchema2;
        this.evolvingSerializationSchemaCache = new EvolvingSerializationSchemaCache();
        this.valueFieldsStrategy = valueFieldsStrategy;
        this.valuePrefix = str2;
        this.keyPrefix = str3;
        this.keyFields = list;
    }

    @Override // org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
    public ProducerRecord<byte[], byte[]> serialize(SinkRecord sinkRecord, KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext, Long l) {
        byte[] serialize;
        byte[] serialize2;
        int schemaId = sinkRecord.getSchemaId();
        initSerializationCacheEntry(sinkRecord);
        RowData row = sinkRecord.getRow();
        if (this.keySerializationSchema == null && !this.hasMetadata) {
            byte[] serialize3 = this.valueSerializationSchema.serialize(generateSchemaAwareSinkRecord(sinkRecord.getTablePath(), (RowType) this.evolvingSerializationSchemaCache.getValueDataType(schemaId).getLogicalType(), schemaId, row));
            return new ProducerRecord<>(this.topic, extractPartition(row, null, serialize3, kafkaSinkContext.getPartitionsForTopic(this.topic)), null, serialize3);
        }
        if (this.keySerializationSchema == null) {
            serialize = null;
        } else {
            serialize = this.keySerializationSchema.serialize(DynamicKafkaRecordSerializationSchema.createProjectedRow(row, RowKind.INSERT, this.evolvingSerializationSchemaCache.getKeyFieldGetters(schemaId)));
        }
        RowKind rowKind = row.getRowKind();
        if (!this.upsertMode) {
            serialize2 = this.valueSerializationSchema.serialize(generateSchemaAwareSinkRecord(sinkRecord.getTablePath(), (RowType) this.evolvingSerializationSchemaCache.getValueDataType(schemaId).getLogicalType(), schemaId, DynamicKafkaRecordSerializationSchema.createProjectedRow(row, rowKind, this.evolvingSerializationSchemaCache.getValueFieldGetters(schemaId))));
        } else if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) {
            serialize2 = null;
        } else {
            RowData createProjectedRow = DynamicKafkaRecordSerializationSchema.createProjectedRow(row, rowKind, this.evolvingSerializationSchemaCache.getValueFieldGetters(schemaId));
            createProjectedRow.setRowKind(RowKind.INSERT);
            serialize2 = this.valueSerializationSchema.serialize(generateSchemaAwareSinkRecord(sinkRecord.getTablePath(), (RowType) this.evolvingSerializationSchemaCache.getValueDataType(schemaId).getLogicalType(), schemaId, createProjectedRow));
        }
        return new ProducerRecord<>(this.topic, extractPartition(row, serialize, serialize2, kafkaSinkContext.getPartitionsForTopic(this.topic)), (Long) readMetadata(sinkRecord, KafkaDynamicSink.WritableMetadata.TIMESTAMP), serialize, serialize2, (Iterable) readMetadata(sinkRecord, KafkaDynamicSink.WritableMetadata.HEADERS));
    }

    @Override // org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
    public void open(SerializationSchema.InitializationContext initializationContext, KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext) throws Exception {
        if (this.partitioner != null) {
            this.partitioner.open(kafkaSinkContext.getParallelInstanceId(), kafkaSinkContext.getNumberOfParallelInstances());
        }
        initSchemaClient(kafkaSinkContext);
    }

    private void initSchemaClient(KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext) {
        if (this.schemaClient == null) {
            Preconditions.checkState(kafkaSinkContext.getRuntimeContext().isPresent(), "Cannot get runtime context from kafka sink context.");
            this.schemaClient = SchemaClient.of(kafkaSinkContext.getRuntimeContext().get());
        }
    }

    private void initSerializationCacheEntry(SinkRecord sinkRecord) {
        int schemaId = sinkRecord.getSchemaId();
        if (this.evolvingSerializationSchemaCache.contains(schemaId)) {
            return;
        }
        try {
            DataType rowDataType = this.schemaClient.getSchemaSpec(sinkRecord.getTablePath(), schemaId).toRowDataType();
            int[] createKeyFormatProjection = KafkaConnectorOptionsUtil.createKeyFormatProjection(this.keySerializationSchema != null, this.keyFields, this.keyPrefix, rowDataType, true);
            int[] createValueFormatProjection = KafkaConnectorOptionsUtil.createValueFormatProjection(this.valueFieldsStrategy, this.valuePrefix, this.keySerializationSchema != null, this.keyFields, this.keyPrefix, rowDataType, true);
            this.evolvingSerializationSchemaCache.addCache(schemaId, Projection.of(createValueFormatProjection).project(rowDataType), getFieldGettersFromDataType(rowDataType, createValueFormatProjection), this.keySerializationSchema != null ? getFieldGettersFromDataType(rowDataType, createKeyFormatProjection) : null, this.hasMetadata ? getMetadataPositions(rowDataType) : null);
        } catch (Exception e) {
            String format = String.format("Fail to create the serializer for the schema %d.", Integer.valueOf(schemaId));
            LOG.error(format, e);
            throw new IllegalStateException(format, e);
        }
    }

    private Integer extractPartition(RowData rowData, @Nullable byte[] bArr, byte[] bArr2, int[] iArr) {
        if (this.partitioner != null) {
            return Integer.valueOf(this.partitioner.partition(rowData, bArr, bArr2, this.topic, iArr));
        }
        return null;
    }

    @VisibleForTesting
    protected EvolvingSerializationSchemaCache getEvolvingSerializationSchemaCache() {
        return this.evolvingSerializationSchemaCache;
    }

    private <T> T readMetadata(SinkRecord sinkRecord, KafkaDynamicSink.WritableMetadata writableMetadata) {
        int i;
        if (this.hasMetadata && (i = this.evolvingSerializationSchemaCache.getMetadataPosition(sinkRecord.getSchemaId())[writableMetadata.ordinal()]) >= 0) {
            return (T) writableMetadata.converter.read(sinkRecord.getRow(), i);
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static RowData.FieldGetter[] getFieldGettersFromDataType(DataType dataType, int[] iArr) {
        return KafkaDynamicSink.getFieldGetters(dataType.getLogicalType().getChildren(), iArr);
    }

    private int[] getMetadataPositions(DataType dataType) {
        List children = dataType.getLogicalType().getChildren();
        return Stream.of((Object[]) KafkaDynamicSink.WritableMetadata.values()).mapToInt(writableMetadata -> {
            int indexOf = this.metadataKeys.indexOf(writableMetadata.key);
            if (indexOf < 0) {
                return -1;
            }
            return children.size() + indexOf;
        }).toArray();
    }

    private SchemaAwareSinkRecord generateSchemaAwareSinkRecord(ObjectPath objectPath, RowType rowType, int i, RowData rowData) {
        return new SchemaAwareSinkRecord(objectPath, SchemaSpec.newBuilder().fromRowType(rowType).build(), i, rowData);
    }
}
