package org.apache.flink.connector.kafka.sink;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Properties;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.class */
public class KafkaSinkBuilder<IN> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSinkBuilder.class);
    private static final Duration DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Duration.ofHours(1);
    private static final int MAXIMUM_PREFIX_BYTES = 64000;
    private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.NONE;
    private String transactionalIdPrefix = "kafka-sink";
    private Properties kafkaProducerConfig;
    private KafkaRecordSerializationSchema<IN> recordSerializer;
    private String bootstrapServers;

    public KafkaSinkBuilder<IN> setDeliverGuarantee(DeliveryGuarantee deliveryGuarantee) {
        this.deliveryGuarantee = (DeliveryGuarantee) Preconditions.checkNotNull(deliveryGuarantee, "deliveryGuarantee");
        return this;
    }

    public KafkaSinkBuilder<IN> setKafkaProducerConfig(Properties properties) {
        this.kafkaProducerConfig = (Properties) Preconditions.checkNotNull(properties, "kafkaProducerConfig");
        if (properties.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
            LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
        } else {
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        }
        if (properties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
            LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
        } else {
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        }
        if (!properties.containsKey(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) {
            long millis = DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMillis();
            Preconditions.checkState(millis < 2147483647L && millis > 0, "timeout does not fit into 32 bit integer");
            properties.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, Integer.valueOf((int) millis));
            LOG.warn("Property [{}] not specified. Setting it to {}", ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_KAFKA_TRANSACTION_TIMEOUT);
        }
        return this;
    }

    public KafkaSinkBuilder<IN> setRecordSerializer(KafkaRecordSerializationSchema<IN> kafkaRecordSerializationSchema) {
        this.recordSerializer = (KafkaRecordSerializationSchema) Preconditions.checkNotNull(kafkaRecordSerializationSchema, "recordSerializer");
        ClosureCleaner.clean(this.recordSerializer, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
        return this;
    }

    public KafkaSinkBuilder<IN> setTransactionalIdPrefix(String str) {
        this.transactionalIdPrefix = (String) Preconditions.checkNotNull(str, "transactionalIdPrefix");
        Preconditions.checkState(str.getBytes(StandardCharsets.UTF_8).length <= MAXIMUM_PREFIX_BYTES, "The configured prefix is too long and the resulting transactionalId might exceed Kafka's transactionalIds size.");
        return this;
    }

    public KafkaSinkBuilder<IN> setBootstrapServers(String str) {
        this.bootstrapServers = (String) Preconditions.checkNotNull(str);
        return this;
    }

    public KafkaSink<IN> build() {
        if (this.kafkaProducerConfig == null) {
            setKafkaProducerConfig(new Properties());
        }
        Preconditions.checkNotNull(this.bootstrapServers);
        if (this.deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
            Preconditions.checkState(this.transactionalIdPrefix != null, "EXACTLY_ONCE delivery guarantee requires a transactionIdPrefix to be set to provide unique transaction names across multiple KafkaSinks writing to the same Kafka cluster.");
        }
        this.kafkaProducerConfig.put("bootstrap.servers", this.bootstrapServers);
        return new KafkaSink<>(this.deliveryGuarantee, this.kafkaProducerConfig, this.transactionalIdPrefix, (KafkaRecordSerializationSchema) Preconditions.checkNotNull(this.recordSerializer, "recordSerializer"));
    }
}
