package org.apache.flink.connector.pulsar.source.enumerator;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils;
import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.CursorPosition;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.class */
public class PulsarSourceEnumerator implements SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceEnumerator.class);
    private final PulsarAdmin pulsarAdmin;
    private final PulsarSubscriber subscriber;
    private final StartCursor startCursor;
    private final RangeGenerator rangeGenerator;
    private final Configuration configuration;
    private final SourceConfiguration sourceConfiguration;
    private final SplitEnumeratorContext<PulsarPartitionSplit> context;
    private final SplitAssigner splitAssigner;

    public PulsarSourceEnumerator(PulsarSubscriber pulsarSubscriber, StartCursor startCursor, RangeGenerator rangeGenerator, Configuration configuration, SourceConfiguration sourceConfiguration, SplitEnumeratorContext<PulsarPartitionSplit> splitEnumeratorContext, SplitAssigner splitAssigner) {
        this.pulsarAdmin = PulsarConfigUtils.createAdmin(configuration);
        this.subscriber = pulsarSubscriber;
        this.startCursor = startCursor;
        this.rangeGenerator = rangeGenerator;
        this.configuration = configuration;
        this.sourceConfiguration = sourceConfiguration;
        this.context = splitEnumeratorContext;
        this.splitAssigner = splitAssigner;
    }

    public void start() {
        this.rangeGenerator.open(this.configuration, this.sourceConfiguration);
        if (this.sourceConfiguration.isEnablePartitionDiscovery()) {
            LOG.info("Starting the PulsarSourceEnumerator for subscription {} with partition discovery interval of {} ms.", this.sourceConfiguration.getSubscriptionDesc(), Long.valueOf(this.sourceConfiguration.getPartitionDiscoveryIntervalMs()));
            this.context.callAsync(this::getSubscribedTopicPartitions, this::checkPartitionChanges, 0L, this.sourceConfiguration.getPartitionDiscoveryIntervalMs());
        } else {
            LOG.info("Starting the PulsarSourceEnumerator for subscription {} without periodic partition discovery.", this.sourceConfiguration.getSubscriptionDesc());
            this.context.callAsync(this::getSubscribedTopicPartitions, this::checkPartitionChanges);
        }
    }

    public void handleSplitRequest(int i, @Nullable String str) {
    }

    public void addSplitsBack(List<PulsarPartitionSplit> list, int i) {
        this.splitAssigner.addSplitsBack(list, i);
        if (this.context.registeredReaders().containsKey(Integer.valueOf(i))) {
            assignPendingPartitionSplits(Collections.singletonList(Integer.valueOf(i)));
        }
    }

    public void addReader(int i) {
        LOG.debug("Adding reader {} to PulsarSourceEnumerator for subscription {}.", Integer.valueOf(i), this.sourceConfiguration.getSubscriptionDesc());
        assignPendingPartitionSplits(Collections.singletonList(Integer.valueOf(i)));
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public PulsarSourceEnumState m10snapshotState(long j) {
        return this.splitAssigner.snapshotState();
    }

    public void close() {
        if (this.pulsarAdmin != null) {
            this.pulsarAdmin.close();
        }
    }

    private Set<TopicPartition> getSubscribedTopicPartitions() {
        return this.subscriber.getSubscribedTopicPartitions(this.pulsarAdmin, this.rangeGenerator, this.context.currentParallelism());
    }

    private void checkPartitionChanges(Set<TopicPartition> set, Throwable th) {
        if (th != null) {
            throw new FlinkRuntimeException("Failed to list subscribed topic partitions due to ", th);
        }
        createSubscription(this.splitAssigner.registerTopicPartitions(set));
        assignPendingPartitionSplits(new ArrayList(this.context.registeredReaders().keySet()));
    }

    private void createSubscription(List<TopicPartition> list) {
        for (TopicPartition topicPartition : list) {
            String fullTopicName = topicPartition.getFullTopicName();
            String subscriptionName = this.sourceConfiguration.getSubscriptionName();
            if (!((List) PulsarExceptionUtils.sneakyAdmin(() -> {
                return this.pulsarAdmin.topics().getSubscriptions(fullTopicName);
            })).contains(subscriptionName)) {
                MessageId queryInitialPosition = queryInitialPosition(fullTopicName, this.startCursor.position(topicPartition.getTopic(), topicPartition.getPartitionId()));
                PulsarExceptionUtils.sneakyAdmin(() -> {
                    this.pulsarAdmin.topics().createSubscription(fullTopicName, subscriptionName, queryInitialPosition);
                });
            }
        }
    }

    private MessageId queryInitialPosition(String str, CursorPosition cursorPosition) {
        CursorPosition.Type type = cursorPosition.getType();
        if (type == CursorPosition.Type.TIMESTAMP) {
            return (MessageId) PulsarExceptionUtils.sneakyAdmin(() -> {
                return this.pulsarAdmin.topics().getMessageIdByTimestamp(str, cursorPosition.getTimestamp().longValue());
            });
        }
        if (type == CursorPosition.Type.MESSAGE_ID) {
            return cursorPosition.getMessageId();
        }
        throw new UnsupportedOperationException("We don't support this seek type " + type);
    }

    private void assignPendingPartitionSplits(List<Integer> list) {
        list.forEach(num -> {
            if (!this.context.registeredReaders().containsKey(num)) {
                throw new IllegalStateException("Reader " + num + " is not registered to source coordinator");
            }
        });
        Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment = this.splitAssigner.createAssignment(list);
        SplitEnumeratorContext<PulsarPartitionSplit> splitEnumeratorContext = this.context;
        splitEnumeratorContext.getClass();
        createAssignment.ifPresent(splitEnumeratorContext::assignSplits);
        for (Integer num2 : list) {
            if (this.splitAssigner.noMoreSplits(num2)) {
                LOG.debug("No more PulsarPartitionSplits to assign. Sending NoMoreSplitsEvent to reader {} in subscription {}.", num2, this.sourceConfiguration.getSubscriptionDesc());
                this.context.signalNoMoreSplits(num2.intValue());
            }
        }
    }
}
