/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.sink;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSerializerWrapper;
import org.apache.flink.connector.kafka.sink.TopicSelector;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Serializer;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.util.Preconditions;

/**
 * Fluent builder that assembles a {@link KafkaRecordSerializationSchema} from a topic selector, a
 * value serializer and, optionally, a key serializer and a partitioner.
 *
 * <p>The topic (or topic selector), the key serializer and the value serializer may each be
 * configured at most once; a second attempt fails with an {@link IllegalStateException}.
 * {@link #build()} requires that both a topic selector and a value serializer have been set.
 *
 * <p>Several setters narrow the builder's element type to a subtype {@code T} of {@code IN}. They
 * return the very same builder instance, merely viewed under the narrower type.
 *
 * @param <IN> type of the elements that the built schema serializes
 */
@PublicEvolving
public class KafkaRecordSerializationSchemaBuilder<IN> {
    @Nullable
    private Function<? super IN, String> topicSelector;
    @Nullable
    private SerializationSchema<? super IN> valueSerializationSchema;
    @Nullable
    private FlinkKafkaPartitioner<? super IN> partitioner;
    @Nullable
    private SerializationSchema<? super IN> keySerializationSchema;

    /**
     * Sets the partitioner used to pick the target partition of every record. Unlike the other
     * setters, this one may be called repeatedly; the last partitioner wins.
     *
     * @param partitioner partitioner to use, must not be {@code null}
     * @param <T> element type accepted by the returned builder
     * @return this builder, typed to {@code T}
     */
    public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setPartitioner(FlinkKafkaPartitioner<? super T> partitioner) {
        KafkaRecordSerializationSchemaBuilder<T> self = this.self();
        self.partitioner = Preconditions.checkNotNull(partitioner);
        return self;
    }

    /**
     * Routes every record to the given fixed topic.
     *
     * @param topic name of the target topic, must not be {@code null}
     * @return this builder
     * @throws IllegalStateException if a topic or topic selector has already been set
     */
    public KafkaRecordSerializationSchemaBuilder<IN> setTopic(String topic) {
        Preconditions.checkState(this.topicSelector == null, "Topic selector already set.");
        Preconditions.checkNotNull(topic);
        this.topicSelector = new CachingTopicSelector<IN>(e -> topic);
        return this;
    }

    /**
     * Sets a selector that derives the target topic from each element. The selector is wrapped in
     * a small cache keyed by the element.
     *
     * @param topicSelector function from element to topic name, must not be {@code null}
     * @param <T> element type accepted by the returned builder
     * @return this builder, typed to {@code T}
     * @throws IllegalStateException if a topic or topic selector has already been set
     */
    public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setTopicSelector(TopicSelector<? super T> topicSelector) {
        Preconditions.checkState(this.topicSelector == null, "Topic selector already set.");
        KafkaRecordSerializationSchemaBuilder<T> self = this.self();
        self.topicSelector = new CachingTopicSelector<>(Preconditions.checkNotNull(topicSelector));
        return self;
    }

    /**
     * Sets the {@link SerializationSchema} that produces the record key.
     *
     * @param keySerializationSchema key serializer, must not be {@code null}
     * @param <T> element type accepted by the returned builder
     * @return this builder, typed to {@code T}
     * @throws IllegalStateException if a key serializer has already been set
     */
    public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setKeySerializationSchema(SerializationSchema<? super T> keySerializationSchema) {
        this.checkKeySerializerNotSet();
        KafkaRecordSerializationSchemaBuilder<T> self = this.self();
        self.keySerializationSchema = Preconditions.checkNotNull(keySerializationSchema);
        return self;
    }

    /**
     * Uses a Kafka {@link Serializer} class, wrapped in a {@link KafkaSerializerWrapper}, to
     * produce the record key.
     *
     * <p>The topic selector configured at the time of this call is handed to the wrapper.
     * NOTE(review): the wrapper presumably requires a non-null selector, so the topic should be
     * configured before this method is called - confirm against KafkaSerializerWrapper.
     *
     * @param keySerializer Kafka serializer class for the key
     * @param <T> element type accepted by the returned builder
     * @return this builder, typed to {@code T}
     * @throws IllegalStateException if a key serializer has already been set
     */
    public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setKafkaKeySerializer(Class<? extends Serializer<? super T>> keySerializer) {
        this.checkKeySerializerNotSet();
        KafkaRecordSerializationSchemaBuilder<T> self = this.self();
        // Diamond: the wrapper's element type is inferred from the class token's bound.
        self.keySerializationSchema = new KafkaSerializerWrapper<>(keySerializer, true, this.topicSelector);
        return self;
    }

    /**
     * Same as {@link #setKafkaKeySerializer(Class)}, additionally passing a configuration map to
     * the {@link KafkaSerializerWrapper}.
     *
     * @param keySerializer Kafka serializer class for the key
     * @param configuration configuration forwarded to the wrapper
     * @param <T> element type accepted by the returned builder
     * @param <S> concrete serializer type
     * @return this builder, typed to {@code T}
     * @throws IllegalStateException if a key serializer has already been set
     */
    public <T extends IN, S extends Serializer<? super T>> KafkaRecordSerializationSchemaBuilder<T> setKafkaKeySerializer(Class<S> keySerializer, Map<String, String> configuration) {
        this.checkKeySerializerNotSet();
        KafkaRecordSerializationSchemaBuilder<T> self = this.self();
        self.keySerializationSchema = new KafkaSerializerWrapper<>(keySerializer, true, configuration, this.topicSelector);
        return self;
    }

    /**
     * Sets the {@link SerializationSchema} that produces the record value.
     *
     * @param valueSerializationSchema value serializer, must not be {@code null}
     * @param <T> element type accepted by the returned builder
     * @return this builder, typed to {@code T}
     * @throws IllegalStateException if a value serializer has already been set
     */
    public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setValueSerializationSchema(SerializationSchema<T> valueSerializationSchema) {
        this.checkValueSerializerNotSet();
        KafkaRecordSerializationSchemaBuilder<T> self = this.self();
        self.valueSerializationSchema = Preconditions.checkNotNull(valueSerializationSchema);
        return self;
    }

    /**
     * Returns this instance viewed as a builder for the subtype {@code T}.
     *
     * <p>The cast is unchecked but sound for this class: every stored component is declared with
     * a {@code ? super IN} bound, so it also accepts any {@code T extends IN}.
     */
    @SuppressWarnings("unchecked")
    private <T extends IN> KafkaRecordSerializationSchemaBuilder<T> self() {
        return (KafkaRecordSerializationSchemaBuilder<T>) this;
    }

    /**
     * Uses a Kafka {@link Serializer} class, wrapped in a {@link KafkaSerializerWrapper}, to
     * produce the record value.
     *
     * <p>The topic selector configured at the time of this call is handed to the wrapper.
     * NOTE(review): the wrapper presumably requires a non-null selector, so the topic should be
     * configured before this method is called - confirm against KafkaSerializerWrapper.
     *
     * @param valueSerializer Kafka serializer class for the value
     * @param <T> element type accepted by the returned builder
     * @return this builder, typed to {@code T}
     * @throws IllegalStateException if a value serializer has already been set
     */
    public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setKafkaValueSerializer(Class<? extends Serializer<? super T>> valueSerializer) {
        this.checkValueSerializerNotSet();
        KafkaRecordSerializationSchemaBuilder<T> self = this.self();
        self.valueSerializationSchema = new KafkaSerializerWrapper<>(valueSerializer, false, this.topicSelector);
        return self;
    }

    /**
     * Same as {@link #setKafkaValueSerializer(Class)}, additionally passing a configuration map
     * to the {@link KafkaSerializerWrapper}.
     *
     * @param valueSerializer Kafka serializer class for the value
     * @param configuration configuration forwarded to the wrapper
     * @param <T> element type accepted by the returned builder
     * @param <S> concrete serializer type
     * @return this builder, typed to {@code T}
     * @throws IllegalStateException if a value serializer has already been set
     */
    public <T extends IN, S extends Serializer<? super T>> KafkaRecordSerializationSchemaBuilder<T> setKafkaValueSerializer(Class<S> valueSerializer, Map<String, String> configuration) {
        this.checkValueSerializerNotSet();
        KafkaRecordSerializationSchemaBuilder<T> self = this.self();
        self.valueSerializationSchema = new KafkaSerializerWrapper<>(valueSerializer, false, configuration, this.topicSelector);
        return self;
    }

    /**
     * Creates the serialization schema from the configured components.
     *
     * @return schema combining topic selector, value serializer and the optional key serializer
     *     and partitioner
     * @throws IllegalStateException if no value serializer or no topic selector is configured
     */
    public KafkaRecordSerializationSchema<IN> build() {
        Preconditions.checkState(this.valueSerializationSchema != null, "No value serializer is configured.");
        Preconditions.checkState(this.topicSelector != null, "No topic selector is configured.");
        return new KafkaRecordSerializationSchemaWrapper<>(this.topicSelector, this.valueSerializationSchema, this.keySerializationSchema, this.partitioner);
    }

    /** Fails with {@link IllegalStateException} if a value serializer was configured before. */
    private void checkValueSerializerNotSet() {
        Preconditions.checkState(this.valueSerializationSchema == null, "Value serializer already set.");
    }

    /** Fails with {@link IllegalStateException} if a key serializer was configured before. */
    private void checkKeySerializerNotSet() {
        Preconditions.checkState(this.keySerializationSchema == null, "Key serializer already set.");
    }

    /**
     * Schema produced by {@link #build()}: delegates to the configured topic selector,
     * serializers and partitioner to turn an element into a {@link ProducerRecord}.
     */
    private static class KafkaRecordSerializationSchemaWrapper<IN>
    implements KafkaRecordSerializationSchema<IN> {
        private final SerializationSchema<? super IN> valueSerializationSchema;
        private final Function<? super IN, String> topicSelector;
        private final FlinkKafkaPartitioner<? super IN> partitioner;
        private final SerializationSchema<? super IN> keySerializationSchema;

        /**
         * @param topicSelector maps an element to its target topic, must not be {@code null}
         * @param valueSerializationSchema serializer for the record value, must not be {@code null}
         * @param keySerializationSchema serializer for the record key, or {@code null} for no key
         * @param partitioner partitioner, or {@code null} to leave the partition unset
         */
        KafkaRecordSerializationSchemaWrapper(Function<? super IN, String> topicSelector, SerializationSchema<? super IN> valueSerializationSchema, @Nullable SerializationSchema<? super IN> keySerializationSchema, @Nullable FlinkKafkaPartitioner<? super IN> partitioner) {
            this.topicSelector = Preconditions.checkNotNull(topicSelector);
            this.valueSerializationSchema = Preconditions.checkNotNull(valueSerializationSchema);
            this.partitioner = partitioner;
            this.keySerializationSchema = keySerializationSchema;
        }

        /**
         * Opens the value serializer, then the key serializer and the partitioner when present.
         * The partitioner receives the parallel instance id and the number of parallel instances
         * reported by {@code sinkContext}.
         */
        @Override
        public void open(SerializationSchema.InitializationContext context, KafkaRecordSerializationSchema.KafkaSinkContext sinkContext) throws Exception {
            this.valueSerializationSchema.open(context);
            if (this.keySerializationSchema != null) {
                this.keySerializationSchema.open(context);
            }
            if (this.partitioner != null) {
                this.partitioner.open(sinkContext.getParallelInstanceId(), sinkContext.getNumberOfParallelInstances());
            }
        }

        /**
         * Builds the Kafka record for {@code element}.
         *
         * <p>The key is {@code null} when no key serializer is configured, the partition is
         * {@code null} when no partitioner is configured, and a {@code null} or negative
         * {@code timestamp} results in a record without an explicit timestamp.
         */
        @Override
        public ProducerRecord<byte[], byte[]> serialize(IN element, KafkaRecordSerializationSchema.KafkaSinkContext context, Long timestamp) {
            final String targetTopic = this.topicSelector.apply(element);
            final byte[] value = this.valueSerializationSchema.serialize(element);
            final byte[] key = this.keySerializationSchema == null ? null : this.keySerializationSchema.serialize(element);

            Integer partition = null;
            if (this.partitioner != null) {
                partition = this.partitioner.partition(element, key, value, targetTopic, context.getPartitionsForTopic(targetTopic));
            }

            final Long recordTimestamp = timestamp == null || timestamp < 0L ? null : timestamp;
            return new ProducerRecord<>(targetTopic, partition, recordTimestamp, key, value);
        }
    }

    /**
     * Wraps a {@link TopicSelector} with a tiny element-keyed cache. The cache is emptied as soon
     * as it reaches {@link #CACHE_RESET_SIZE} entries, which bounds its memory use.
     */
    private static class CachingTopicSelector<IN>
    implements Function<IN, String>,
    Serializable {
        private static final int CACHE_RESET_SIZE = 5;
        private final Map<IN, String> cache;
        private final TopicSelector<IN> topicSelector;

        CachingTopicSelector(TopicSelector<IN> topicSelector) {
            this.topicSelector = topicSelector;
            this.cache = new HashMap<>();
        }

        /**
         * Returns the topic for {@code in}, consulting the wrapped selector only when no topic is
         * cached for that element.
         */
        @Override
        public String apply(IN in) {
            String topic = this.cache.get(in);
            if (topic == null) {
                // Cache miss: only now pay for the wrapped selector. Passing its result as the
                // default of getOrDefault would evaluate it on every call, hits included.
                topic = this.topicSelector.apply(in);
                this.cache.put(in, topic);
                if (this.cache.size() >= CACHE_RESET_SIZE) {
                    this.cache.clear();
                }
            }
            return topic;
        }
    }
}

