/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.mock.Whitebox;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.NoOpFailJobCall;
import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

public class CheckpointCoordinatorTestingUtils {
    /**
     * Builds an operator state handle filled with pseudo-random integer partitions.
     *
     * <p>Creates {@code namedStates} states named {@code "state-<i>"}, each holding
     * {@code partitionsPerState} integers drawn from a {@link Random} whose seed is derived from
     * the vertex id hash, {@code index}, the state number and {@code rawState}. Equal arguments
     * therefore always produce the same values.
     *
     * @param jobVertexID vertex whose hash code feeds the seed
     * @param index value multiplied into the seed
     * @param namedStates number of named states to create
     * @param partitionsPerState number of integers generated per state
     * @param rawState when {@code true}, a different seed is used for every state
     * @return the handle produced by the map-based overload for the generated states
     * @throws IOException if serializing the generated values fails
     */
    public static OperatorStateHandle generatePartitionableStateHandle(JobVertexID jobVertexID, int index, int namedStates, int partitionsPerState, boolean rawState) throws IOException {
        Map<String, List<? extends Serializable>> partitionsByStateName = new HashMap<>(namedStates);
        for (int stateNo = 0; stateNo < namedStates; ++stateNo) {
            int baseSeed = jobVertexID.hashCode() * index + stateNo * namedStates;
            Random generator = new Random(rawState ? (baseSeed + 1) * 31 : baseSeed);
            List<Integer> partitions = new ArrayList<>(partitionsPerState);
            for (int remaining = partitionsPerState; remaining > 0; --remaining) {
                partitions.add(generator.nextInt());
            }
            partitionsByStateName.put("state-" + stateNo, partitions);
        }
        return generatePartitionableStateHandle(partitionsByStateName);
    }

    /**
     * Builds the same pseudo-random operator state as
     * {@link #generatePartitionableStateHandle(JobVertexID, int, int, int, boolean)} and wraps it
     * in a single-element {@link ChainedStateHandle}.
     *
     * <p>The generation logic lives in the non-chained overload so that both methods are
     * guaranteed to use the same seeds and state names.
     *
     * @param jobVertexID vertex whose hash code feeds the seed
     * @param index value multiplied into the seed
     * @param namedStates number of named states to create
     * @param partitionsPerState number of integers generated per state
     * @param rawState when {@code true}, a different seed is used for every state
     * @return a chained handle wrapping the generated operator state handle
     * @throws IOException if serializing the generated values fails
     */
    static ChainedStateHandle<OperatorStateHandle> generateChainedPartitionableStateHandle(JobVertexID jobVertexID, int index, int namedStates, int partitionsPerState, boolean rawState) throws IOException {
        OperatorStateHandle handle = generatePartitionableStateHandle(jobVertexID, index, namedStates, partitionsPerState, rawState);
        return ChainedStateHandle.wrapSingleHandle(handle);
    }

    /**
     * Serializes all given state lists into one byte stream and returns an operator state handle
     * describing where each named state's partitions start.
     *
     * <p>Every state is registered with {@code OperatorStateHandle.Mode.SPLIT_DISTRIBUTE}.
     *
     * @param states state name to the list of partition values of that state
     * @return a stream-backed operator state handle over the serialized values
     * @throws IOException if serializing a value fails
     */
    static OperatorStateHandle generatePartitionableStateHandle(Map<String, List<? extends Serializable>> states) throws IOException {
        // Capture names and values in one pass so both lists follow the map's iteration order.
        List<String> stateNames = new ArrayList<>(states.size());
        List<List<? extends Serializable>> stateValues = new ArrayList<>(states.size());
        for (Map.Entry<String, List<? extends Serializable>> namedState : states.entrySet()) {
            stateNames.add(namedState.getKey());
            stateValues.add(namedState.getValue());
        }

        Tuple2<byte[], List<long[]>> serialized = serializeTogetherAndTrackOffsets(stateValues);
        byte[] payload = serialized.f0;
        List<long[]> offsetsPerState = serialized.f1;

        Map<String, OperatorStateHandle.StateMetaInfo> metaInfoByName = new HashMap<>(states.size());
        for (int position = 0; position < stateNames.size(); ++position) {
            OperatorStateHandle.StateMetaInfo metaInfo = new OperatorStateHandle.StateMetaInfo(offsetsPerState.get(position), OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
            metaInfoByName.put(stateNames.get(position), metaInfo);
        }
        return new OperatorStreamStateHandle(metaInfoByName, generateByteStreamStateHandle(payload));
    }

    /**
     * Wraps the given bytes in a {@link ByteStreamStateHandle} named with a freshly generated
     * random UUID.
     *
     * @param bytes content of the handle
     * @return the new handle
     */
    private static ByteStreamStateHandle generateByteStreamStateHandle(byte[] bytes) {
        String handleName = UUID.randomUUID().toString();
        return new ByteStreamStateHandle(handleName, bytes);
    }

    /**
     * Serializes every element of every group and concatenates the results into one byte array.
     *
     * <p>For each input group an offset array is produced that holds, per element, the position
     * in the concatenated array at which that element's serialized form begins.
     *
     * @param serializables groups of values to serialize, in output order
     * @return {@code f0}: the concatenated bytes; {@code f1}: one offset array per input group
     * @throws IOException if serializing a value fails
     */
    static Tuple2<byte[], List<long[]>> serializeTogetherAndTrackOffsets(List<List<? extends Serializable>> serializables) throws IOException {
        List<long[]> offsetsPerGroup = new ArrayList<>(serializables.size());
        List<byte[]> chunks = new ArrayList<>();
        int totalLength = 0;

        for (List<? extends Serializable> group : serializables) {
            long[] groupOffsets = new long[group.size()];
            int slot = 0;
            for (Serializable value : group) {
                byte[] chunk = InstantiationUtil.serializeObject(value);
                // The element starts where all previously serialized bytes end.
                groupOffsets[slot++] = totalLength;
                chunks.add(chunk);
                totalLength += chunk.length;
            }
            offsetsPerGroup.add(groupOffsets);
        }

        byte[] concatenated = new byte[totalLength];
        int writePosition = 0;
        for (byte[] chunk : chunks) {
            System.arraycopy(chunk, 0, concatenated, writePosition, chunk.length);
            writePosition += chunk.length;
        }
        return new Tuple2<>(concatenated, offsetsPerGroup);
    }

    /**
     * Asserts that every subtask of the given job vertex was restored from checkpoint 1 with the
     * state that the generators of this class produce for it.
     *
     * <p>For each subtask index {@code i} the managed operator state is compared byte-wise with
     * {@code generateChainedPartitionableStateHandle(jobVertexID, i, 2, 8, false)}, and the
     * managed keyed state is compared with
     * {@code generateKeyGroupState(jobVertexID, keyGroupPartitions.get(i), false)}.
     *
     * @param jobVertexID id used both to look up the operator state and to regenerate the
     *     expected state
     * @param executionJobVertex vertex whose subtasks are inspected
     * @param keyGroupPartitions expected key-group range per subtask index
     * @throws Exception if reading state fails; assertion failures surface as
     *     {@link AssertionError}
     */
    public static void verifyStateRestore(JobVertexID jobVertexID, ExecutionJobVertex executionJobVertex, List<KeyGroupRange> keyGroupPartitions) throws Exception {
        for (int i = 0; i < executionJobVertex.getParallelism(); ++i) {
            JobManagerTaskRestore taskRestore = executionJobVertex.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
            Assert.assertEquals(1L, taskRestore.getRestoreCheckpointId());

            TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
            OperatorSubtaskState operatorState = stateSnapshot.getSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID));

            ChainedStateHandle<OperatorStateHandle> expectedOpStateBackend = generateChainedPartitionableStateHandle(jobVertexID, i, 2, 8, false);
            // Close both streams once compared; previously they were opened and never released.
            try (InputStream expectedStream = expectedOpStateBackend.get(0).openInputStream();
                    InputStream actualStream = operatorState.getManagedOperatorState().iterator().next().openInputStream()) {
                Assert.assertTrue(CommonTestUtils.isStreamContentEqual(expectedStream, actualStream));
            }

            KeyGroupsStateHandle expectPartitionedKeyGroupState = generateKeyGroupState(jobVertexID, keyGroupPartitions.get(i), false);
            compareKeyedState(Collections.singletonList(expectPartitionedKeyGroupState), operatorState.getManagedKeyedState());
        }
    }

    /**
     * Asserts that the actual keyed state handles together contain the same per-key-group values
     * as the first expected handle.
     *
     * <p>First the total number of key groups is compared. Then, for every key group of the
     * expected handle, the expected integer is deserialized and compared with the integer stored
     * for that key group in each actual handle whose range contains it.
     *
     * <p>Streams are managed with try-with-resources so that a failing assertion or read error
     * propagates to the caller; exiting the cleanup via {@code continue} would discard it.
     *
     * @param expectPartitionedKeyGroupState expected handles; only the first element is used
     * @param actualPartitionedKeyGroupState handles to verify; each must be a
     *     {@link KeyGroupsStateHandle}
     * @throws Exception if opening, seeking or deserializing a stream fails; assertion failures
     *     surface as {@link AssertionError}
     */
    static void compareKeyedState(Collection<KeyGroupsStateHandle> expectPartitionedKeyGroupState, Collection<? extends KeyedStateHandle> actualPartitionedKeyGroupState) throws Exception {
        KeyGroupsStateHandle expectedHeadOpKeyGroupStateHandle = expectPartitionedKeyGroupState.iterator().next();

        int expectedTotalKeyGroups = expectedHeadOpKeyGroupStateHandle.getKeyGroupRange().getNumberOfKeyGroups();
        int actualTotalKeyGroups = 0;
        for (KeyedStateHandle keyedStateHandle : actualPartitionedKeyGroupState) {
            Assert.assertTrue(keyedStateHandle instanceof KeyGroupsStateHandle);
            actualTotalKeyGroups += keyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups();
        }
        Assert.assertEquals((long) expectedTotalKeyGroups, (long) actualTotalKeyGroups);

        try (FSDataInputStream inputStream = expectedHeadOpKeyGroupStateHandle.openInputStream()) {
            for (int groupId : expectedHeadOpKeyGroupStateHandle.getKeyGroupRange()) {
                long offset = expectedHeadOpKeyGroupStateHandle.getOffsetForKeyGroup(groupId);
                inputStream.seek(offset);
                int expectedKeyGroupState = (Integer) InstantiationUtil.deserializeObject((InputStream) inputStream, Thread.currentThread().getContextClassLoader());

                for (KeyedStateHandle keyedStateHandle : actualPartitionedKeyGroupState) {
                    Assert.assertTrue(keyedStateHandle instanceof KeyGroupsStateHandle);
                    KeyGroupsStateHandle oneActualKeyGroupStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
                    if (!oneActualKeyGroupStateHandle.getKeyGroupRange().contains(groupId)) {
                        continue;
                    }
                    long actualOffset = oneActualKeyGroupStateHandle.getOffsetForKeyGroup(groupId);
                    try (FSDataInputStream actualInputStream = oneActualKeyGroupStateHandle.openInputStream()) {
                        actualInputStream.seek(actualOffset);
                        int actualGroupState = (Integer) InstantiationUtil.deserializeObject((InputStream) actualInputStream, Thread.currentThread().getContextClassLoader());
                        Assert.assertEquals((long) expectedKeyGroupState, (long) actualGroupState);
                    }
                }
            }
        }
    }

    /**
     * Asserts that expected and actual operator state contain the same partitions, ignoring the
     * order in which they are encountered.
     *
     * <p>Both sides are flattened through {@code collectResult} into lists of strings, sorted,
     * and compared for equality. {@code null} entries of {@code actual} are skipped; a
     * {@code null} collection inside a non-null entry fails the assertion.
     *
     * @param expected expected chained handles; the position inside a chain is the operator index
     * @param actual actual handles; the position inside each inner list is the operator index
     * @throws Exception if reading a state stream fails
     */
    static void comparePartitionableState(List<ChainedStateHandle<OperatorStateHandle>> expected, List<List<Collection<OperatorStateHandle>>> actual) throws Exception {
        List<String> expectedEntries = new ArrayList<>();
        for (ChainedStateHandle<OperatorStateHandle> chain : expected) {
            int chainLength = chain.getLength();
            for (int opIdx = 0; opIdx < chainLength; ++opIdx) {
                collectResult(opIdx, chain.get(opIdx), expectedEntries);
            }
        }
        expectedEntries.sort(null);

        List<String> actualEntries = new ArrayList<>();
        for (List<Collection<OperatorStateHandle>> handlesPerOperator : actual) {
            if (handlesPerOperator != null) {
                int opIdx = 0;
                for (Collection<OperatorStateHandle> handles : handlesPerOperator) {
                    Assert.assertNotNull(handles);
                    for (OperatorStateHandle handle : handles) {
                        collectResult(opIdx, handle, actualEntries);
                    }
                    ++opIdx;
                }
            }
        }
        actualEntries.sort(null);

        Assert.assertEquals(expectedEntries, actualEntries);
    }

    /**
     * Reads every partition of the given operator state handle and appends one line per
     * partition to {@code resultCollector}.
     *
     * <p>Each line has the form {@code "<opIdx> : <state name> : <value>"}, where the value is
     * the integer deserialized at the partition's offset.
     *
     * @param opIdx operator index written at the start of every line
     * @param operatorStateHandle handle whose partitions are read
     * @param resultCollector list receiving the formatted lines
     * @throws Exception if opening, seeking or deserializing the stream fails
     */
    static void collectResult(int opIdx, OperatorStateHandle operatorStateHandle, List<String> resultCollector) throws Exception {
        try (FSDataInputStream stateStream = operatorStateHandle.openInputStream()) {
            for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> namedState : operatorStateHandle.getStateNameToPartitionOffsets().entrySet()) {
                String stateName = namedState.getKey();
                long[] partitionOffsets = namedState.getValue().getOffsets();
                for (long partitionOffset : partitionOffsets) {
                    stateStream.seek(partitionOffset);
                    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
                    Integer value = (Integer) InstantiationUtil.deserializeObject((InputStream) stateStream, classLoader);
                    resultCollector.add(opIdx + " : " + stateName + " : " + value);
                }
            }
        }
    }

    /**
     * Mocks an {@link ExecutionJobVertex} whose only operator id is the one derived from
     * {@code jobVertexID}.
     *
     * @param jobVertexID id of the mocked vertex
     * @param parallelism number of mocked subtasks
     * @param maxParallelism max parallelism reported by the mocks
     * @return the mocked job vertex
     * @throws Exception declared by the delegate overload
     */
    static ExecutionJobVertex mockExecutionJobVertex(JobVertexID jobVertexID, int parallelism, int maxParallelism) throws Exception {
        List<OperatorID> singleOperator = Collections.singletonList(OperatorID.fromJobVertexID(jobVertexID));
        return mockExecutionJobVertex(jobVertexID, singleOperator, parallelism, maxParallelism);
    }

    /**
     * Mocks an {@link ExecutionJobVertex} together with {@code parallelism} mocked subtasks.
     *
     * <p>Every subtask is created in state {@code RUNNING} with a fresh attempt id and reports
     * its array position as parallel subtask index. The job vertex mock reports the given id,
     * subtasks, parallelism and max parallelism, claims that max parallelism is configured, and
     * returns one generated-id-only {@link OperatorIDPair} per entry of {@code jobVertexIDs}.
     *
     * @param jobVertexID id of the mocked vertex
     * @param jobVertexIDs operator ids of the vertex
     * @param parallelism number of mocked subtasks
     * @param maxParallelism max parallelism reported by the mocks
     * @return the mocked job vertex
     * @throws Exception declared for callers; not thrown explicitly here
     */
    static ExecutionJobVertex mockExecutionJobVertex(JobVertexID jobVertexID, List<OperatorID> jobVertexIDs, int parallelism, int maxParallelism) throws Exception {
        ExecutionJobVertex jobVertexMock = Mockito.mock(ExecutionJobVertex.class);

        ExecutionVertex[] subtasks = new ExecutionVertex[parallelism];
        for (int subtaskIndex = 0; subtaskIndex < subtasks.length; ++subtaskIndex) {
            ExecutionVertex subtask = mockExecutionVertex(new ExecutionAttemptID(), jobVertexID, jobVertexIDs, parallelism, maxParallelism, ExecutionState.RUNNING);
            subtasks[subtaskIndex] = subtask;
            Mockito.when(subtask.getParallelSubtaskIndex()).thenReturn(subtaskIndex);
        }

        Mockito.when(jobVertexMock.getJobVertexId()).thenReturn(jobVertexID);
        Mockito.when(jobVertexMock.getTaskVertices()).thenReturn(subtasks);
        Mockito.when(jobVertexMock.getParallelism()).thenReturn(parallelism);
        Mockito.when(jobVertexMock.getMaxParallelism()).thenReturn(maxParallelism);
        Mockito.when(jobVertexMock.isMaxParallelismConfigured()).thenReturn(true);

        List<OperatorIDPair> operatorIdPairs = new ArrayList<>();
        jobVertexIDs.forEach(operatorId -> operatorIdPairs.add(OperatorIDPair.generatedIDOnly(operatorId)));
        Mockito.when(jobVertexMock.getOperatorIDs()).thenReturn(operatorIdPairs);

        return jobVertexMock;
    }

    /**
     * Mocks an {@link ExecutionVertex} for the given attempt without assigning a slot.
     *
     * @param attemptID attempt id reported by the vertex's current execution
     * @return the mocked vertex
     */
    static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) {
        // A typed null selects the LogicalSlot overload.
        LogicalSlot noSlot = null;
        return mockExecutionVertex(attemptID, noSlot);
    }

    /**
     * Mocks an {@link ExecutionVertex} backed by a {@link SimpleAckingTaskManagerGateway} on
     * which the given checkpoint consumer is installed.
     *
     * @param attemptID attempt id reported by the vertex's current execution
     * @param checkpointConsumer consumer set on the gateway
     * @return the mocked vertex
     */
    static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID, SimpleAckingTaskManagerGateway.CheckpointConsumer checkpointConsumer) {
        SimpleAckingTaskManagerGateway ackingGateway = new SimpleAckingTaskManagerGateway();
        ackingGateway.setCheckpointConsumer(checkpointConsumer);
        return mockExecutionVertex(attemptID, ackingGateway);
    }

    /**
     * Mocks an {@link ExecutionVertex} assigned to a testing slot that uses the given gateway.
     *
     * @param attemptID attempt id reported by the vertex's current execution
     * @param taskManagerGateway gateway configured on the testing slot
     * @return the mocked vertex
     */
    static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID, TaskManagerGateway taskManagerGateway) {
        TestingLogicalSlotBuilder builder = new TestingLogicalSlotBuilder();
        builder.setTaskManagerGateway(taskManagerGateway);
        LogicalSlot testingSlot = builder.createTestingLogicalSlot();
        return mockExecutionVertex(attemptID, testingSlot);
    }

    /**
     * Mocks a {@code RUNNING} {@link ExecutionVertex} with parallelism and max parallelism 1 for
     * a freshly created job vertex id.
     *
     * @param attemptID attempt id reported by the vertex's current execution
     * @param slot slot to assign to the execution, or {@code null} for none
     * @return the mocked vertex
     */
    static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID, @Nullable LogicalSlot slot) {
        JobVertexID freshVertexId = new JobVertexID();
        List<OperatorID> singleOperator = Collections.singletonList(OperatorID.fromJobVertexID(freshVertexId));
        return mockExecutionVertex(attemptID, freshVertexId, singleOperator, slot, 1, 1, ExecutionState.RUNNING);
    }

    /**
     * Mocks an {@link ExecutionVertex} without an assigned slot.
     *
     * @param attemptID attempt id reported by the vertex's current execution
     * @param jobVertexID job vertex id reported by the vertex
     * @param jobVertexIDs operator ids reported by the vertex's job vertex
     * @param parallelism total number of parallel subtasks reported by the vertex
     * @param maxParallelism max parallelism reported by the vertex
     * @param state first state returned by the execution
     * @param successiveStates states returned on subsequent calls
     * @return the mocked vertex
     */
    static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID, JobVertexID jobVertexID, List<OperatorID> jobVertexIDs, int parallelism, int maxParallelism, ExecutionState state, ExecutionState ... successiveStates) {
        LogicalSlot noSlot = null;
        return mockExecutionVertex(attemptID, jobVertexID, jobVertexIDs, noSlot, parallelism, maxParallelism, state, successiveStates);
    }

    /**
     * Mocks an {@link ExecutionVertex} whose current execution attempt is a spied
     * {@link Execution}.
     *
     * <p>The spy reports the given attempt id and the given sequence of states. When
     * {@code slot} is non-null it is written into the execution's {@code assignedResource}
     * field. The vertex additionally reports the given job vertex id, parallelism and max
     * parallelism, and a mocked {@link ExecutionJobVertex} that returns one generated-id-only
     * {@link OperatorIDPair} per entry of {@code jobVertexIDs}.
     *
     * @param attemptID attempt id reported by the execution
     * @param jobVertexID job vertex id reported by the vertex
     * @param jobVertexIDs operator ids reported by the vertex's job vertex
     * @param slot slot to assign to the execution, or {@code null} for none
     * @param parallelism total number of parallel subtasks reported by the vertex
     * @param maxParallelism max parallelism reported by the vertex
     * @param state first state returned by the execution
     * @param successiveStates states returned on subsequent calls
     * @return the mocked vertex
     */
    static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID, JobVertexID jobVertexID, List<OperatorID> jobVertexIDs, @Nullable LogicalSlot slot, int parallelism, int maxParallelism, ExecutionState state, ExecutionState ... successiveStates) {
        ExecutionVertex vertexMock = Mockito.mock(ExecutionVertex.class);

        // The execution is created against the still-unstubbed vertex mock, as before.
        Executor executorMock = Mockito.mock(Executor.class);
        Execution executionSpy = Mockito.spy(new Execution(executorMock, vertexMock, 1, 1L, 1L, Time.milliseconds(500L)));
        if (slot != null) {
            Whitebox.setInternalState(executionSpy, "assignedResource", slot);
        }
        Mockito.when(executionSpy.getAttemptId()).thenReturn(attemptID);
        Mockito.when(executionSpy.getState()).thenReturn(state, successiveStates);

        Mockito.when(vertexMock.getJobvertexId()).thenReturn(jobVertexID);
        Mockito.when(vertexMock.getCurrentExecutionAttempt()).thenReturn(executionSpy);
        Mockito.when(vertexMock.getTotalNumberOfParallelSubtasks()).thenReturn(parallelism);
        Mockito.when(vertexMock.getMaxParallelism()).thenReturn(maxParallelism);

        ExecutionJobVertex jobVertexMock = Mockito.mock(ExecutionJobVertex.class);
        List<OperatorIDPair> operatorIdPairs = new ArrayList<>();
        jobVertexIDs.forEach(operatorId -> operatorIdPairs.add(OperatorIDPair.generatedIDOnly(operatorId)));
        Mockito.when(jobVertexMock.getOperatorIDs()).thenReturn(operatorIdPairs);
        Mockito.when(vertexMock.getJobVertex()).thenReturn(jobVertexMock);

        return vertexMock;
    }

    /**
     * Creates a spied {@link TaskStateSnapshot} holding one spied {@link OperatorSubtaskState}
     * for the operator id derived from {@code jobVertexID}.
     *
     * <p>The subtask state carries generated managed operator state (2 named states with 8
     * partitions each) and generated managed keyed state for {@code keyGroupRange}; all other
     * constructor arguments are {@code null}.
     *
     * @param jobVertexID vertex id used for state generation and as operator id source
     * @param index index passed to the operator state generator
     * @param keyGroupRange key-group range of the generated keyed state
     * @return the spied snapshot
     * @throws IOException if generating the state fails
     */
    static TaskStateSnapshot mockSubtaskState(JobVertexID jobVertexID, int index, KeyGroupRange keyGroupRange) throws IOException {
        OperatorStateHandle managedOperatorState = generatePartitionableStateHandle(jobVertexID, index, 2, 8, false);
        KeyGroupsStateHandle managedKeyedState = generateKeyGroupState(jobVertexID, keyGroupRange, false);

        TaskStateSnapshot snapshotSpy = Mockito.spy(new TaskStateSnapshot());
        OperatorSubtaskState operatorStateSpy = Mockito.spy(new OperatorSubtaskState(managedOperatorState, null, (KeyedStateHandle)managedKeyedState, null, null, null));
        OperatorID operatorId = OperatorID.fromJobVertexID(jobVertexID);
        snapshotSpy.putSubtaskStateByOperatorID(operatorId, operatorStateSpy);
        return snapshotSpy;
    }

    /**
     * Generates keyed state with one pseudo-random integer per key group of the given range.
     *
     * <p>The value of a key group is the first {@code nextInt()} of a {@link Random} seeded with
     * {@code hash + keyGroup} for managed state, or {@code hash * (31 + keyGroup)} for raw state,
     * where {@code hash} is the hash code of {@code jobVertexID}.
     *
     * @param jobVertexID vertex whose hash code feeds the seed
     * @param keyGroupPartition key groups to generate values for
     * @param rawState selects the raw-state seed formula when {@code true}
     * @return a handle over the serialized values
     * @throws IOException if serializing the values fails
     */
    public static KeyGroupsStateHandle generateKeyGroupState(JobVertexID jobVertexID, KeyGroupRange keyGroupPartition, boolean rawState) throws IOException {
        List<Integer> valuesPerKeyGroup = new ArrayList<>(keyGroupPartition.getNumberOfKeyGroups());
        for (int keyGroup : keyGroupPartition) {
            int hash = jobVertexID.hashCode();
            int seed;
            if (rawState) {
                seed = hash * (31 + keyGroup);
            } else {
                seed = hash + keyGroup;
            }
            valuesPerKeyGroup.add(new Random(seed).nextInt());
        }
        return generateKeyGroupState(keyGroupPartition, valuesPerKeyGroup);
    }

    /**
     * Serializes the given values and wraps them in a {@link KeyGroupsStateHandle} whose offsets
     * map the key groups of {@code keyGroupRange} to the serialized values, in order.
     *
     * @param keyGroupRange key groups covered by the handle
     * @param states one value per key group; its size must equal the number of key groups
     * @return the handle over the serialized values
     * @throws IOException if serializing a value fails
     */
    public static KeyGroupsStateHandle generateKeyGroupState(KeyGroupRange keyGroupRange, List<? extends Serializable> states) throws IOException {
        boolean oneValuePerKeyGroup = keyGroupRange.getNumberOfKeyGroups() == states.size();
        Preconditions.checkArgument(oneValuePerKeyGroup);

        Tuple2<byte[], List<long[]>> serialized = serializeTogetherAndTrackOffsets(Collections.<List<? extends Serializable>>singletonList(states));
        long[] offsets = serialized.f1.get(0);
        byte[] payload = serialized.f0;

        KeyGroupRangeOffsets rangeOffsets = new KeyGroupRangeOffsets(keyGroupRange, offsets);
        ByteStreamStateHandle payloadHandle = generateByteStreamStateHandle(payload);
        return new KeyGroupsStateHandle(rangeOffsets, payloadHandle);
    }

    /**
     * Mocks an {@link Execution} that reports a fresh attempt id and state {@code RUNNING}.
     *
     * @return the mocked execution
     */
    static Execution mockExecution() {
        Execution executionMock = Mockito.mock(Execution.class);
        ExecutionAttemptID freshAttemptId = new ExecutionAttemptID();
        Mockito.when(executionMock.getAttemptId()).thenReturn(freshAttemptId);
        Mockito.when(executionMock.getState()).thenReturn(ExecutionState.RUNNING);
        return executionMock;
    }

    /**
     * Mocks a {@code RUNNING} {@link Execution} that forwards every {@code triggerCheckpoint}
     * call to the given consumer.
     *
     * <p>The consumer receives the mock's attempt id, the job id of its mocked vertex, and the
     * three arguments of the intercepted call.
     *
     * @param checkpointConsumer receiver of the intercepted checkpoint triggers
     * @return the mocked execution
     */
    static Execution mockExecution(SimpleAckingTaskManagerGateway.CheckpointConsumer checkpointConsumer) {
        JobID jobId = new JobID();
        ExecutionVertex vertexMock = Mockito.mock(ExecutionVertex.class);
        Mockito.when(vertexMock.getJobId()).thenReturn(jobId);

        ExecutionAttemptID attemptId = new ExecutionAttemptID();
        Execution executionMock = Mockito.mock(Execution.class);
        Mockito.when(executionMock.getAttemptId()).thenReturn(attemptId);
        Mockito.when(executionMock.getState()).thenReturn(ExecutionState.RUNNING);
        Mockito.when(executionMock.getVertex()).thenReturn(vertexMock);

        Mockito.doAnswer(invocation -> {
            Object[] callArguments = invocation.getArguments();
            checkpointConsumer.accept(attemptId, jobId, (Long)callArguments[0], (Long)callArguments[1], (CheckpointOptions)callArguments[2]);
            return null;
        }).when(executionMock).triggerCheckpoint(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), ArgumentMatchers.any(CheckpointOptions.class));

        return executionMock;
    }

    /**
     * Mocks an {@link ExecutionVertex} around an existing execution.
     *
     * <p>{@code parallelism} is reported both as the total number of parallel subtasks and as
     * the max parallelism.
     *
     * @param execution execution returned as current attempt
     * @param vertexId job vertex id reported by the vertex
     * @param subtask parallel subtask index reported by the vertex
     * @param parallelism value reported for parallelism and max parallelism
     * @return the mocked vertex
     */
    static ExecutionVertex mockExecutionVertex(Execution execution, JobVertexID vertexId, int subtask, int parallelism) {
        ExecutionVertex vertexMock = Mockito.mock(ExecutionVertex.class);
        Mockito.when(vertexMock.getJobvertexId()).thenReturn(vertexId);
        Mockito.when(vertexMock.getParallelSubtaskIndex()).thenReturn(subtask);
        Mockito.when(vertexMock.getCurrentExecutionAttempt()).thenReturn(execution);
        Mockito.when(vertexMock.getTotalNumberOfParallelSubtasks()).thenReturn(parallelism);
        Mockito.when(vertexMock.getMaxParallelism()).thenReturn(parallelism);
        return vertexMock;
    }

    /**
     * Mocks an {@link ExecutionJobVertex} over the given subtasks and stubs each subtask to
     * return it from {@code getJobVertex()}.
     *
     * <p>Parallelism and max parallelism both equal {@code vertices.length}; the only operator
     * id is the one derived from {@code id}.
     *
     * @param id job vertex id reported by the mock
     * @param vertices mocked subtasks; each one is stubbed by this method
     * @return the mocked job vertex
     */
    static ExecutionJobVertex mockExecutionJobVertex(JobVertexID id, ExecutionVertex[] vertices) {
        ExecutionJobVertex jobVertexMock = Mockito.mock(ExecutionJobVertex.class);
        int subtaskCount = vertices.length;
        Mockito.when(jobVertexMock.getParallelism()).thenReturn(subtaskCount);
        Mockito.when(jobVertexMock.getMaxParallelism()).thenReturn(subtaskCount);
        Mockito.when(jobVertexMock.getJobVertexId()).thenReturn(id);
        Mockito.when(jobVertexMock.getTaskVertices()).thenReturn(vertices);

        List<OperatorIDPair> operatorIdPairs = Collections.singletonList(OperatorIDPair.generatedIDOnly(OperatorID.fromJobVertexID(id)));
        Mockito.when(jobVertexMock.getOperatorIDs()).thenReturn(operatorIdPairs);

        for (ExecutionVertex subtask : vertices) {
            Mockito.when(subtask.getJobVertex()).thenReturn(jobVertexMock);
        }
        return jobVertexMock;
    }

    /**
     * Test implementation of {@link OperatorCoordinatorCheckpointContext} that forwards selected
     * calls to optional callbacks and records the ids of completed and aborted checkpoints.
     *
     * <p>The constructor is private; the only caller visible in this file is the builder's
     * {@code build()} method.
     */
    public static final class MockOperatorCoordinatorCheckpointContext
    implements OperatorCoordinatorCheckpointContext {
        /** Invoked from {@link #checkpointCoordinator}; may be {@code null}. */
        private final BiConsumer<Long, CompletableFuture<byte[]>> onCallingCheckpointCoordinator;
        /** Invoked from {@link #afterSourceBarrierInjection}; may be {@code null}. */
        private final Consumer<Long> onCallingAfterSourceBarrierInjection;
        private final OperatorID operatorID;
        /** Checkpoint ids passed to {@link #notifyCheckpointComplete}, in call order. */
        private final List<Long> completedCheckpoints = new ArrayList<>();
        /** Checkpoint ids passed to {@link #notifyCheckpointAborted}, in call order. */
        private final List<Long> abortedCheckpoints = new ArrayList<>();

        private MockOperatorCoordinatorCheckpointContext(BiConsumer<Long, CompletableFuture<byte[]>> onCallingCheckpointCoordinator, Consumer<Long> onCallingAfterSourceBarrierInjection, OperatorID operatorID) {
            this.onCallingCheckpointCoordinator = onCallingCheckpointCoordinator;
            this.onCallingAfterSourceBarrierInjection = onCallingAfterSourceBarrierInjection;
            this.operatorID = operatorID;
        }

        /** Forwards to the checkpoint callback when one was supplied; otherwise does nothing. */
        public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception {
            if (onCallingCheckpointCoordinator == null) {
                return;
            }
            onCallingCheckpointCoordinator.accept(checkpointId, result);
        }

        /** Forwards to the barrier-injection callback when one was supplied. */
        public void afterSourceBarrierInjection(long checkpointId) {
            if (onCallingAfterSourceBarrierInjection == null) {
                return;
            }
            onCallingAfterSourceBarrierInjection.accept(checkpointId);
        }

        /** No-op. */
        public void abortCurrentTriggering() {
        }

        /** Records the id in the completed list. */
        public void notifyCheckpointComplete(long checkpointId) {
            completedCheckpoints.add(checkpointId);
        }

        /** Records the id in the aborted list. */
        public void notifyCheckpointAborted(long checkpointId) {
            abortedCheckpoints.add(checkpointId);
        }

        /** No-op. */
        public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {
        }

        /** No-op. */
        public void subtaskReset(int subtask, long checkpointId) {
        }

        public OperatorID operatorId() {
            return operatorID;
        }

        /** Always 1. */
        public int maxParallelism() {
            return 1;
        }

        /** Always 1. */
        public int currentParallelism() {
            return 1;
        }

        /** Returns the live list of recorded completed checkpoint ids (not a copy). */
        public List<Long> getCompletedCheckpoints() {
            return completedCheckpoints;
        }

        /** Returns the live list of recorded aborted checkpoint ids (not a copy). */
        public List<Long> getAbortedCheckpoints() {
            return abortedCheckpoints;
        }
    }

    /**
     * Fluent builder for {@link MockOperatorCoordinatorCheckpointContext}.
     *
     * <p>All three settings start out as {@code null} and are handed to the context unchanged.
     */
    public static final class MockOperatorCheckpointCoordinatorContextBuilder {
        private BiConsumer<Long, CompletableFuture<byte[]>> onCallingCheckpointCoordinator;
        private Consumer<Long> onCallingAfterSourceBarrierInjection;
        private OperatorID operatorID;

        /** Sets the callback invoked when the context's {@code checkpointCoordinator} is called. */
        public MockOperatorCheckpointCoordinatorContextBuilder setOnCallingCheckpointCoordinator(BiConsumer<Long, CompletableFuture<byte[]>> onCallingCheckpointCoordinator) {
            this.onCallingCheckpointCoordinator = onCallingCheckpointCoordinator;
            return this;
        }

        /** Sets the callback invoked when the context's {@code afterSourceBarrierInjection} is called. */
        public MockOperatorCheckpointCoordinatorContextBuilder setOnCallingAfterSourceBarrierInjection(Consumer<Long> onCallingAfterSourceBarrierInjection) {
            this.onCallingAfterSourceBarrierInjection = onCallingAfterSourceBarrierInjection;
            return this;
        }

        /** Sets the operator id returned by the context's {@code operatorId()}. */
        public MockOperatorCheckpointCoordinatorContextBuilder setOperatorID(OperatorID operatorID) {
            this.operatorID = operatorID;
            return this;
        }

        /** Creates a new context from the current settings. */
        public MockOperatorCoordinatorCheckpointContext build() {
            return new MockOperatorCoordinatorCheckpointContext(onCallingCheckpointCoordinator, onCallingAfterSourceBarrierInjection, operatorID);
        }
    }

    /**
     * {@link SimpleVersionedSerializer} for strings that encodes them as UTF-8 and accepts only
     * its own version on deserialization.
     */
    public static final class StringSerializer
    implements SimpleVersionedSerializer<String> {
        /** The only version this serializer writes and reads. */
        static final int VERSION = 77;

        public int getVersion() {
            return VERSION;
        }

        /** Returns the UTF-8 bytes of the given string. */
        public byte[] serialize(String checkpointData) throws IOException {
            return checkpointData.getBytes(StandardCharsets.UTF_8);
        }

        /**
         * Decodes the bytes as UTF-8.
         *
         * @throws IOException if {@code version} differs from {@link #VERSION}
         */
        public String deserialize(int version, byte[] serialized) throws IOException {
            if (version == VERSION) {
                return new String(serialized, StandardCharsets.UTF_8);
            }
            throw new IOException("version mismatch");
        }
    }

    /**
     * Fluent builder for {@link CheckpointCoordinator} instances.
     *
     * <p>Every field starts with a default value, either from its initializer or from the
     * constructor, and each setter returns this builder so calls can be chained.
     */
    public static class CheckpointCoordinatorBuilder {
        private JobID jobId = new JobID();
        // The default configuration sets the maximum number of concurrent checkpoints to Integer.MAX_VALUE.
        private CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration =
                new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                        .build();
        // The three task arrays are assigned in the constructor.
        private ExecutionVertex[] tasksToTrigger;
        private ExecutionVertex[] tasksToWaitFor;
        private ExecutionVertex[] tasksToCommitTo;
        private Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint = Collections.emptyList();
        private CheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter();
        private CompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        private StateBackend checkpointStateBackend = new MemoryStateBackend();
        private Executor ioExecutor = Executors.directExecutor();
        private ScheduledExecutor timer = new ManuallyTriggeredScheduledExecutor();
        private SharedStateRegistryFactory sharedStateRegistryFactory = SharedStateRegistry.DEFAULT_FACTORY;
        private CheckpointFailureManager failureManager =
                new CheckpointFailureManager(0, (CheckpointFailureManager.FailJobCallback)NoOpFailJobCall.INSTANCE);

        /**
         * Creates a builder whose trigger, wait-for and commit-to task arrays all refer to the
         * same one-element array holding a single mocked execution vertex.
         */
        public CheckpointCoordinatorBuilder() {
            final ExecutionVertex[] singleMockedVertex = {
                CheckpointCoordinatorTestingUtils.mockExecutionVertex(new ExecutionAttemptID())
            };
            tasksToTrigger = singleMockedVertex;
            tasksToWaitFor = singleMockedVertex;
            tasksToCommitTo = singleMockedVertex;
        }

        /**
         * @param jobId the job ID passed to the coordinator
         * @return this builder
         */
        public CheckpointCoordinatorBuilder setJobId(JobID jobId) {
            this.jobId = jobId;
            return this;
        }

        /**
         * @param checkpointCoordinatorConfiguration the configuration passed to the coordinator
         * @return this builder
         */
        public CheckpointCoordinatorBuilder setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration) {
            this.checkpointCoordinatorConfiguration = checkpointCoordinatorConfiguration;
            return this;
        }

        /**
         * Uses the same array as the tasks to trigger, to wait for and to commit to.
         *
         * @param tasks the array assigned to all three task fields
         * @return this builder
         */
        public CheckpointCoordinatorBuilder setTasks(ExecutionVertex[] tasks) {
            tasksToTrigger = tasksToWaitFor = tasksToCommitTo = tasks;
            return this;
        }

        /**
         * @param tasksToTrigger the tasks-to-trigger array passed to the coordinator
         * @return this builder
         */
        public CheckpointCoordinatorBuilder setTasksToTrigger(ExecutionVertex[] tasksToTrigger) {
            this.tasksToTrigger = tasksToTrigger;
            return this;
        }

        /**
         * @param tasksToWaitFor the tasks-to-wait-for array passed to the coordinator
         * @return this builder
         */
        public CheckpointCoordinatorBuilder setTasksToWaitFor(ExecutionVertex[] tasksToWaitFor) {
            this.tasksToWaitFor = tasksToWaitFor;
            return this;
        }

        /**
         * @param tasksToCommitTo the tasks-to-commit-to array passed to the coordinator
         * @return this builder
         */
        public CheckpointCoordinatorBuilder setTasksToCommitTo(ExecutionVertex[] tasksToCommitTo) {
            this.tasksToCommitTo = tasksToCommitTo;
            return this;
        }

        /**
         * @param coordinatorsToCheckpoint the operator coordinator contexts passed to the coordinator
         * @return this builder
         */
        public CheckpointCoordinatorBuilder setCoordinatorsToCheckpoint(Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint) {
            this.coordinatorsToCheckpoint = coordinatorsToCheckpoint;
            return this;
        }

        /**
         * @param checkpointIDCounter the checkpoint ID counter passed to the coordinator
         * @return this builder
         */
        public CheckpointCoordinatorBuilder setCheckpointIDCounter(CheckpointIDCounter checkpointIDCounter) {
            this.checkpointIDCounter = checkpointIDCounter;
            return this;
        }

        /**
         * Sets the failure manager; writes the same field as
         * {@link #setFailureManager(CheckpointFailureManager)}.
         *
         * @param checkpointFailureManager the failure manager passed to the coordinator
         * @return this builder
         */
        public CheckpointCoordinatorBuilder setCheckpointFailureManager(CheckpointFailureManager checkpointFailureManager) {
            failureManager = checkpointFailureManager;
            return this;
        }

        /**
         * @param completedCheckpointStore the completed-checkpoint store passed to the coordinator
         * @return this builder
         */
        public CheckpointCoordinatorBuilder setCompletedCheckpointStore(CompletedCheckpointStore completedCheckpointStore) {
            this.completedCheckpointStore = completedCheckpointStore;
            return this;
        }

        /**
         * Sets the state backend; writes the same field as {@link #setStateBackEnd(StateBackend)}.
         *
         * @param checkpointStateBackend the state backend passed to the coordinator
         * @return this builder
         */
        public CheckpointCoordinatorBuilder setCheckpointStateBackend(StateBackend checkpointStateBackend) {
            this.checkpointStateBackend = checkpointStateBackend;
            return this;
        }

        /**
         * @param ioExecutor the I/O executor passed to the coordinator
         * @return this builder
         */
        public CheckpointCoordinatorBuilder setIoExecutor(Executor ioExecutor) {
            this.ioExecutor = ioExecutor;
            return this;
        }

        /**
         * @param timer the scheduled executor passed to the coordinator as its timer
         * @return this builder
         */
        public CheckpointCoordinatorBuilder setTimer(ScheduledExecutor timer) {
            this.timer = timer;
            return this;
        }

        /**
         * @param sharedStateRegistryFactory the shared-state registry factory passed to the coordinator
         * @return this builder
         */
        public CheckpointCoordinatorBuilder setSharedStateRegistryFactory(SharedStateRegistryFactory sharedStateRegistryFactory) {
            this.sharedStateRegistryFactory = sharedStateRegistryFactory;
            return this;
        }

        /**
         * Sets the failure manager; writes the same field as
         * {@link #setCheckpointFailureManager(CheckpointFailureManager)}.
         *
         * @param failureManager the failure manager passed to the coordinator
         * @return this builder
         */
        public CheckpointCoordinatorBuilder setFailureManager(CheckpointFailureManager failureManager) {
            this.failureManager = failureManager;
            return this;
        }

        /**
         * Sets the state backend; writes the same field as
         * {@link #setCheckpointStateBackend(StateBackend)}.
         *
         * @param stateBackEnd the state backend passed to the coordinator
         * @return this builder
         */
        public CheckpointCoordinatorBuilder setStateBackEnd(StateBackend stateBackEnd) {
            checkpointStateBackend = stateBackEnd;
            return this;
        }

        /**
         * Constructs a {@link CheckpointCoordinator} from the values currently held by this builder.
         *
         * @return a new coordinator instance
         */
        public CheckpointCoordinator build() {
            return new CheckpointCoordinator(
                    jobId,
                    checkpointCoordinatorConfiguration,
                    tasksToTrigger,
                    tasksToWaitFor,
                    tasksToCommitTo,
                    coordinatorsToCheckpoint,
                    checkpointIDCounter,
                    completedCheckpointStore,
                    checkpointStateBackend,
                    ioExecutor,
                    timer,
                    sharedStateRegistryFactory,
                    failureManager);
        }
    }
}

