package org.apache.flink.connector.kafka.source.enumerator;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClient;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.class */
public class KafkaSourceEnumerator implements SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceEnumerator.class);
    // Resolves the set of topic partitions this source is subscribed to (queried through the admin client).
    private final KafkaSubscriber subscriber;
    // Supplies the starting offset of every newly discovered partition.
    private final OffsetsInitializer startingOffsetInitializer;
    // Supplies the stopping offset of newly discovered partitions; partitions without an entry get Long.MIN_VALUE.
    private final OffsetsInitializer stoppingOffsetInitializer;
    // Source configuration; read for "group.id", the discovery interval and the admin client settings.
    private final Properties properties;
    // Period of partition discovery in milliseconds; only a strictly positive value schedules periodic discovery in start().
    private final long partitionDiscoveryIntervalMs;
    // Coordinator-side context used to run async calls, look up registered readers and assign splits.
    private final SplitEnumeratorContext<KafkaPartitionSplit> context;
    // When BOUNDED, readers are told that no more splits will come once discovery has finished.
    private final Boundedness boundedness;
    // Partitions whose splits have already been handed to a reader; this is the state that gets snapshotted.
    private final Set<TopicPartition> assignedPartitions;
    // Splits waiting to be handed out, keyed by the id of the reader that owns them.
    private final Map<Integer, Set<KafkaPartitionSplit>> pendingPartitionSplitAssignment;
    // Value of the "group.id" property; may be null when the property is not set.
    private final String consumerGroupId;
    // Created in start() and closed in close(); null before start() has run.
    private AdminClient adminClient;
    // Set by handlePartitionSplitChanges() when partition discovery is found to be disabled.
    private boolean noMoreNewPartitionSplits;

    /**
     * Result of comparing the partitions fetched from Kafka with the partitions the enumerator
     * already knows about: the partitions that are new and the ones that disappeared.
     *
     * <p>The sets are stored by reference; no copy is taken.
     */
    @VisibleForTesting
    public static class PartitionChange {
        private final Set<TopicPartition> newPartitions;
        private final Set<TopicPartition> removedPartitions;

        PartitionChange(Set<TopicPartition> added, Set<TopicPartition> removed) {
            this.newPartitions = added;
            this.removedPartitions = removed;
        }

        /** @return the partitions that were not known to the enumerator before. */
        public Set<TopicPartition> getNewPartitions() {
            return newPartitions;
        }

        /** @return the known partitions that were not part of the latest fetch. */
        public Set<TopicPartition> getRemovedPartitions() {
            return removedPartitions;
        }

        /** @return {@code true} when there are neither new nor removed partitions. */
        public boolean isEmpty() {
            if (!newPartitions.isEmpty()) {
                return false;
            }
            return removedPartitions.isEmpty();
        }
    }

    /**
     * {@link OffsetsInitializer.PartitionOffsetsRetriever} that resolves committed, beginning, end
     * and timestamp-based offsets through a Kafka {@link AdminClient}.
     *
     * <p>All lookups block until the admin client's future completes. Closing the retriever closes
     * the admin client it was constructed with.
     */
    @VisibleForTesting
    public static class PartitionOffsetsRetrieverImpl implements OffsetsInitializer.PartitionOffsetsRetriever, AutoCloseable {
        private final AdminClient adminClient;
        private final String groupId;

        /**
         * @param adminClient client used for every lookup; closed by {@link #close()}
         * @param groupId consumer group whose committed offsets are queried
         */
        public PartitionOffsetsRetrieverImpl(AdminClient adminClient, String groupId) {
            this.adminClient = adminClient;
            this.groupId = groupId;
        }

        /**
         * Fetches the offsets committed by the consumer group for the given partitions.
         *
         * @param partitions partitions to look up
         * @return committed offset per partition; partitions without committed metadata are omitted
         * @throws FlinkRuntimeException if the lookup fails or the calling thread is interrupted
         */
        @Override // org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer.PartitionOffsetsRetriever
        public Map<TopicPartition, Long> committedOffsets(Collection<TopicPartition> partitions) {
            ListConsumerGroupOffsetsOptions options =
                    new ListConsumerGroupOffsetsOptions().topicPartitions(new ArrayList<>(partitions));
            try {
                return this.adminClient
                        .listConsumerGroupOffsets(this.groupId, options)
                        .partitionsToOffsetAndMetadata()
                        .thenApply(committed -> {
                            Map<TopicPartition, Long> offsets = new HashMap<>();
                            committed.forEach((partition, metadata) -> {
                                // A null entry means nothing was committed for that partition.
                                if (metadata != null) {
                                    offsets.put(partition, metadata.offset());
                                }
                            });
                            return offsets;
                        })
                        .get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag before translating the exception.
                Thread.currentThread().interrupt();
                throw new FlinkRuntimeException("Interrupted while listing offsets for consumer group " + this.groupId, e);
            } catch (ExecutionException e) {
                throw new FlinkRuntimeException("Failed to fetch committed offsets for consumer group " + this.groupId + " due to", e);
            }
        }

        /**
         * Lists offsets for the given per-partition specs, dropping partitions for which the admin
         * client reported no result.
         *
         * @throws FlinkRuntimeException if the lookup fails or the calling thread is interrupted
         */
        private Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> listOffsets(Map<TopicPartition, OffsetSpec> offsetSpecs) {
            try {
                return this.adminClient
                        .listOffsets(offsetSpecs)
                        .all()
                        .thenApply(listed -> {
                            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets = new HashMap<>();
                            listed.forEach((partition, info) -> {
                                if (info != null) {
                                    offsets.put(partition, info);
                                }
                            });
                            return offsets;
                        })
                        .get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new FlinkRuntimeException("Interrupted while listing offsets for topic partitions: " + offsetSpecs, e);
            } catch (ExecutionException e) {
                throw new FlinkRuntimeException("Failed to list offsets for topic partitions: " + offsetSpecs + " due to", e);
            }
        }

        /** Lists plain offsets for all given partitions using one and the same spec. */
        private Map<TopicPartition, Long> listOffsets(Collection<TopicPartition> partitions, OffsetSpec offsetSpec) {
            Map<TopicPartition, OffsetSpec> request =
                    partitions.stream().collect(Collectors.toMap(partition -> partition, partition -> offsetSpec));
            return listOffsets(request).entrySet().stream()
                    .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset()));
        }

        /** @return the latest offset of every given partition. */
        @Override // org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer.PartitionOffsetsRetriever
        public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
            return listOffsets(partitions, OffsetSpec.latest());
        }

        /** @return the earliest offset of every given partition. */
        @Override // org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer.PartitionOffsetsRetriever
        public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
            return listOffsets(partitions, OffsetSpec.earliest());
        }

        /**
         * Looks up, per partition, the offset of the first record at or after the given timestamp.
         *
         * <p>Kafka's {@link OffsetAndTimestamp} constructor rejects negative offsets and
         * timestamps, so a lookup that reports such values (presumably the broker's way of saying
         * "no record at or after this timestamp") used to fail the whole call with an
         * {@code IllegalArgumentException}. Such partitions are now mapped to {@code null}, the
         * convention of {@code KafkaConsumer#offsetsForTimes}.
         *
         * <p>NOTE(review): callers must be prepared for {@code null} values in the returned map;
         * confirm this against the {@link OffsetsInitializer} implementations in use.
         *
         * @param timestampsToSearch target timestamp per partition
         * @return offset and timestamp per partition, or {@code null} where nothing was resolved
         */
        @Override // org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer.PartitionOffsetsRetriever
        public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
            Map<TopicPartition, OffsetSpec> request = timestampsToSearch.entrySet().stream()
                    .collect(Collectors.toMap(Map.Entry::getKey, entry -> OffsetSpec.forTimestamp(entry.getValue())));
            // Collectors.toMap cannot hold null values, hence the explicit HashMap.
            Map<TopicPartition, OffsetAndTimestamp> resolved = new HashMap<>();
            listOffsets(request).forEach((partition, info) -> {
                if (info.offset() < 0 || info.timestamp() < 0) {
                    resolved.put(partition, null);
                } else {
                    resolved.put(partition, new OffsetAndTimestamp(info.offset(), info.timestamp(), info.leaderEpoch()));
                }
            });
            return resolved;
        }

        /** Closes the underlying admin client without waiting ({@link Duration#ZERO} timeout). */
        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            this.adminClient.close(Duration.ZERO);
        }
    }

    /**
     * Outcome of split initialization: the splits created for newly discovered partitions together
     * with the partitions that were found to be removed. Both sets are exposed as read-only views.
     */
    public static class PartitionSplitChange {
        private final Set<KafkaPartitionSplit> newPartitionSplits;
        private final Set<TopicPartition> removedPartitions;

        private PartitionSplitChange(Set<KafkaPartitionSplit> createdSplits, Set<TopicPartition> gonePartitions) {
            final Set<KafkaPartitionSplit> splitsView = Collections.unmodifiableSet(createdSplits);
            final Set<TopicPartition> removedView = Collections.unmodifiableSet(gonePartitions);
            this.newPartitionSplits = splitsView;
            this.removedPartitions = removedView;
        }
    }

    /**
     * Creates an enumerator that starts with no partitions assigned; delegates to the
     * seven-argument constructor with an empty set.
     */
    public KafkaSourceEnumerator(
            KafkaSubscriber kafkaSubscriber,
            OffsetsInitializer offsetsInitializer,
            OffsetsInitializer offsetsInitializer2,
            Properties properties,
            SplitEnumeratorContext<KafkaPartitionSplit> splitEnumeratorContext,
            Boundedness boundedness) {
        this(
                kafkaSubscriber,
                offsetsInitializer,
                offsetsInitializer2,
                properties,
                splitEnumeratorContext,
                boundedness,
                Collections.<TopicPartition>emptySet());
    }

    public KafkaSourceEnumerator(KafkaSubscriber kafkaSubscriber, OffsetsInitializer offsetsInitializer, OffsetsInitializer offsetsInitializer2, Properties properties, SplitEnumeratorContext<KafkaPartitionSplit> splitEnumeratorContext, Boundedness boundedness, Set<TopicPartition> set) {
        this.noMoreNewPartitionSplits = false;
        this.subscriber = kafkaSubscriber;
        this.startingOffsetInitializer = offsetsInitializer;
        this.stoppingOffsetInitializer = offsetsInitializer2;
        this.properties = properties;
        this.context = splitEnumeratorContext;
        this.boundedness = boundedness;
        this.assignedPartitions = new HashSet(set);
        this.pendingPartitionSplitAssignment = new HashMap();
        this.partitionDiscoveryIntervalMs = ((Long) KafkaSourceOptions.getOption(properties, KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS, Long::parseLong)).longValue();
        this.consumerGroupId = properties.getProperty("group.id");
    }

    /**
     * Creates the admin client and kicks off partition discovery: repeatedly when the configured
     * interval is strictly positive, exactly once otherwise.
     */
    public void start() {
        this.adminClient = getKafkaAdminClient();
        final boolean periodicDiscovery = this.partitionDiscoveryIntervalMs > 0;
        if (!periodicDiscovery) {
            LOG.info("Starting the KafkaSourceEnumerator for consumer group {} without periodic partition discovery.", this.consumerGroupId);
            this.context.callAsync(this::getSubscribedTopicPartitions, this::checkPartitionChanges);
            return;
        }
        LOG.info(
                "Starting the KafkaSourceEnumerator for consumer group {} with partition discovery interval of {} ms.",
                this.consumerGroupId,
                this.partitionDiscoveryIntervalMs);
        // First run immediately (delay 0), then once per interval.
        this.context.callAsync(
                this::getSubscribedTopicPartitions,
                this::checkPartitionChanges,
                0L,
                this.partitionDiscoveryIntervalMs);
    }

    /**
     * Intentionally does nothing: this enumerator hands out splits on its own when a reader is
     * added, when splits are given back, and after partition discovery, rather than on request.
     *
     * @param i id of the requesting reader (unused)
     * @param str host name of the requesting reader, may be {@code null} (unused)
     */
    public void handleSplitRequest(int i, @Nullable String str) {
    }

    /**
     * Puts the given splits back into the pending assignments and, if reader {@code i} is
     * currently registered, immediately hands that reader its pending splits.
     *
     * @param list splits to re-queue
     * @param i id of the reader the splits came back from
     */
    public void addSplitsBack(List<KafkaPartitionSplit> list, int i) {
        addPartitionSplitChangeToPendingAssignments(list);
        final boolean readerRegistered = this.context.registeredReaders().containsKey(i);
        if (!readerRegistered) {
            // The splits stay pending until the reader registers.
            return;
        }
        assignPendingPartitionSplits(Collections.singleton(i));
    }

    /**
     * Called for a newly added reader; hands it whatever splits are pending for its id.
     *
     * @param i id of the reader that was added
     */
    public void addReader(int i) {
        final Integer readerId = i;
        LOG.debug("Adding reader {} to KafkaSourceEnumerator for consumer group {}.", readerId, this.consumerGroupId);
        assignPendingPartitionSplits(Collections.singleton(readerId));
    }

    /**
     * Snapshots the enumerator state, which consists of the partitions assigned so far.
     *
     * <p>This restores the method name the decompiler mangled to {@code m28snapshotState} when it
     * merged the bridge method; it is the {@code snapshotState} method of {@link SplitEnumerator}.
     *
     * @param checkpointId id of the checkpoint being taken (unused)
     * @return state wrapping the currently assigned partitions
     */
    public KafkaSourceEnumState snapshotState(long checkpointId) throws Exception {
        return new KafkaSourceEnumState(this.assignedPartitions);
    }

    /**
     * Decompiler-generated alias kept so existing references keep compiling.
     *
     * @deprecated use {@link #snapshotState(long)}
     */
    @Deprecated
    public KafkaSourceEnumState m28snapshotState(long j) throws Exception {
        return snapshotState(j);
    }

    /** Closes the admin client if one was created; a no-op when {@code start()} never ran. */
    public void close() {
        final AdminClient client = this.adminClient;
        if (client == null) {
            return;
        }
        client.close();
    }

    /**
     * Asks the subscriber for the currently subscribed partitions, using the enumerator's admin
     * client. Handed to {@code callAsync} as the discovery task.
     */
    private Set<TopicPartition> getSubscribedTopicPartitions() {
        final Set<TopicPartition> subscribedPartitions =
                this.subscriber.getSubscribedTopicPartitions(this.adminClient);
        return subscribedPartitions;
    }

    /**
     * Callback of partition discovery. Computes what changed against the known partitions and,
     * when there is a change, asynchronously initializes splits for it.
     *
     * @param fetchedPartitions partitions returned by the discovery task
     * @param failure error raised by the discovery task, or {@code null} on success
     * @throws FlinkRuntimeException if the discovery task failed
     */
    private void checkPartitionChanges(Set<TopicPartition> fetchedPartitions, Throwable failure) {
        if (failure != null) {
            throw new FlinkRuntimeException("Failed to list subscribed topic partitions due to ", failure);
        }
        final PartitionChange change = getPartitionChange(fetchedPartitions);
        if (!change.isEmpty()) {
            this.context.callAsync(() -> initializePartitionSplits(change), this::handlePartitionSplitChanges);
        }
    }

    /**
     * Builds one split per newly discovered partition, using the starting and stopping offset
     * initializers to resolve the split boundaries.
     *
     * @param partitionChange new and removed partitions found by discovery
     * @return the created splits together with the removed partitions
     * @throws IllegalStateException if the starting offset initializer returned no offset for one
     *     of the new partitions (previously an uninformative {@code NullPointerException} from
     *     unboxing)
     */
    private PartitionSplitChange initializePartitionSplits(PartitionChange partitionChange) {
        Set<TopicPartition> newPartitions = Collections.unmodifiableSet(partitionChange.getNewPartitions());
        OffsetsInitializer.PartitionOffsetsRetriever offsetsRetriever = getOffsetsRetriever();
        Map<TopicPartition, Long> startingOffsets =
                this.startingOffsetInitializer.getPartitionOffsets(newPartitions, offsetsRetriever);
        Map<TopicPartition, Long> stoppingOffsets =
                this.stoppingOffsetInitializer.getPartitionOffsets(newPartitions, offsetsRetriever);

        Set<KafkaPartitionSplit> partitionSplits = new HashSet<>(newPartitions.size());
        for (TopicPartition partition : newPartitions) {
            Long startingOffset = startingOffsets.get(partition);
            if (startingOffset == null) {
                throw new IllegalStateException(
                        "Starting offsets initializer returned no offset for partition " + partition);
            }
            // Long.MIN_VALUE is what the split receives when no stopping offset was resolved
            // (presumably KafkaPartitionSplit's "no stopping offset" marker - TODO confirm).
            long stoppingOffset = stoppingOffsets.getOrDefault(partition, Long.MIN_VALUE);
            partitionSplits.add(new KafkaPartitionSplit(partition, startingOffset, stoppingOffset));
        }
        return new PartitionSplitChange(partitionSplits, partitionChange.getRemovedPartitions());
    }

    /**
     * Callback of split initialization. Queues the new splits and assigns pending splits to all
     * registered readers.
     *
     * <p>Discovery counts as disabled for every non-positive interval, matching {@code start()},
     * which schedules periodic discovery only for a strictly positive interval. The previous
     * {@code < 0} check left an interval of {@code 0} in limbo: discovery ran once, but
     * {@code noMoreNewPartitionSplits} was never set, so a bounded source never signalled
     * "no more splits" to its readers.
     *
     * @param partitionSplitChange splits created for the newly discovered partitions
     * @param th error raised during split initialization, or {@code null} on success
     * @throws FlinkRuntimeException if split initialization failed
     */
    private void handlePartitionSplitChanges(PartitionSplitChange partitionSplitChange, Throwable th) {
        if (th != null) {
            throw new FlinkRuntimeException("Failed to initialize partition splits due to ", th);
        }
        if (this.partitionDiscoveryIntervalMs <= 0) {
            LOG.debug("Partition discovery is disabled.");
            this.noMoreNewPartitionSplits = true;
        }
        addPartitionSplitChangeToPendingAssignments(partitionSplitChange.newPartitionSplits);
        assignPendingPartitionSplits(this.context.registeredReaders().keySet());
    }

    /**
     * Files each split under the reader that owns its partition (see {@code getSplitOwner}),
     * based on the current parallelism. Nothing is sent to readers here.
     *
     * @param newPartitionSplits splits to queue
     */
    private void addPartitionSplitChangeToPendingAssignments(Collection<KafkaPartitionSplit> newPartitionSplits) {
        final int numReaders = this.context.currentParallelism();
        for (KafkaPartitionSplit split : newPartitionSplits) {
            final int ownerReader = getSplitOwner(split.getTopicPartition(), numReaders);
            final Set<KafkaPartitionSplit> ownerQueue =
                    this.pendingPartitionSplitAssignment.computeIfAbsent(ownerReader, reader -> new HashSet<>());
            ownerQueue.add(split);
        }
        LOG.debug("Assigned {} to {} readers of consumer group {}.", newPartitionSplits, numReaders, this.consumerGroupId);
    }

    /**
     * Hands every given reader the splits that are pending for it, records the corresponding
     * partitions as assigned and, for a bounded source whose discovery has finished, tells the
     * readers that no more splits will follow.
     *
     * <p>The decompiled original referenced an undeclared register ({@code r1}) in the final
     * lambda and did not compile; it is replaced by the bound method reference it stood for.
     *
     * @param pendingReaders ids of the readers to serve
     * @throws IllegalStateException if one of the readers is not registered
     */
    private void assignPendingPartitionSplits(Set<Integer> pendingReaders) {
        Map<Integer, List<KafkaPartitionSplit>> incrementalAssignment = new HashMap<>();
        for (int readerId : pendingReaders) {
            checkReaderRegistered(readerId);
            // Removing the entry makes sure a split is never handed out twice.
            Set<KafkaPartitionSplit> pendingSplits = this.pendingPartitionSplitAssignment.remove(readerId);
            if (pendingSplits != null && !pendingSplits.isEmpty()) {
                incrementalAssignment.computeIfAbsent(readerId, reader -> new ArrayList<>()).addAll(pendingSplits);
                pendingSplits.forEach(split -> this.assignedPartitions.add(split.getTopicPartition()));
            }
        }
        if (!incrementalAssignment.isEmpty()) {
            LOG.info("Assigning splits to readers {}", incrementalAssignment);
            this.context.assignSplits(new SplitsAssignment<>(incrementalAssignment));
        }
        if (this.noMoreNewPartitionSplits && this.boundedness == Boundedness.BOUNDED) {
            LOG.debug("No more KafkaPartitionSplits to assign. Sending NoMoreSplitsEvent to reader {} in consumer group {}.", pendingReaders, this.consumerGroupId);
            pendingReaders.forEach(this.context::signalNoMoreSplits);
        }
    }

    /**
     * Guards against assigning splits to an unknown reader.
     *
     * @param readerId id of the reader to check
     * @throws IllegalStateException if the context has no registered reader with that id
     */
    private void checkReaderRegistered(int readerId) {
        if (this.context.registeredReaders().containsKey(readerId)) {
            return;
        }
        throw new IllegalStateException(String.format("Reader %d is not registered to source coordinator", readerId));
    }

    /**
     * Compares the fetched partitions with the partitions already assigned or pending.
     *
     * <p>Every known partition is removed from {@code set}; a known partition that is not present
     * in {@code set} is recorded as removed. What remains in {@code set} afterwards are the new
     * partitions. Note that the argument itself is modified and becomes the "new partitions" set
     * of the result.
     *
     * @param set partitions fetched from Kafka; mutated by this call
     * @return the new and the removed partitions
     */
    @VisibleForTesting
    PartitionChange getPartitionChange(Set<TopicPartition> set) {
        final Set<TopicPartition> removedPartitions = new HashSet<>();

        // Partitions that were already handed to readers.
        for (TopicPartition assigned : this.assignedPartitions) {
            if (!set.remove(assigned)) {
                removedPartitions.add(assigned);
            }
        }
        // Partitions whose splits are still waiting for their reader.
        for (Set<KafkaPartitionSplit> pendingSplits : this.pendingPartitionSplitAssignment.values()) {
            for (KafkaPartitionSplit pendingSplit : pendingSplits) {
                final TopicPartition pending = pendingSplit.getTopicPartition();
                if (!set.remove(pending)) {
                    removedPartitions.add(pending);
                }
            }
        }

        if (!set.isEmpty()) {
            LOG.info("Discovered new partitions: {}", set);
        }
        if (!removedPartitions.isEmpty()) {
            LOG.info("Discovered removed partitions: {}", removedPartitions);
        }
        return new PartitionChange(set, removedPartitions);
    }

    /**
     * Creates the admin client from a copy of the source properties, with {@code client.id} set
     * to the configured client id prefix followed by {@code -enumerator-admin-client}.
     */
    private AdminClient getKafkaAdminClient() {
        final Properties adminProperties = new Properties();
        deepCopyProperties(this.properties, adminProperties);
        final String clientIdPrefix = adminProperties.getProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key());
        final String adminClientId = clientIdPrefix + "-enumerator-admin-client";
        adminProperties.setProperty("client.id", adminClientId);
        return AdminClient.create(adminProperties);
    }

    /**
     * Creates an offsets retriever on top of the enumerator's admin client for the consumer group
     * configured under {@code group.id}.
     */
    private OffsetsInitializer.PartitionOffsetsRetriever getOffsetsRetriever() {
        final String groupId = this.properties.getProperty("group.id");
        return new PartitionOffsetsRetrieverImpl(this.adminClient, groupId);
    }

    /**
     * Picks the reader that owns a partition: a start index is derived from the topic name's hash
     * and the partition number is added to it, both modulo the number of readers.
     *
     * @param topicPartition partition to place
     * @param i number of readers; must not be zero
     * @return index of the owning reader
     */
    @VisibleForTesting
    static int getSplitOwner(TopicPartition topicPartition, int i) {
        // Masking with 0x7FFFFFFF clears the sign bit so the start index is never negative.
        final int startIndex = ((topicPartition.topic().hashCode() * 31) & 0x7FFFFFFF) % i;
        return (startIndex + topicPartition.partition()) % i;
    }

    @VisibleForTesting
    static void deepCopyProperties(Properties properties, Properties properties2) {
        // Copies every string-valued property of the first argument, including those inherited
        // from its defaults, into the second argument. Existing entries of the target with other
        // keys are left alone; non-string entries of the source are skipped.
        properties.stringPropertyNames()
                .forEach(key -> properties2.setProperty(key, properties.getProperty(key)));
    }
}
