/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.coordination;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.Nullable;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.util.SerializableFunction;

class TestingOperatorCoordinator
implements OperatorCoordinator {
    public static final byte[] NULL_RESTORE_VALUE = new byte[0];
    private final OperatorCoordinator.Context context;
    private final ArrayList<Integer> failedTasks = new ArrayList();
    private final ArrayList<SubtaskAndCheckpoint> restoredTasks = new ArrayList();
    private final CountDownLatch blockOnCloseLatch;
    @Nullable
    private byte[] lastRestoredCheckpointState;
    private long lastRestoredCheckpointId;
    private BlockingQueue<CompletableFuture<byte[]>> triggeredCheckpoints;
    private BlockingQueue<Long> lastCheckpointComplete;
    private BlockingQueue<OperatorEvent> receivedOperatorEvents;
    private boolean started;
    private boolean closed;

    public TestingOperatorCoordinator(OperatorCoordinator.Context context) {
        this(context, null);
    }

    public TestingOperatorCoordinator(OperatorCoordinator.Context context, CountDownLatch blockOnCloseLatch) {
        this.context = context;
        this.triggeredCheckpoints = new LinkedBlockingQueue<CompletableFuture<byte[]>>();
        this.lastCheckpointComplete = new LinkedBlockingQueue<Long>();
        this.receivedOperatorEvents = new LinkedBlockingQueue<OperatorEvent>();
        this.blockOnCloseLatch = blockOnCloseLatch;
    }

    public void start() throws Exception {
        this.started = true;
    }

    public void close() throws InterruptedException {
        this.closed = true;
        if (this.blockOnCloseLatch != null) {
            this.blockOnCloseLatch.await();
        }
    }

    public void handleEventFromOperator(int subtask, OperatorEvent event) {
        this.receivedOperatorEvents.add(event);
    }

    public void subtaskFailed(int subtask, @Nullable Throwable reason) {
        this.failedTasks.add(subtask);
    }

    public void subtaskReset(int subtask, long checkpointId) {
        this.restoredTasks.add(new SubtaskAndCheckpoint(subtask, checkpointId));
    }

    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
        boolean added = this.triggeredCheckpoints.offer(result);
        assert (added);
    }

    public void notifyCheckpointComplete(long checkpointId) {
        this.lastCheckpointComplete.offer(checkpointId);
    }

    public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) {
        this.lastRestoredCheckpointId = checkpointId;
        this.lastRestoredCheckpointState = checkpointData == null ? NULL_RESTORE_VALUE : checkpointData;
    }

    public OperatorCoordinator.Context getContext() {
        return this.context;
    }

    public boolean isStarted() {
        return this.started;
    }

    public boolean isClosed() {
        return this.closed;
    }

    public List<Integer> getFailedTasks() {
        return this.failedTasks;
    }

    public List<SubtaskAndCheckpoint> getRestoredTasks() {
        return this.restoredTasks;
    }

    @Nullable
    public byte[] getLastRestoredCheckpointState() {
        return this.lastRestoredCheckpointState;
    }

    public long getLastRestoredCheckpointId() {
        return this.lastRestoredCheckpointId;
    }

    public CompletableFuture<byte[]> getLastTriggeredCheckpoint() throws InterruptedException {
        return this.triggeredCheckpoints.take();
    }

    public boolean hasTriggeredCheckpoint() {
        return !this.triggeredCheckpoints.isEmpty();
    }

    public long getLastCheckpointComplete() throws InterruptedException {
        return this.lastCheckpointComplete.take();
    }

    @Nullable
    public OperatorEvent getNextReceivedOperatorEvent() {
        return (OperatorEvent)this.receivedOperatorEvents.poll();
    }

    public boolean hasCompleteCheckpoint() throws InterruptedException {
        return !this.lastCheckpointComplete.isEmpty();
    }

    public static final class Provider
    implements OperatorCoordinator.Provider {
        private static final long serialVersionUID = 1L;
        private final OperatorID operatorId;
        private final SerializableFunction<OperatorCoordinator.Context, TestingOperatorCoordinator> factory;

        public Provider(OperatorID operatorId) {
            this(operatorId, TestingOperatorCoordinator::new);
        }

        public Provider(OperatorID operatorId, SerializableFunction<OperatorCoordinator.Context, TestingOperatorCoordinator> factory) {
            this.operatorId = operatorId;
            this.factory = factory;
        }

        public OperatorID getOperatorId() {
            return this.operatorId;
        }

        public OperatorCoordinator create(OperatorCoordinator.Context context) {
            return (OperatorCoordinator)this.factory.apply(context);
        }
    }

    public static final class SubtaskAndCheckpoint {
        public final int subtaskIndex;
        public final long checkpointId;

        public SubtaskAndCheckpoint(int subtaskIndex, long checkpointId) {
            this.subtaskIndex = subtaskIndex;
            this.checkpointId = checkpointId;
        }
    }
}

