/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.sink.committer;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils;
import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class PulsarCommitter
implements Committer<PulsarCommittable>,
Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarCommitter.class);
    private final SinkConfiguration sinkConfiguration;
    private PulsarClient pulsarClient;
    private TransactionCoordinatorClient coordinatorClient;

    public PulsarCommitter(SinkConfiguration sinkConfiguration) {
        this.sinkConfiguration = (SinkConfiguration)((Object)Preconditions.checkNotNull((Object)((Object)sinkConfiguration)));
    }

    public void commit(Collection<Committer.CommitRequest<PulsarCommittable>> requests) throws PulsarClientException {
        TransactionCoordinatorClient client = this.transactionCoordinatorClient();
        for (Committer.CommitRequest<PulsarCommittable> request : requests) {
            PulsarCommittable committable = (PulsarCommittable)request.getCommittable();
            TxnID txnID = committable.getTxnID();
            String topic = committable.getTopic();
            LOG.debug("Start committing the Pulsar transaction {} for topic {}", (Object)txnID, (Object)topic);
            try {
                client.commit(txnID);
            }
            catch (TransactionCoordinatorClientException.CoordinatorNotFoundException e) {
                LOG.error("We couldn't find the Transaction Coordinator from Pulsar broker {}. Check your broker configuration.", (Object)committable, (Object)e);
                request.signalFailedWithKnownReason((Throwable)e);
            }
            catch (TransactionCoordinatorClientException.InvalidTxnStatusException e) {
                LOG.error("Unable to commit transaction ({}) because it's in an invalid state. Most likely the transaction has been aborted for some reason. Please check the Pulsar broker logs for more details.", (Object)committable, (Object)e);
                request.signalAlreadyCommitted();
            }
            catch (TransactionCoordinatorClientException.TransactionNotFoundException e) {
                if (request.getNumberOfRetries() == 0) {
                    LOG.error("Unable to commit transaction ({}) because it's not found on Pulsar broker. Most likely the checkpoint interval exceed the transaction timeout.", (Object)committable, (Object)e);
                    request.signalFailedWithKnownReason((Throwable)e);
                    continue;
                }
                LOG.warn("We can't find the transaction {} after {} retry committing. This may mean that the transaction have been committed in previous but failed with timeout. So we just mark it as committed.", (Object)txnID, (Object)request.getNumberOfRetries());
                request.signalAlreadyCommitted();
            }
            catch (TransactionCoordinatorClientException.MetaStoreHandlerNotExistsException e) {
                LOG.error("We can't find the meta store handler by the mostSigBits from TxnID {}. Did you change the metadata for topic {}?", new Object[]{committable, SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, e});
                request.signalFailedWithKnownReason((Throwable)e);
            }
            catch (TransactionCoordinatorClientException e) {
                LOG.error("Encountered retriable exception while committing transaction {} for topic {}.", new Object[]{committable, topic, e});
                int maxRecommitTimes = this.sinkConfiguration.getMaxRecommitTimes();
                if (request.getNumberOfRetries() < maxRecommitTimes) {
                    request.retryLater();
                    continue;
                }
                String message = String.format("Failed to commit transaction %s after retrying %d times", txnID, maxRecommitTimes);
                request.signalFailedWithKnownReason((Throwable)new FlinkRuntimeException(message, (Throwable)e));
            }
            catch (Exception e) {
                LOG.error("Transaction ({}) encountered unknown error and data could be potentially lost.", (Object)committable, (Object)e);
                request.signalFailedWithUnknownReason((Throwable)e);
            }
        }
    }

    private TransactionCoordinatorClient transactionCoordinatorClient() throws PulsarClientException {
        if (this.coordinatorClient == null) {
            this.pulsarClient = PulsarClientFactory.createClient(this.sinkConfiguration);
            this.coordinatorClient = PulsarTransactionUtils.getTcClient(this.pulsarClient);
        }
        return this.coordinatorClient;
    }

    @Override
    public void close() throws IOException {
        if (this.pulsarClient != null) {
            this.pulsarClient.close();
        }
    }
}

