package org.apache.flink.connector.pulsar.source.reader;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader;
import org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader;
import org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader;
import org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;

/**
 * Static factory that builds the Pulsar {@link SourceReader} matching the subscription type
 * found in the {@link SourceConfiguration}.
 *
 * <ul>
 *   <li>{@code Failover} and {@code Exclusive} subscriptions get a
 *       {@link PulsarOrderedSourceReader}.
 *   <li>{@code Shared} and {@code Key_Shared} subscriptions get a
 *       {@link PulsarUnorderedSourceReader}.
 * </ul>
 */
@Internal
public final class PulsarSourceReaderFactory {

    private PulsarSourceReaderFactory() {
        // Utility class, not meant to be instantiated.
    }

    /**
     * Creates the source reader for the configured subscription type.
     *
     * @param sourceReaderContext the reader context handed to the created reader
     * @param pulsarDeserializationSchema the schema handed to every split reader
     * @param configuration the configuration used to create the Pulsar client, the Pulsar admin
     *     and to look up the element queue capacity
     * @param sourceConfiguration the source configuration providing the subscription type and
     *     the auto acknowledge switch
     * @param <OUT> the type of the records emitted by the reader
     * @return an ordered or unordered Pulsar source reader
     * @throws UnsupportedOperationException if the subscription type is none of
     *     {@code Failover}, {@code Exclusive}, {@code Shared} or {@code Key_Shared}
     * @throws IllegalStateException if a {@code Shared}/{@code Key_Shared} subscription is used
     *     while the client has no transaction coordinator client and auto acknowledge is
     *     disabled
     */
    public static <OUT> SourceReader<OUT, PulsarPartitionSplit> create(SourceReaderContext sourceReaderContext, PulsarDeserializationSchema<OUT> pulsarDeserializationSchema, Configuration configuration, SourceConfiguration sourceConfiguration) {
        // Validate the subscription type before any client is created, so that an unsupported
        // type fails fast instead of leaving an opened client and admin behind.
        SubscriptionType subscriptionType = sourceConfiguration.getSubscriptionType();
        boolean ordered = subscriptionType == SubscriptionType.Failover || subscriptionType == SubscriptionType.Exclusive;
        boolean unordered = subscriptionType == SubscriptionType.Shared || subscriptionType == SubscriptionType.Key_Shared;
        if (!ordered && !unordered) {
            throw new UnsupportedOperationException("Subscription type " + subscriptionType + " is not supported currently.");
        }

        PulsarClientImpl pulsarClient = PulsarConfigUtils.createClient(configuration);
        PulsarAdmin pulsarAdmin = PulsarConfigUtils.createAdmin(configuration);
        // The queue is shared between the created source reader and its split readers.
        FutureCompletingBlockingQueue elementsQueue = new FutureCompletingBlockingQueue(configuration.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));

        if (ordered) {
            return new PulsarOrderedSourceReader(
                    elementsQueue,
                    () -> new PulsarOrderedPartitionSplitReader(pulsarClient, pulsarAdmin, configuration, sourceConfiguration, pulsarDeserializationSchema),
                    configuration,
                    sourceReaderContext,
                    sourceConfiguration,
                    pulsarClient,
                    pulsarAdmin);
        }

        // Shared / Key_Shared: a transaction coordinator client is mandatory unless messages are
        // acknowledged automatically.
        TransactionCoordinatorClientImpl tcClient = pulsarClient.getTcClient();
        if (tcClient == null && !sourceConfiguration.isEnableAutoAcknowledgeMessage()) {
            IllegalStateException failure = new IllegalStateException("Transaction is required but didn't enabled");
            // Nobody will ever own these resources, release them before reporting the failure.
            closeQuietly(pulsarAdmin, failure);
            closeQuietly(pulsarClient, failure);
            throw failure;
        }

        return new PulsarUnorderedSourceReader(
                elementsQueue,
                () -> new PulsarUnorderedPartitionSplitReader(pulsarClient, pulsarAdmin, configuration, sourceConfiguration, pulsarDeserializationSchema, tcClient),
                configuration,
                sourceReaderContext,
                sourceConfiguration,
                pulsarClient,
                pulsarAdmin,
                tcClient);
    }

    /**
     * Closes the resource and records any close failure as a suppressed exception on the given
     * primary failure, so the primary failure is the one that gets reported.
     */
    private static void closeQuietly(AutoCloseable resource, Throwable failure) {
        try {
            resource.close();
        } catch (Exception closeFailure) {
            failure.addSuppressed(closeFailure);
        }
    }
}
