/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.source;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator;
import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Fluent builder that collects the settings for a {@link KafkaSource} and creates it via
 * {@link #build()}.
 *
 * <p>Exactly one subscription mode (topic list, topic pattern or partition set) and a
 * deserialization schema must be configured, and the {@code bootstrap.servers} property must be
 * set; otherwise {@link #build()} fails. All setters return this builder so calls can be chained.
 *
 * @param <OUT> the type of the records produced by the built source
 */
@PublicEvolving
public class KafkaSourceBuilder<OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceBuilder.class);

    // Consumer property keys that this builder reads or writes in more than one place.
    private static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
    private static final String GROUP_ID_CONFIG = "group.id";
    private static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";

    private static final String[] REQUIRED_CONFIGS = new String[] {BOOTSTRAP_SERVERS_CONFIG};

    // The subscriber stays null until one of the subscription setters is called.
    private KafkaSubscriber subscriber = null;
    private OffsetsInitializer startingOffsetsInitializer = OffsetsInitializer.earliest();
    private OffsetsInitializer stoppingOffsetsInitializer = new NoStoppingOffsetsInitializer();
    private Boundedness boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
    private KafkaRecordDeserializationSchema<OUT> deserializationSchema = null;
    protected Properties props = new Properties();

    KafkaSourceBuilder() {
    }

    /**
     * Sets the {@code bootstrap.servers} property.
     *
     * @param bootstrapServers the value to store under {@code bootstrap.servers}
     * @return this builder
     */
    public KafkaSourceBuilder<OUT> setBootstrapServers(String bootstrapServers) {
        return setProperty(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    }

    /**
     * Sets the {@code group.id} property.
     *
     * @param groupId the value to store under {@code group.id}
     * @return this builder
     */
    public KafkaSourceBuilder<OUT> setGroupId(String groupId) {
        return setProperty(GROUP_ID_CONFIG, groupId);
    }

    /**
     * Subscribes to the given list of topics.
     *
     * @param topics the topics to consume from
     * @return this builder
     * @throws IllegalStateException if a subscription mode has already been set
     */
    public KafkaSourceBuilder<OUT> setTopics(List<String> topics) {
        ensureSubscriberIsNull("topics");
        subscriber = KafkaSubscriber.getTopicListSubscriber(topics);
        return this;
    }

    /**
     * Varargs convenience overload of {@link #setTopics(List)}.
     *
     * @param topics the topics to consume from
     * @return this builder
     * @throws IllegalStateException if a subscription mode has already been set
     */
    public KafkaSourceBuilder<OUT> setTopics(String... topics) {
        return setTopics(Arrays.asList(topics));
    }

    /**
     * Subscribes using a topic name pattern.
     *
     * @param topicPattern the pattern handed to the topic-pattern subscriber
     * @return this builder
     * @throws IllegalStateException if a subscription mode has already been set
     */
    public KafkaSourceBuilder<OUT> setTopicPattern(Pattern topicPattern) {
        ensureSubscriberIsNull("topic pattern");
        subscriber = KafkaSubscriber.getTopicPatternSubscriber(topicPattern);
        return this;
    }

    /**
     * Subscribes to an explicit set of topic partitions.
     *
     * @param partitions the partitions to consume from
     * @return this builder
     * @throws IllegalStateException if a subscription mode has already been set
     */
    public KafkaSourceBuilder<OUT> setPartitions(Set<TopicPartition> partitions) {
        ensureSubscriberIsNull("partitions");
        subscriber = KafkaSubscriber.getPartitionSetSubscriber(partitions);
        return this;
    }

    /**
     * Sets the initializer for the starting offsets. The default is
     * {@link OffsetsInitializer#earliest()}.
     *
     * @param startingOffsetsInitializer the starting offsets initializer; must not be null when
     *     {@link #build()} is called
     * @return this builder
     */
    public KafkaSourceBuilder<OUT> setStartingOffsets(OffsetsInitializer startingOffsetsInitializer) {
        this.startingOffsetsInitializer = startingOffsetsInitializer;
        return this;
    }

    /**
     * Marks the source as {@link Boundedness#CONTINUOUS_UNBOUNDED} and sets the stopping offsets
     * initializer.
     *
     * @param stoppingOffsetsInitializer the stopping offsets initializer
     * @return this builder
     */
    public KafkaSourceBuilder<OUT> setUnbounded(OffsetsInitializer stoppingOffsetsInitializer) {
        this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
        this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
        return this;
    }

    /**
     * Marks the source as {@link Boundedness#BOUNDED} and sets the stopping offsets initializer.
     *
     * @param stoppingOffsetsInitializer the stopping offsets initializer
     * @return this builder
     */
    public KafkaSourceBuilder<OUT> setBounded(OffsetsInitializer stoppingOffsetsInitializer) {
        this.boundedness = Boundedness.BOUNDED;
        this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
        return this;
    }

    /**
     * Sets the record deserialization schema.
     *
     * @param recordDeserializer the schema used to deserialize records
     * @return this builder
     */
    public KafkaSourceBuilder<OUT> setDeserializer(KafkaRecordDeserializationSchema<OUT> recordDeserializer) {
        this.deserializationSchema = recordDeserializer;
        return this;
    }

    /**
     * Sets a deserialization schema by wrapping the given {@link DeserializationSchema} with
     * {@link KafkaRecordDeserializationSchema#valueOnly(DeserializationSchema)}.
     *
     * @param deserializationSchema the schema to wrap
     * @return this builder
     */
    public KafkaSourceBuilder<OUT> setValueOnlyDeserializer(DeserializationSchema<OUT> deserializationSchema) {
        this.deserializationSchema = KafkaRecordDeserializationSchema.valueOnly(deserializationSchema);
        return this;
    }

    /**
     * Sets the client id prefix option ({@link KafkaSourceOptions#CLIENT_ID_PREFIX}).
     *
     * @param prefix the client id prefix
     * @return this builder
     */
    public KafkaSourceBuilder<OUT> setClientIdPrefix(String prefix) {
        return setProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key(), prefix);
    }

    /**
     * Sets a single property, replacing any value previously stored under the same key.
     *
     * <p>Some properties are overridden or defaulted when {@link #build()} is called; see
     * {@link #build()} for the list.
     *
     * @param key the property key
     * @param value the property value
     * @return this builder
     */
    public KafkaSourceBuilder<OUT> setProperty(String key, String value) {
        props.setProperty(key, value);
        return this;
    }

    /**
     * Copies all entries of the given properties into this builder, replacing existing values
     * stored under the same keys.
     *
     * @param props the properties to copy
     * @return this builder
     */
    public KafkaSourceBuilder<OUT> setProperties(Properties props) {
        this.props.putAll(props);
        return this;
    }

    /**
     * Validates the configuration, applies forced and default properties, and creates the
     * {@link KafkaSource}.
     *
     * <p>The following properties are always overwritten: {@code key.deserializer},
     * {@code value.deserializer} (both set to {@link ByteArrayDeserializer}) and
     * {@code auto.offset.reset} (taken from the starting offsets initializer). The partition
     * discovery interval is forced to {@code -1} for bounded sources and defaults to {@code -1}
     * otherwise. {@code enable.auto.commit}, commit-on-checkpoint (only when no
     * {@code group.id} is set) and the client id prefix receive defaults only when not already
     * provided.
     *
     * @return the configured source
     * @throws NullPointerException if a required property, the subscription mode, the
     *     deserialization schema or the starting offsets initializer is missing
     * @throws IllegalStateException if offset committing is enabled without a {@code group.id}
     */
    public KafkaSource<OUT> build() {
        sanityCheck();
        parseAndSetRequiredProperties();
        return new KafkaSource<OUT>(
                subscriber,
                startingOffsetsInitializer,
                stoppingOffsetsInitializer,
                boundedness,
                deserializationSchema,
                props);
    }

    /** Fails if a subscription mode was already chosen; only one mode may be configured. */
    private void ensureSubscriberIsNull(String attemptingSubscribeMode) {
        if (subscriber != null) {
            throw new IllegalStateException(
                    String.format(
                            "Cannot use %s for consumption because a %s is already set for consumption.",
                            attemptingSubscribeMode, subscriber.getClass().getSimpleName()));
        }
    }

    /** Applies the forced and default properties described on {@link #build()}. */
    private void parseAndSetRequiredProperties() {
        maybeOverride("key.deserializer", ByteArrayDeserializer.class.getName(), true);
        maybeOverride("value.deserializer", ByteArrayDeserializer.class.getName(), true);
        if (!props.containsKey(GROUP_ID_CONFIG)) {
            LOG.warn("Offset commit on checkpoint is disabled because {} is not specified", GROUP_ID_CONFIG);
            maybeOverride(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false", false);
        }
        maybeOverride(ENABLE_AUTO_COMMIT_CONFIG, "false", false);
        // Locale.ROOT keeps the lower-casing locale independent; with e.g. a Turkish default
        // locale "EARLIEST" would otherwise become "earlıest" (dotless i).
        maybeOverride(
                "auto.offset.reset",
                startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(Locale.ROOT),
                true);
        // Forced to -1 for bounded sources; for unbounded sources -1 is only the default.
        maybeOverride(
                KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
                "-1",
                boundedness == Boundedness.BOUNDED);
        // Default client id prefix: the group id if present, otherwise a random identifier.
        String defaultClientIdPrefix =
                props.containsKey(GROUP_ID_CONFIG)
                        ? props.getProperty(GROUP_ID_CONFIG)
                        : "KafkaSource-" + new Random().nextLong();
        maybeOverride(KafkaSourceOptions.CLIENT_ID_PREFIX.key(), defaultClientIdPrefix, false);
    }

    /**
     * Sets {@code key} to {@code value} when the user did not provide it, or when
     * {@code override} is true.
     *
     * @param key the property key
     * @param value the value to apply
     * @param override whether a user-provided value should be replaced
     * @return true only if a user-provided value was replaced
     */
    private boolean maybeOverride(String key, String value, boolean override) {
        String userValue = props.getProperty(key);
        if (userValue == null) {
            // Nothing provided by the user: apply the default, which is not an override.
            props.setProperty(key, value);
            return false;
        }
        if (!override) {
            return false;
        }
        LOG.warn("Property {} is provided but will be overridden from {} to {}", key, userValue, value);
        props.setProperty(key, value);
        return true;
    }

    /** Validates that the builder holds everything {@link #build()} needs. */
    private void sanityCheck() {
        for (String requiredConfig : REQUIRED_CONFIGS) {
            Preconditions.checkNotNull(
                    props.getProperty(requiredConfig),
                    String.format("Property %s is required but not provided", requiredConfig));
        }
        Preconditions.checkNotNull(
                subscriber,
                "No subscribe mode is specified, should be one of topics, topic pattern and partition set.");
        Preconditions.checkNotNull(
                deserializationSchema, "Deserialization schema is required but not provided.");
        // The starting offsets initializer is dereferenced while deriving auto.offset.reset;
        // fail here with a descriptive message instead of a bare NullPointerException later.
        Preconditions.checkNotNull(
                startingOffsetsInitializer, "Starting offsets initializer must not be null.");
        Preconditions.checkState(
                props.containsKey(GROUP_ID_CONFIG) || !offsetCommitEnabledManually(),
                String.format("Property %s is required when offset commit is enabled", GROUP_ID_CONFIG));
        if (startingOffsetsInitializer instanceof OffsetsInitializerValidator) {
            ((OffsetsInitializerValidator) startingOffsetsInitializer).validate(props);
        }
        if (stoppingOffsetsInitializer instanceof OffsetsInitializerValidator) {
            ((OffsetsInitializerValidator) stoppingOffsetsInitializer).validate(props);
        }
    }

    /**
     * Returns whether the user explicitly enabled offset committing, either through
     * {@code enable.auto.commit} or through the commit-on-checkpoint option.
     */
    private boolean offsetCommitEnabledManually() {
        String commitOnCheckpointKey = KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key();
        boolean autoCommit =
                props.containsKey(ENABLE_AUTO_COMMIT_CONFIG)
                        && Boolean.parseBoolean(props.getProperty(ENABLE_AUTO_COMMIT_CONFIG));
        boolean commitOnCheckpoint =
                props.containsKey(commitOnCheckpointKey)
                        && Boolean.parseBoolean(props.getProperty(commitOnCheckpointKey));
        return autoCommit || commitOnCheckpoint;
    }
}

