package org.apache.flink.streaming.api.functions.source;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.class */
public abstract class MessageAcknowledgingSourceBase<Type, UId> extends RichSourceFunction<Type> implements CheckpointedFunction, CheckpointListener {
    private static final long serialVersionUID = -8689291992192955579L;
    private static final Logger LOG = LoggerFactory.getLogger(MessageAcknowledgingSourceBase.class);
    private final TypeSerializer<UId> idSerializer;
    private transient Set<UId> idsForCurrentCheckpoint;
    protected transient ArrayDeque<Tuple2<Long, Set<UId>>> pendingCheckpoints;
    private transient Set<UId> idsProcessedButNotAcknowledged;
    private transient ListState<SerializedCheckpointData[]> checkpointedState;

    /* JADX INFO: Access modifiers changed from: protected */
    public MessageAcknowledgingSourceBase(Class<UId> cls) {
        this(TypeExtractor.getForClass(cls));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public MessageAcknowledgingSourceBase(TypeInformation<UId> typeInformation) {
        this.idSerializer = typeInformation.createSerializer(new ExecutionConfig());
    }

    @Override // org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        Preconditions.checkState(this.checkpointedState == null, "The " + getClass().getSimpleName() + " has already been initialized.");
        this.checkpointedState = functionInitializationContext.getOperatorStateStore().getSerializableListState("message-acknowledging-source-state");
        this.idsForCurrentCheckpoint = new HashSet(64);
        this.pendingCheckpoints = new ArrayDeque<>();
        this.idsProcessedButNotAcknowledged = new HashSet();
        if (!functionInitializationContext.isRestored()) {
            LOG.info("No state to restore for the {}.", getClass().getSimpleName());
            return;
        }
        LOG.info("Restoring state for the {}.", getClass().getSimpleName());
        ArrayList arrayList = new ArrayList();
        Iterator it = ((Iterable) this.checkpointedState.get()).iterator();
        while (it.hasNext()) {
            arrayList.add((SerializedCheckpointData[]) it.next());
        }
        Preconditions.checkArgument(arrayList.size() == 1, getClass().getSimpleName() + " retrieved invalid state.");
        this.pendingCheckpoints = SerializedCheckpointData.toDeque((SerializedCheckpointData[]) arrayList.get(0), this.idSerializer);
        Iterator<Tuple2<Long, Set<UId>>> it2 = this.pendingCheckpoints.iterator();
        while (it2.hasNext()) {
            this.idsProcessedButNotAcknowledged.addAll((Collection) it2.next().f1);
        }
    }

    public void close() throws Exception {
        this.idsForCurrentCheckpoint.clear();
        this.pendingCheckpoints.clear();
    }

    protected abstract void acknowledgeIDs(long j, Set<UId> set);

    protected boolean addId(UId uid) {
        this.idsForCurrentCheckpoint.add(uid);
        return this.idsProcessedButNotAcknowledged.add(uid);
    }

    @Override // org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        Preconditions.checkState(this.checkpointedState != null, "The " + getClass().getSimpleName() + " has not been properly initialized.");
        if (LOG.isDebugEnabled()) {
            LOG.debug("Checkpointing: Messages: {}, checkpoint id: {}, timestamp: {}", new Object[]{this.idsForCurrentCheckpoint, Long.valueOf(functionSnapshotContext.getCheckpointId()), Long.valueOf(functionSnapshotContext.getCheckpointTimestamp())});
        }
        this.pendingCheckpoints.addLast(new Tuple2<>(Long.valueOf(functionSnapshotContext.getCheckpointId()), this.idsForCurrentCheckpoint));
        this.idsForCurrentCheckpoint = new HashSet(64);
        this.checkpointedState.clear();
        this.checkpointedState.add(SerializedCheckpointData.fromDeque(this.pendingCheckpoints, this.idSerializer));
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        LOG.debug("Committing Messages externally for checkpoint {}", Long.valueOf(j));
        Iterator<Tuple2<Long, Set<UId>>> it = this.pendingCheckpoints.iterator();
        while (it.hasNext()) {
            Tuple2<Long, Set<UId>> next = it.next();
            if (((Long) next.f0).longValue() > j) {
                return;
            }
            LOG.trace("Committing Messages with following IDs {}", next.f1);
            acknowledgeIDs(j, (Set) next.f1);
            this.idsProcessedButNotAcknowledged.removeAll((Collection) next.f1);
            it.remove();
        }
    }
}
