/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.sink.writer.topic;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.shade.com.google.common.cache.CacheBuilder;
import org.apache.pulsar.shade.com.google.common.cache.CacheLoader;
import org.apache.pulsar.shade.com.google.common.cache.LoadingCache;
import org.apache.pulsar.shade.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps track of the topic partitions available to the Pulsar sink.
 *
 * <p>The topic names given to the constructor are split into two groups: names that already
 * denote a single partition (according to {@link TopicNameUtils#isPartition(String)}) and plain
 * topic names. After {@link #open(SinkConfiguration, ProcessingTimeService)} has been called, the
 * plain topic names are resolved through the Pulsar admin client and, when at least one plain
 * topic was provided, re-resolved by a recurring processing-time timer.
 *
 * <p>The admin client, the time service, the refresh interval and the lookup cache are transient
 * and are only populated by {@code open}; they are not available on an instance that has not been
 * opened.
 */
@Internal
public class MetadataListener
implements Serializable,
Closeable {
    private static final long serialVersionUID = 6186948471557507522L;
    private static final Logger LOG = LoggerFactory.getLogger(MetadataListener.class);

    /** Constructor-provided names that already refer to one specific partition. */
    private final ImmutableList<String> partitions;
    /** Constructor-provided names that are not partition names and need a metadata lookup. */
    private final ImmutableList<String> topics;
    /** Latest snapshot built by {@link #updateTopicMetadata()}; empty until the first update. */
    private ImmutableList<TopicPartition> availablePartitions;

    private transient PulsarAdmin pulsarAdmin;
    private transient Long topicMetadataRefreshInterval;
    private transient ProcessingTimeService timeService;
    /** Maps a topic name to its partition count, or to empty when the admin reports "not found". */
    private transient LoadingCache<String, Optional<Integer>> topicPartitionCache;

    /** Creates a listener without any predefined topics. */
    public MetadataListener() {
        this(Collections.emptyList());
    }

    /**
     * Creates a listener for the given topic names.
     *
     * @param topics topic or partition names; each one is classified with
     *     {@link TopicNameUtils#isPartition(String)}
     */
    public MetadataListener(List<String> topics) {
        ImmutableList.Builder<String> partitionsBuilder = ImmutableList.builder();
        ImmutableList.Builder<String> topicsBuilder = ImmutableList.builder();
        for (String topic : topics) {
            if (TopicNameUtils.isPartition(topic)) {
                partitionsBuilder.add(topic);
            } else {
                topicsBuilder.add(topic);
            }
        }
        this.partitions = partitionsBuilder.build();
        this.topics = topicsBuilder.build();
        this.availablePartitions = ImmutableList.of();
    }

    /**
     * Creates the admin client and the lookup cache, performs an initial metadata update and, if
     * any plain topic names were provided, schedules the recurring metadata update.
     *
     * @param sinkConfiguration supplies the admin client settings and the refresh interval, which
     *     is used in milliseconds both as the cache expiry and as the timer period
     * @param timeService used to register the update timers
     * @throws PulsarClientException declared for admin client creation
     * @throws FlinkRuntimeException if the initial metadata update fails with a
     *     {@link PulsarAdminException}
     */
    public void open(SinkConfiguration sinkConfiguration, ProcessingTimeService timeService) throws PulsarClientException {
        this.pulsarAdmin = PulsarClientFactory.createAdmin(sinkConfiguration);
        this.topicMetadataRefreshInterval = sinkConfiguration.getTopicMetadataRefreshInterval();
        this.timeService = timeService;
        this.topicPartitionCache = CacheBuilder.newBuilder()
                .expireAfterWrite(this.topicMetadataRefreshInterval, TimeUnit.MILLISECONDS)
                .build(new CacheLoader<String, Optional<Integer>>() {

                    @Override
                    @ParametersAreNonnullByDefault
                    public Optional<Integer> load(String topic) throws PulsarAdminException {
                        try {
                            PartitionedTopicMetadata metadata =
                                    MetadataListener.this.pulsarAdmin.topics().getPartitionedTopicMetadata(topic);
                            return Optional.of(metadata.partitions);
                        } catch (PulsarAdminException.NotFoundException e) {
                            // A missing topic is cached as "absent" rather than reported as a failure.
                            return Optional.empty();
                        }
                    }
                });

        try {
            updateTopicMetadata();
        } catch (PulsarAdminException e) {
            throw new FlinkRuntimeException(e);
        }

        if (this.topics.isEmpty()) {
            LOG.info("No topics have been provided, skip metadata update timer.");
        } else {
            registerNextTopicMetadataUpdateTimer();
        }
    }

    /**
     * Returns the partitions collected by the most recent successful metadata update.
     *
     * @return an immutable list; empty if no update has completed yet
     */
    public List<TopicPartition> availablePartitions() {
        return this.availablePartitions;
    }

    /**
     * Looks up the metadata of a topic.
     *
     * <p>A partition name is answered directly with a partition size of {@code 0} and no lookup.
     * Any other name is resolved through the cache, which asks the Pulsar admin client on a miss.
     *
     * @param topic the topic or partition name
     * @return the metadata, or empty when the admin client reported the topic as not found
     * @throws PulsarAdminException if the lookup failed with an admin exception
     * @throws FlinkRuntimeException if the lookup failed for any other reason
     */
    public Optional<TopicMetadata> queryTopicMetadata(String topic) throws PulsarAdminException {
        if (TopicNameUtils.isPartition(topic)) {
            return Optional.of(new TopicMetadata(topic, 0));
        }
        try {
            return this.topicPartitionCache.get(topic).map(size -> new TopicMetadata(topic, size));
        } catch (ExecutionException e) {
            // The cache wraps loader failures; surface the admin exception itself when there is one.
            Optional<PulsarAdminException> adminException =
                    ExceptionUtils.findThrowable(e, PulsarAdminException.class);
            if (adminException.isPresent()) {
                throw adminException.get();
            }
            throw new FlinkRuntimeException(e);
        }
    }

    /** Asks the cache to reload the entry for the given topic. */
    @VisibleForTesting
    void refreshTopicMetadata(String topic) {
        this.topicPartitionCache.refresh(topic);
    }

    /** Closes the admin client if {@code open} has created one. */
    @Override
    public void close() throws IOException {
        if (this.pulsarAdmin != null) {
            this.pulsarAdmin.close();
        }
    }

    /** Registers a timer that fires one refresh interval after the current processing time. */
    private void registerNextTopicMetadataUpdateTimer() {
        long currentProcessingTime = this.timeService.getCurrentProcessingTime();
        long triggerTime = currentProcessingTime + this.topicMetadataRefreshInterval;
        this.timeService.registerTimer(triggerTime, time -> triggerNextTopicMetadataUpdate());
    }

    /**
     * Timer callback: refreshes the metadata and schedules the next run. An admin failure is only
     * logged so that the schedule continues.
     */
    private void triggerNextTopicMetadataUpdate() {
        try {
            updateTopicMetadata();
        } catch (PulsarAdminException e) {
            // availablePartitions is only replaced after a complete update, so the old snapshot stays.
            LOG.warn("Failed to update the topic metadata, keeping the previously discovered partitions.", e);
        }
        registerNextTopicMetadataUpdateTimer();
    }

    /**
     * Rebuilds {@link #availablePartitions} from the plain topics (via metadata lookup) followed by
     * the explicitly provided partitions. Topics whose lookup returns empty are left out.
     *
     * @throws PulsarAdminException if a metadata lookup fails
     */
    private void updateTopicMetadata() throws PulsarAdminException {
        ImmutableList.Builder<TopicPartition> partitionsBuilder = ImmutableList.builder();

        for (String topic : this.topics) {
            Optional<TopicMetadata> optionalMetadata = queryTopicMetadata(topic);
            if (!optionalMetadata.isPresent()) {
                continue;
            }
            TopicMetadata metadata = optionalMetadata.get();
            if (metadata.isPartitioned()) {
                int partitionSize = metadata.getPartitionSize();
                for (int i = 0; i < partitionSize; ++i) {
                    partitionsBuilder.add(new TopicPartition(topic, i));
                }
            } else {
                partitionsBuilder.add(new TopicPartition(topic));
            }
        }

        for (String partition : this.partitions) {
            TopicName topicName = TopicName.get(partition);
            String name = topicName.getPartitionedTopicName();
            int index = topicName.getPartitionIndex();
            partitionsBuilder.add(new TopicPartition(name, index));
        }

        this.availablePartitions = partitionsBuilder.build();
    }
}

