/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class CheckpointCoordinatorTest
extends TestLogger {
    private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
    private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor;
    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();

    @Before
    public void setUp() throws Exception {
        this.manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMinCheckpointPause() throws Exception {
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        try {
            int pause = 1000;
            JobID jobId = new JobID();
            ExecutionAttemptID attemptId = new ExecutionAttemptID();
            ExecutionVertex vertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(attemptId);
            CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setTimer((ScheduledExecutor)new ScheduledExecutorServiceAdapter(executorService)).setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setCheckpointInterval((long)pause).setCheckpointTimeout(Long.MAX_VALUE).setMaxConcurrentCheckpoints(1).setMinPauseBetweenCheckpoints((long)pause).build()).setTasksToTrigger(new ExecutionVertex[]{vertex}).setTasksToWaitFor(new ExecutionVertex[]{vertex}).setTasksToCommitTo(new ExecutionVertex[]{vertex}).setJobId(jobId).build();
            coordinator.startCheckpointScheduler();
            coordinator.triggerCheckpoint(true);
            coordinator.triggerCheckpoint(true);
            while (coordinator.getNumberOfPendingCheckpoints() == 0) {
                Thread.sleep(10L);
            }
            coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptId, 1L), TASK_MANAGER_LOCATION_INFO);
            Thread.sleep(pause / 2);
            Assert.assertEquals((long)0L, (long)coordinator.getNumberOfPendingCheckpoints());
            Thread.sleep(pause);
            Assert.assertEquals((long)1L, (long)coordinator.getNumberOfPendingCheckpoints());
        }
        finally {
            executorService.shutdownNow();
        }
    }

    @Test
    public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() {
        try {
            CheckpointCoordinator checkpointCoordinator = this.getCheckpointCoordinator();
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            CompletableFuture checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertTrue((boolean)checkpointFuture.isCompletedExceptionally());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testCheckpointAbortsIfTriggerTasksAreFinished() {
        try {
            CheckpointCoordinator checkpointCoordinator = this.getCheckpointCoordinator();
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            CompletableFuture checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertTrue((boolean)checkpointFuture.isCompletedExceptionally());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testCheckpointAbortsIfAckTasksAreNotExecuted() {
        try {
            CheckpointCoordinator checkpointCoordinator = this.getCheckpointCoordinator();
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            CompletableFuture checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertTrue((boolean)checkpointFuture.isCompletedExceptionally());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTriggerAndDeclineCheckpointThenFailureManagerThrowsException() {
        JobID jobId = new JobID();
        ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
        ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
        ExecutionVertex vertex1 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(attemptID1);
        ExecutionVertex vertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(attemptID2);
        String errorMsg = "Exceeded checkpoint failure tolerance number!";
        CheckpointFailureManager checkpointFailureManager = this.getCheckpointFailureManager("Exceeded checkpoint failure tolerance number!");
        CheckpointCoordinator checkpointCoordinator = this.getCheckpointCoordinator(jobId, vertex1, vertex2, checkpointFailureManager);
        try {
            CompletableFuture checkPointFuture = checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkPointFuture);
            long checkpointId = (Long)checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
            PendingCheckpoint checkpoint = (PendingCheckpoint)checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId), TASK_MANAGER_LOCATION_INFO);
            Assert.assertFalse((boolean)checkpoint.isDiscarded());
            Assert.assertFalse((boolean)checkpoint.areTasksFullyAcknowledged());
            checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID1, checkpointId, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)), TASK_MANAGER_LOCATION_INFO);
            Assert.fail((String)"Test failed.");
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)(e instanceof RuntimeException));
            Assert.assertEquals((Object)"Exceeded checkpoint failure tolerance number!", (Object)e.getMessage());
        }
        finally {
            try {
                checkpointCoordinator.shutdown(JobStatus.FINISHED);
            }
            catch (Exception e) {
                e.printStackTrace();
                Assert.fail((String)e.getMessage());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testExpiredCheckpointExceedsTolerableFailureNumber() throws Exception {
        ExecutionVertex vertex1 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(new ExecutionAttemptID());
        ExecutionVertex vertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(new ExecutionAttemptID());
        String errorMsg = "Exceeded checkpoint failure tolerance number!";
        CheckpointFailureManager checkpointFailureManager = this.getCheckpointFailureManager("Exceeded checkpoint failure tolerance number!");
        CheckpointCoordinator checkpointCoordinator = this.getCheckpointCoordinator(new JobID(), vertex1, vertex2, checkpointFailureManager);
        try {
            checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            checkpointCoordinator.abortPendingCheckpoints(new CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED));
            Assert.fail((String)"Test failed.");
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)(e instanceof RuntimeException));
            Assert.assertEquals((Object)"Exceeded checkpoint failure tolerance number!", (Object)e.getMessage());
        }
        finally {
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
        }
    }

    @Test
    public void testTriggerAndDeclineSyncCheckpointFailureSimple() {
        this.testTriggerAndDeclineCheckpointSimple(CheckpointFailureReason.CHECKPOINT_DECLINED);
    }

    @Test
    public void testTriggerAndDeclineAsyncCheckpointFailureSimple() {
        this.testTriggerAndDeclineCheckpointSimple(CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION);
    }

    private void testTriggerAndDeclineCheckpointSimple(CheckpointFailureReason checkpointFailureReason) {
        try {
            CheckpointException checkpointException = new CheckpointException(checkpointFailureReason);
            JobID jobId = new JobID();
            ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
            ExecutionVertex vertex1 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(attemptID1);
            ExecutionVertex vertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(attemptID2);
            TestFailJobCallback failJobCallback = new TestFailJobCallback();
            CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobId).setTasks(new ExecutionVertex[]{vertex1, vertex2}).setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build()).setTimer(this.manuallyTriggeredScheduledExecutor).setCheckpointFailureManager(new CheckpointFailureManager(0, (CheckpointFailureManager.FailJobCallback)failJobCallback)).build();
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            CompletableFuture checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture);
            Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals((long)1L, (long)this.manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
            long checkpointId = (Long)checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
            PendingCheckpoint checkpoint = (PendingCheckpoint)checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
            Assert.assertNotNull((Object)checkpoint);
            Assert.assertEquals((long)checkpointId, (long)checkpoint.getCheckpointId());
            Assert.assertEquals((Object)jobId, (Object)checkpoint.getJobId());
            Assert.assertEquals((long)2L, (long)checkpoint.getNumberOfNonAcknowledgedTasks());
            Assert.assertEquals((long)0L, (long)checkpoint.getNumberOfAcknowledgedTasks());
            Assert.assertEquals((long)0L, (long)checkpoint.getOperatorStates().size());
            Assert.assertFalse((boolean)checkpoint.isDiscarded());
            Assert.assertFalse((boolean)checkpoint.areTasksFullyAcknowledged());
            ((Execution)Mockito.verify((Object)vertex1.getCurrentExecutionAttempt())).triggerCheckpoint(checkpointId, checkpoint.getCheckpointTimestamp(), CheckpointOptions.forCheckpointWithDefaultLocation());
            ((Execution)Mockito.verify((Object)vertex2.getCurrentExecutionAttempt())).triggerCheckpoint(checkpointId, checkpoint.getCheckpointTimestamp(), CheckpointOptions.forCheckpointWithDefaultLocation());
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId), TASK_MANAGER_LOCATION_INFO);
            Assert.assertEquals((long)1L, (long)checkpoint.getNumberOfAcknowledgedTasks());
            Assert.assertEquals((long)1L, (long)checkpoint.getNumberOfNonAcknowledgedTasks());
            Assert.assertFalse((boolean)checkpoint.isDiscarded());
            Assert.assertFalse((boolean)checkpoint.areTasksFullyAcknowledged());
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId), TASK_MANAGER_LOCATION_INFO);
            Assert.assertFalse((boolean)checkpoint.isDiscarded());
            Assert.assertFalse((boolean)checkpoint.areTasksFullyAcknowledged());
            checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID1, checkpointId, checkpointException), TASK_MANAGER_LOCATION_INFO);
            Assert.assertTrue((boolean)checkpoint.isDiscarded());
            Assert.assertEquals((long)0L, (long)this.manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID1, checkpointId, checkpointException), TASK_MANAGER_LOCATION_INFO);
            checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID2, checkpointId, checkpointException), TASK_MANAGER_LOCATION_INFO);
            Assert.assertTrue((boolean)checkpoint.isDiscarded());
            Assert.assertEquals((long)1L, (long)failJobCallback.getInvokeCounter());
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testTriggerAndDeclineCheckpointComplex() {
        try {
            JobID jobId = new JobID();
            ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
            ExecutionVertex vertex1 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(attemptID1);
            ExecutionVertex vertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(attemptID2);
            CheckpointCoordinator checkpointCoordinator = this.getCheckpointCoordinator(jobId, vertex1, vertex2);
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals((long)0L, (long)this.manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
            CompletableFuture checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture1);
            CompletableFuture checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture2);
            Assert.assertEquals((long)2L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals((long)2L, (long)this.manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
            Iterator it = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator();
            long checkpoint1Id = (Long)it.next().getKey();
            long checkpoint2Id = (Long)it.next().getKey();
            PendingCheckpoint checkpoint1 = (PendingCheckpoint)checkpointCoordinator.getPendingCheckpoints().get(checkpoint1Id);
            PendingCheckpoint checkpoint2 = (PendingCheckpoint)checkpointCoordinator.getPendingCheckpoints().get(checkpoint2Id);
            Assert.assertNotNull((Object)checkpoint1);
            Assert.assertEquals((long)checkpoint1Id, (long)checkpoint1.getCheckpointId());
            Assert.assertEquals((Object)jobId, (Object)checkpoint1.getJobId());
            Assert.assertEquals((long)2L, (long)checkpoint1.getNumberOfNonAcknowledgedTasks());
            Assert.assertEquals((long)0L, (long)checkpoint1.getNumberOfAcknowledgedTasks());
            Assert.assertEquals((long)0L, (long)checkpoint1.getOperatorStates().size());
            Assert.assertFalse((boolean)checkpoint1.isDiscarded());
            Assert.assertFalse((boolean)checkpoint1.areTasksFullyAcknowledged());
            Assert.assertNotNull((Object)checkpoint2);
            Assert.assertEquals((long)checkpoint2Id, (long)checkpoint2.getCheckpointId());
            Assert.assertEquals((Object)jobId, (Object)checkpoint2.getJobId());
            Assert.assertEquals((long)2L, (long)checkpoint2.getNumberOfNonAcknowledgedTasks());
            Assert.assertEquals((long)0L, (long)checkpoint2.getNumberOfAcknowledgedTasks());
            Assert.assertEquals((long)0L, (long)checkpoint2.getOperatorStates().size());
            Assert.assertFalse((boolean)checkpoint2.isDiscarded());
            Assert.assertFalse((boolean)checkpoint2.areTasksFullyAcknowledged());
            ((Execution)Mockito.verify((Object)vertex1.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpoint1Id), ((Long)Matchers.any(Long.class)).longValue(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            ((Execution)Mockito.verify((Object)vertex2.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpoint1Id), ((Long)Matchers.any(Long.class)).longValue(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            ((Execution)Mockito.verify((Object)vertex1.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpoint2Id), ((Long)Matchers.any(Long.class)).longValue(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            ((Execution)Mockito.verify((Object)vertex2.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpoint2Id), ((Long)Matchers.any(Long.class)).longValue(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID1, checkpoint1Id, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)), TASK_MANAGER_LOCATION_INFO);
            ((Execution)Mockito.verify((Object)vertex1.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).notifyCheckpointAborted(Mockito.eq((long)checkpoint1Id), ((Long)Matchers.any(Long.class)).longValue());
            ((Execution)Mockito.verify((Object)vertex2.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).notifyCheckpointAborted(Mockito.eq((long)checkpoint1Id), ((Long)Matchers.any(Long.class)).longValue());
            Assert.assertTrue((boolean)checkpoint1.isDiscarded());
            Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals((long)1L, (long)this.manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
            long checkpointIdNew = (Long)checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
            PendingCheckpoint checkpointNew = (PendingCheckpoint)checkpointCoordinator.getPendingCheckpoints().get(checkpointIdNew);
            Assert.assertEquals((long)checkpoint2Id, (long)checkpointIdNew);
            Assert.assertNotNull((Object)checkpointNew);
            Assert.assertEquals((long)checkpointIdNew, (long)checkpointNew.getCheckpointId());
            Assert.assertEquals((Object)jobId, (Object)checkpointNew.getJobId());
            Assert.assertEquals((long)2L, (long)checkpointNew.getNumberOfNonAcknowledgedTasks());
            Assert.assertEquals((long)0L, (long)checkpointNew.getNumberOfAcknowledgedTasks());
            Assert.assertEquals((long)0L, (long)checkpointNew.getOperatorStates().size());
            Assert.assertFalse((boolean)checkpointNew.isDiscarded());
            Assert.assertFalse((boolean)checkpointNew.areTasksFullyAcknowledged());
            Assert.assertNotEquals((long)checkpoint1.getCheckpointId(), (long)checkpointNew.getCheckpointId());
            checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID1, checkpoint1Id, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)), TASK_MANAGER_LOCATION_INFO);
            checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID2, checkpoint1Id, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)), TASK_MANAGER_LOCATION_INFO);
            Assert.assertTrue((boolean)checkpoint1.isDiscarded());
            ((Execution)Mockito.verify((Object)vertex1.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).notifyCheckpointAborted(Mockito.eq((long)checkpoint1Id), ((Long)Matchers.any(Long.class)).longValue());
            ((Execution)Mockito.verify((Object)vertex2.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).notifyCheckpointAborted(Mockito.eq((long)checkpoint1Id), ((Long)Matchers.any(Long.class)).longValue());
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testTriggerAndConfirmSimpleCheckpoint() {
        try {
            JobID jobId = new JobID();
            ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
            ExecutionVertex vertex1 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(attemptID1);
            ExecutionVertex vertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(attemptID2);
            CheckpointCoordinator checkpointCoordinator = this.getCheckpointCoordinator(jobId, vertex1, vertex2);
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals((long)0L, (long)this.manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
            CompletableFuture checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture);
            Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals((long)1L, (long)this.manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
            long checkpointId = (Long)checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
            PendingCheckpoint checkpoint = (PendingCheckpoint)checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
            Assert.assertNotNull((Object)checkpoint);
            Assert.assertEquals((long)checkpointId, (long)checkpoint.getCheckpointId());
            Assert.assertEquals((Object)jobId, (Object)checkpoint.getJobId());
            Assert.assertEquals((long)2L, (long)checkpoint.getNumberOfNonAcknowledgedTasks());
            Assert.assertEquals((long)0L, (long)checkpoint.getNumberOfAcknowledgedTasks());
            Assert.assertEquals((long)0L, (long)checkpoint.getOperatorStates().size());
            Assert.assertFalse((boolean)checkpoint.isDiscarded());
            Assert.assertFalse((boolean)checkpoint.areTasksFullyAcknowledged());
            ((Execution)Mockito.verify((Object)vertex1.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpointId), ((Long)Matchers.any(Long.class)).longValue(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            ((Execution)Mockito.verify((Object)vertex2.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpointId), ((Long)Matchers.any(Long.class)).longValue(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            OperatorID opID1 = OperatorID.fromJobVertexID((JobVertexID)vertex1.getJobvertexId());
            OperatorID opID2 = OperatorID.fromJobVertexID((JobVertexID)vertex2.getJobvertexId());
            TaskStateSnapshot taskOperatorSubtaskStates1 = (TaskStateSnapshot)Mockito.mock(TaskStateSnapshot.class);
            TaskStateSnapshot taskOperatorSubtaskStates2 = (TaskStateSnapshot)Mockito.mock(TaskStateSnapshot.class);
            OperatorSubtaskState subtaskState1 = (OperatorSubtaskState)Mockito.mock(OperatorSubtaskState.class);
            OperatorSubtaskState subtaskState2 = (OperatorSubtaskState)Mockito.mock(OperatorSubtaskState.class);
            Mockito.when((Object)taskOperatorSubtaskStates1.getSubtaskStateByOperatorID(opID1)).thenReturn((Object)subtaskState1);
            Mockito.when((Object)taskOperatorSubtaskStates2.getSubtaskStateByOperatorID(opID2)).thenReturn((Object)subtaskState2);
            AcknowledgeCheckpoint acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
            checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
            Assert.assertEquals((long)1L, (long)checkpoint.getNumberOfAcknowledgedTasks());
            Assert.assertEquals((long)1L, (long)checkpoint.getNumberOfNonAcknowledgedTasks());
            Assert.assertFalse((boolean)checkpoint.isDiscarded());
            Assert.assertFalse((boolean)checkpoint.areTasksFullyAcknowledged());
            ((TaskStateSnapshot)Mockito.verify((Object)taskOperatorSubtaskStates2, (VerificationMode)Mockito.never())).registerSharedStates((SharedStateRegistry)Matchers.any(SharedStateRegistry.class));
            checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
            Assert.assertFalse((boolean)checkpoint.isDiscarded());
            Assert.assertFalse((boolean)checkpoint.areTasksFullyAcknowledged());
            ((OperatorSubtaskState)Mockito.verify((Object)subtaskState2, (VerificationMode)Mockito.never())).registerSharedStates((SharedStateRegistry)Matchers.any(SharedStateRegistry.class));
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO);
            Assert.assertTrue((boolean)checkpoint.isDiscarded());
            Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)this.manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
            ((OperatorSubtaskState)Mockito.verify((Object)subtaskState1, (VerificationMode)Mockito.times((int)1))).registerSharedStates((SharedStateRegistry)Matchers.any(SharedStateRegistry.class));
            ((OperatorSubtaskState)Mockito.verify((Object)subtaskState2, (VerificationMode)Mockito.times((int)1))).registerSharedStates((SharedStateRegistry)Matchers.any(SharedStateRegistry.class));
            ((Execution)Mockito.verify((Object)vertex1.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpointId), ((Long)Matchers.any(Long.class)).longValue(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            ((Execution)Mockito.verify((Object)vertex2.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpointId), ((Long)Matchers.any(Long.class)).longValue(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            CompletedCheckpoint success = (CompletedCheckpoint)checkpointCoordinator.getSuccessfulCheckpoints().get(0);
            Assert.assertEquals((Object)jobId, (Object)success.getJobId());
            Assert.assertEquals((long)checkpoint.getCheckpointId(), (long)success.getCheckpointID());
            Assert.assertEquals((long)2L, (long)success.getOperatorStates().size());
            checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            long checkpointIdNew = (Long)checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals((long)0L, (long)this.manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
            CompletedCheckpoint successNew = (CompletedCheckpoint)checkpointCoordinator.getSuccessfulCheckpoints().get(0);
            Assert.assertEquals((Object)jobId, (Object)successNew.getJobId());
            Assert.assertEquals((long)checkpointIdNew, (long)successNew.getCheckpointID());
            Assert.assertTrue((boolean)successNew.getOperatorStates().isEmpty());
            ((Execution)Mockito.verify((Object)vertex1.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpointIdNew), ((Long)Matchers.any(Long.class)).longValue(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            ((Execution)Mockito.verify((Object)vertex2.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpointIdNew), ((Long)Matchers.any(Long.class)).longValue(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            ((Execution)Mockito.verify((Object)vertex1.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).notifyCheckpointComplete(Mockito.eq((long)checkpointIdNew), ((Long)Matchers.any(Long.class)).longValue());
            ((Execution)Mockito.verify((Object)vertex2.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).notifyCheckpointComplete(Mockito.eq((long)checkpointIdNew), ((Long)Matchers.any(Long.class)).longValue());
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testMultipleConcurrentCheckpoints() {
        try {
            JobID jobId = new JobID();
            ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID3 = new ExecutionAttemptID();
            ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
            ExecutionVertex triggerVertex1 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(triggerAttemptID1);
            ExecutionVertex triggerVertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(triggerAttemptID2);
            ExecutionVertex ackVertex1 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(ackAttemptID1);
            ExecutionVertex ackVertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(ackAttemptID2);
            ExecutionVertex ackVertex3 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(ackAttemptID3);
            ExecutionVertex commitVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(commitAttemptID);
            CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobId).setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build()).setTasksToTrigger(new ExecutionVertex[]{triggerVertex1, triggerVertex2}).setTasksToWaitFor(new ExecutionVertex[]{ackVertex1, ackVertex2, ackVertex3}).setTasksToCommitTo(new ExecutionVertex[]{commitVertex}).setCompletedCheckpointStore((CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(2)).setTimer(this.manuallyTriggeredScheduledExecutor).build();
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            CompletableFuture checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture1);
            Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            PendingCheckpoint pending1 = (PendingCheckpoint)checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
            long checkpointId1 = pending1.getCheckpointId();
            ((Execution)Mockito.verify((Object)triggerVertex1.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpointId1), ((Long)Matchers.any(Long.class)).longValue(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            ((Execution)Mockito.verify((Object)triggerVertex2.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpointId1), ((Long)Matchers.any(Long.class)).longValue(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID2, checkpointId1), TASK_MANAGER_LOCATION_INFO);
            CompletableFuture checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture2);
            Assert.assertEquals((long)2L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Iterator all = checkpointCoordinator.getPendingCheckpoints().values().iterator();
            PendingCheckpoint cc1 = (PendingCheckpoint)all.next();
            PendingCheckpoint cc2 = (PendingCheckpoint)all.next();
            PendingCheckpoint pending2 = pending1 == cc1 ? cc2 : cc1;
            long checkpointId2 = pending2.getCheckpointId();
            ((Execution)Mockito.verify((Object)triggerVertex1.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpointId2), ((Long)Matchers.any(Long.class)).longValue(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            ((Execution)Mockito.verify((Object)triggerVertex2.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpointId2), ((Long)Matchers.any(Long.class)).longValue(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID3, checkpointId1), TASK_MANAGER_LOCATION_INFO);
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID1, checkpointId2), TASK_MANAGER_LOCATION_INFO);
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID1, checkpointId1), TASK_MANAGER_LOCATION_INFO);
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID2, checkpointId2), TASK_MANAGER_LOCATION_INFO);
            Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertTrue((boolean)pending1.isDiscarded());
            ((Execution)Mockito.verify((Object)commitVertex.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).notifyCheckpointComplete(Mockito.eq((long)checkpointId1), ((Long)Matchers.any(Long.class)).longValue());
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID3, checkpointId2), TASK_MANAGER_LOCATION_INFO);
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)2L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertTrue((boolean)pending2.isDiscarded());
            ((Execution)Mockito.verify((Object)commitVertex.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).notifyCheckpointComplete(Mockito.eq((long)checkpointId2), ((Long)Matchers.any(Long.class)).longValue());
            List scs = checkpointCoordinator.getSuccessfulCheckpoints();
            CompletedCheckpoint sc1 = (CompletedCheckpoint)scs.get(0);
            Assert.assertEquals((long)checkpointId1, (long)sc1.getCheckpointID());
            Assert.assertEquals((Object)jobId, (Object)sc1.getJobId());
            Assert.assertTrue((boolean)sc1.getOperatorStates().isEmpty());
            CompletedCheckpoint sc2 = (CompletedCheckpoint)scs.get(1);
            Assert.assertEquals((long)checkpointId2, (long)sc2.getCheckpointID());
            Assert.assertEquals((Object)jobId, (Object)sc2.getJobId());
            Assert.assertTrue((boolean)sc2.getOperatorStates().isEmpty());
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testSuccessfulCheckpointSubsumesUnsuccessful() {
        try {
            JobID jobId = new JobID();
            ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID3 = new ExecutionAttemptID();
            ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
            ExecutionVertex triggerVertex1 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(triggerAttemptID1);
            ExecutionVertex triggerVertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(triggerAttemptID2);
            ExecutionVertex ackVertex1 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(ackAttemptID1);
            ExecutionVertex ackVertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(ackAttemptID2);
            ExecutionVertex ackVertex3 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(ackAttemptID3);
            ExecutionVertex commitVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(commitAttemptID);
            CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobId).setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build()).setTasksToTrigger(new ExecutionVertex[]{triggerVertex1, triggerVertex2}).setTasksToWaitFor(new ExecutionVertex[]{ackVertex1, ackVertex2, ackVertex3}).setTasksToCommitTo(new ExecutionVertex[]{commitVertex}).setCompletedCheckpointStore((CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(10)).setTimer(this.manuallyTriggeredScheduledExecutor).build();
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            CompletableFuture checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture1);
            Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            PendingCheckpoint pending1 = (PendingCheckpoint)checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
            long checkpointId1 = pending1.getCheckpointId();
            ((Execution)Mockito.verify((Object)triggerVertex1.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpointId1), ((Long)Matchers.any(Long.class)).longValue(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            ((Execution)Mockito.verify((Object)triggerVertex2.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpointId1), ((Long)Matchers.any(Long.class)).longValue(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            OperatorID opID1 = OperatorID.fromJobVertexID((JobVertexID)ackVertex1.getJobvertexId());
            OperatorID opID2 = OperatorID.fromJobVertexID((JobVertexID)ackVertex2.getJobvertexId());
            OperatorID opID3 = OperatorID.fromJobVertexID((JobVertexID)ackVertex3.getJobvertexId());
            TaskStateSnapshot taskOperatorSubtaskStates11 = (TaskStateSnapshot)Mockito.spy((Object)new TaskStateSnapshot());
            TaskStateSnapshot taskOperatorSubtaskStates12 = (TaskStateSnapshot)Mockito.spy((Object)new TaskStateSnapshot());
            TaskStateSnapshot taskOperatorSubtaskStates13 = (TaskStateSnapshot)Mockito.spy((Object)new TaskStateSnapshot());
            OperatorSubtaskState subtaskState11 = (OperatorSubtaskState)Mockito.mock(OperatorSubtaskState.class);
            OperatorSubtaskState subtaskState12 = (OperatorSubtaskState)Mockito.mock(OperatorSubtaskState.class);
            OperatorSubtaskState subtaskState13 = (OperatorSubtaskState)Mockito.mock(OperatorSubtaskState.class);
            taskOperatorSubtaskStates11.putSubtaskStateByOperatorID(opID1, subtaskState11);
            taskOperatorSubtaskStates12.putSubtaskStateByOperatorID(opID2, subtaskState12);
            taskOperatorSubtaskStates13.putSubtaskStateByOperatorID(opID3, subtaskState13);
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID2, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates12), TASK_MANAGER_LOCATION_INFO);
            CompletableFuture checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture2);
            Assert.assertEquals((long)2L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Iterator all = checkpointCoordinator.getPendingCheckpoints().values().iterator();
            PendingCheckpoint cc1 = (PendingCheckpoint)all.next();
            PendingCheckpoint cc2 = (PendingCheckpoint)all.next();
            PendingCheckpoint pending2 = pending1 == cc1 ? cc2 : cc1;
            long checkpointId2 = pending2.getCheckpointId();
            TaskStateSnapshot taskOperatorSubtaskStates21 = (TaskStateSnapshot)Mockito.spy((Object)new TaskStateSnapshot());
            TaskStateSnapshot taskOperatorSubtaskStates22 = (TaskStateSnapshot)Mockito.spy((Object)new TaskStateSnapshot());
            TaskStateSnapshot taskOperatorSubtaskStates23 = (TaskStateSnapshot)Mockito.spy((Object)new TaskStateSnapshot());
            OperatorSubtaskState subtaskState21 = (OperatorSubtaskState)Mockito.mock(OperatorSubtaskState.class);
            OperatorSubtaskState subtaskState22 = (OperatorSubtaskState)Mockito.mock(OperatorSubtaskState.class);
            OperatorSubtaskState subtaskState23 = (OperatorSubtaskState)Mockito.mock(OperatorSubtaskState.class);
            taskOperatorSubtaskStates21.putSubtaskStateByOperatorID(opID1, subtaskState21);
            taskOperatorSubtaskStates22.putSubtaskStateByOperatorID(opID2, subtaskState22);
            taskOperatorSubtaskStates23.putSubtaskStateByOperatorID(opID3, subtaskState23);
            ((Execution)Mockito.verify((Object)triggerVertex1.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpointId2), ((Long)Matchers.any(Long.class)).longValue(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            ((Execution)Mockito.verify((Object)triggerVertex2.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpointId2), ((Long)Matchers.any(Long.class)).longValue(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID3, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates23), TASK_MANAGER_LOCATION_INFO);
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID1, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates21), TASK_MANAGER_LOCATION_INFO);
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID1, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates11), TASK_MANAGER_LOCATION_INFO);
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID2, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates22), TASK_MANAGER_LOCATION_INFO);
            Assert.assertTrue((boolean)pending1.isDiscarded());
            Assert.assertTrue((boolean)pending2.isDiscarded());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            ((OperatorSubtaskState)Mockito.verify((Object)subtaskState11, (VerificationMode)Mockito.times((int)1))).discardState();
            ((OperatorSubtaskState)Mockito.verify((Object)subtaskState12, (VerificationMode)Mockito.times((int)1))).discardState();
            ((OperatorSubtaskState)Mockito.verify((Object)subtaskState21, (VerificationMode)Mockito.never())).discardState();
            ((OperatorSubtaskState)Mockito.verify((Object)subtaskState22, (VerificationMode)Mockito.never())).discardState();
            ((OperatorSubtaskState)Mockito.verify((Object)subtaskState23, (VerificationMode)Mockito.never())).discardState();
            List scs = checkpointCoordinator.getSuccessfulCheckpoints();
            CompletedCheckpoint success = (CompletedCheckpoint)scs.get(0);
            Assert.assertEquals((long)checkpointId2, (long)success.getCheckpointID());
            Assert.assertEquals((Object)jobId, (Object)success.getJobId());
            Assert.assertEquals((long)3L, (long)success.getOperatorStates().size());
            ((Execution)Mockito.verify((Object)commitVertex.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).notifyCheckpointComplete(Mockito.eq((long)checkpointId2), ((Long)Matchers.any(Long.class)).longValue());
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID3, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates13), TASK_MANAGER_LOCATION_INFO);
            ((OperatorSubtaskState)Mockito.verify((Object)subtaskState13, (VerificationMode)Mockito.times((int)1))).discardState();
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
            ((OperatorSubtaskState)Mockito.verify((Object)subtaskState21, (VerificationMode)Mockito.times((int)1))).discardState();
            ((OperatorSubtaskState)Mockito.verify((Object)subtaskState22, (VerificationMode)Mockito.times((int)1))).discardState();
            ((OperatorSubtaskState)Mockito.verify((Object)subtaskState23, (VerificationMode)Mockito.times((int)1))).discardState();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testCheckpointTimeoutIsolated() {
        try {
            JobID jobId = new JobID();
            ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
            ExecutionVertex triggerVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(triggerAttemptID);
            ExecutionVertex ackVertex1 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(ackAttemptID1);
            ExecutionVertex ackVertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(ackAttemptID2);
            ExecutionVertex commitVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(commitAttemptID);
            CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobId).setTasksToTrigger(new ExecutionVertex[]{triggerVertex}).setTasksToWaitFor(new ExecutionVertex[]{ackVertex1, ackVertex2}).setTasksToCommitTo(new ExecutionVertex[]{commitVertex}).setCompletedCheckpointStore((CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(2)).setTimer(this.manuallyTriggeredScheduledExecutor).build();
            CompletableFuture checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture);
            Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            PendingCheckpoint checkpoint = (PendingCheckpoint)checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
            Assert.assertFalse((boolean)checkpoint.isDiscarded());
            OperatorID opID1 = OperatorID.fromJobVertexID((JobVertexID)ackVertex1.getJobvertexId());
            TaskStateSnapshot taskOperatorSubtaskStates1 = (TaskStateSnapshot)Mockito.spy((Object)new TaskStateSnapshot());
            OperatorSubtaskState subtaskState1 = (OperatorSubtaskState)Mockito.mock(OperatorSubtaskState.class);
            taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID1, subtaskState1);
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID1, checkpoint.getCheckpointId(), new CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO);
            this.manuallyTriggeredScheduledExecutor.triggerScheduledTasks();
            Assert.assertTrue((String)"Checkpoint was not canceled by the timeout", (boolean)checkpoint.isDiscarded());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            ((OperatorSubtaskState)Mockito.verify((Object)subtaskState1, (VerificationMode)Mockito.times((int)1))).discardState();
            ((Execution)Mockito.verify((Object)commitVertex.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)0))).notifyCheckpointComplete(Matchers.anyLong(), Matchers.anyLong());
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testHandleMessagesForNonExistingCheckpoints() {
        try {
            JobID jobId = new JobID();
            ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
            ExecutionVertex triggerVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(triggerAttemptID);
            ExecutionVertex ackVertex1 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(ackAttemptID1);
            ExecutionVertex ackVertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(ackAttemptID2);
            ExecutionVertex commitVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(commitAttemptID);
            CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobId).setTasksToTrigger(new ExecutionVertex[]{triggerVertex}).setTasksToWaitFor(new ExecutionVertex[]{ackVertex1, ackVertex2}).setTasksToCommitTo(new ExecutionVertex[]{commitVertex}).setCompletedCheckpointStore((CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(2)).setTimer(this.manuallyTriggeredScheduledExecutor).build();
            CompletableFuture checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture);
            long checkpointId = (Long)checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID1, 1L), TASK_MANAGER_LOCATION_INFO);
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId), TASK_MANAGER_LOCATION_INFO);
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testStateCleanupForLateOrUnknownMessages() throws Exception {
        JobID jobId = new JobID();
        ExecutionAttemptID triggerAttemptId = new ExecutionAttemptID();
        ExecutionVertex triggerVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(triggerAttemptId);
        ExecutionAttemptID ackAttemptId1 = new ExecutionAttemptID();
        ExecutionVertex ackVertex1 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(ackAttemptId1);
        ExecutionAttemptID ackAttemptId2 = new ExecutionAttemptID();
        ExecutionVertex ackVertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(ackAttemptId2);
        CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setMaxConcurrentCheckpoints(1).build();
        CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobId).setCheckpointCoordinatorConfiguration(chkConfig).setTasksToTrigger(new ExecutionVertex[]{triggerVertex}).setTasksToWaitFor(new ExecutionVertex[]{triggerVertex, ackVertex1, ackVertex2}).setTasksToCommitTo(new ExecutionVertex[0]).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        CompletableFuture checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture);
        Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        PendingCheckpoint pendingCheckpoint = (PendingCheckpoint)checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
        long checkpointId = pendingCheckpoint.getCheckpointId();
        OperatorID opIDtrigger = OperatorID.fromJobVertexID((JobVertexID)triggerVertex.getJobvertexId());
        TaskStateSnapshot taskOperatorSubtaskStatesTrigger = (TaskStateSnapshot)Mockito.spy((Object)new TaskStateSnapshot());
        OperatorSubtaskState subtaskStateTrigger = (OperatorSubtaskState)Mockito.mock(OperatorSubtaskState.class);
        taskOperatorSubtaskStatesTrigger.putSubtaskStateByOperatorID(opIDtrigger, subtaskStateTrigger);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStatesTrigger), TASK_MANAGER_LOCATION_INFO);
        ((OperatorSubtaskState)Mockito.verify((Object)subtaskStateTrigger, (VerificationMode)Mockito.never())).discardState();
        TaskStateSnapshot unknownSubtaskState = (TaskStateSnapshot)Mockito.mock(TaskStateSnapshot.class);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownSubtaskState), TASK_MANAGER_LOCATION_INFO);
        ((TaskStateSnapshot)Mockito.verify((Object)unknownSubtaskState, (VerificationMode)Mockito.times((int)1))).discardState();
        TaskStateSnapshot differentJobSubtaskState = (TaskStateSnapshot)Mockito.mock(TaskStateSnapshot.class);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), differentJobSubtaskState), TASK_MANAGER_LOCATION_INFO);
        ((TaskStateSnapshot)Mockito.verify((Object)differentJobSubtaskState, (VerificationMode)Mockito.never())).discardState();
        TaskStateSnapshot triggerSubtaskState = (TaskStateSnapshot)Mockito.mock(TaskStateSnapshot.class);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), triggerSubtaskState), TASK_MANAGER_LOCATION_INFO);
        ((TaskStateSnapshot)Mockito.verify((Object)triggerSubtaskState, (VerificationMode)Mockito.never())).discardState();
        Mockito.reset((Object[])new OperatorSubtaskState[]{subtaskStateTrigger});
        checkpointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, ackAttemptId1, checkpointId, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)), TASK_MANAGER_LOCATION_INFO);
        Assert.assertTrue((boolean)pendingCheckpoint.isDiscarded());
        ((OperatorSubtaskState)Mockito.verify((Object)subtaskStateTrigger, (VerificationMode)Mockito.times((int)1))).discardState();
        TaskStateSnapshot ackSubtaskState = (TaskStateSnapshot)Mockito.mock(TaskStateSnapshot.class);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptId2, checkpointId, new CheckpointMetrics(), ackSubtaskState), TASK_MANAGER_LOCATION_INFO);
        ((TaskStateSnapshot)Mockito.verify((Object)ackSubtaskState, (VerificationMode)Mockito.times((int)1))).discardState();
        Mockito.reset((Object[])new TaskStateSnapshot[]{differentJobSubtaskState});
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), differentJobSubtaskState), TASK_MANAGER_LOCATION_INFO);
        ((TaskStateSnapshot)Mockito.verify((Object)differentJobSubtaskState, (VerificationMode)Mockito.never())).discardState();
        TaskStateSnapshot unknownSubtaskState2 = (TaskStateSnapshot)Mockito.mock(TaskStateSnapshot.class);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownSubtaskState2), TASK_MANAGER_LOCATION_INFO);
        ((TaskStateSnapshot)Mockito.verify((Object)unknownSubtaskState2, (VerificationMode)Mockito.times((int)1))).discardState();
    }

    @Test
    public void testMaxConcurrentAttempts1() {
        this.testMaxConcurrentAttempts(1);
    }

    @Test
    public void testMaxConcurrentAttempts2() {
        this.testMaxConcurrentAttempts(2);
    }

    @Test
    public void testMaxConcurrentAttempts5() {
        this.testMaxConcurrentAttempts(5);
    }

    @Test
    public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
        JobID jobId = new JobID();
        ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
        ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
        ExecutionVertex vertex1 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(attemptID1);
        ExecutionVertex vertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(attemptID2);
        CheckpointCoordinator checkpointCoordinator = this.getCheckpointCoordinator(jobId, vertex1, vertex2);
        Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
        String savepointDir = this.tmpFolder.newFolder().getAbsolutePath();
        CompletableFuture savepointFuture = checkpointCoordinator.triggerSavepoint(savepointDir);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertFalse((boolean)savepointFuture.isDone());
        Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        long checkpointId = (Long)checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
        PendingCheckpoint pending = (PendingCheckpoint)checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
        Assert.assertNotNull((Object)pending);
        Assert.assertEquals((long)checkpointId, (long)pending.getCheckpointId());
        Assert.assertEquals((Object)jobId, (Object)pending.getJobId());
        Assert.assertEquals((long)2L, (long)pending.getNumberOfNonAcknowledgedTasks());
        Assert.assertEquals((long)0L, (long)pending.getNumberOfAcknowledgedTasks());
        Assert.assertEquals((long)0L, (long)pending.getOperatorStates().size());
        Assert.assertFalse((boolean)pending.isDiscarded());
        Assert.assertFalse((boolean)pending.areTasksFullyAcknowledged());
        Assert.assertFalse((boolean)pending.canBeSubsumed());
        OperatorID opID1 = OperatorID.fromJobVertexID((JobVertexID)vertex1.getJobvertexId());
        OperatorID opID2 = OperatorID.fromJobVertexID((JobVertexID)vertex2.getJobvertexId());
        TaskStateSnapshot taskOperatorSubtaskStates1 = (TaskStateSnapshot)Mockito.mock(TaskStateSnapshot.class);
        TaskStateSnapshot taskOperatorSubtaskStates2 = (TaskStateSnapshot)Mockito.mock(TaskStateSnapshot.class);
        OperatorSubtaskState subtaskState1 = (OperatorSubtaskState)Mockito.mock(OperatorSubtaskState.class);
        OperatorSubtaskState subtaskState2 = (OperatorSubtaskState)Mockito.mock(OperatorSubtaskState.class);
        Mockito.when((Object)taskOperatorSubtaskStates1.getSubtaskStateByOperatorID(opID1)).thenReturn((Object)subtaskState1);
        Mockito.when((Object)taskOperatorSubtaskStates2.getSubtaskStateByOperatorID(opID2)).thenReturn((Object)subtaskState2);
        AcknowledgeCheckpoint acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
        Assert.assertEquals((long)1L, (long)pending.getNumberOfAcknowledgedTasks());
        Assert.assertEquals((long)1L, (long)pending.getNumberOfNonAcknowledgedTasks());
        Assert.assertFalse((boolean)pending.isDiscarded());
        Assert.assertFalse((boolean)pending.areTasksFullyAcknowledged());
        Assert.assertFalse((boolean)savepointFuture.isDone());
        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
        Assert.assertFalse((boolean)pending.isDiscarded());
        Assert.assertFalse((boolean)pending.areTasksFullyAcknowledged());
        Assert.assertFalse((boolean)savepointFuture.isDone());
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1), TASK_MANAGER_LOCATION_INFO);
        Assert.assertTrue((boolean)pending.isDiscarded());
        Assert.assertNotNull(savepointFuture.get());
        Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        ((Execution)Mockito.verify((Object)vertex1.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).notifyCheckpointComplete(Mockito.eq((long)checkpointId), ((Long)Matchers.any(Long.class)).longValue());
        ((Execution)Mockito.verify((Object)vertex2.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).notifyCheckpointComplete(Mockito.eq((long)checkpointId), ((Long)Matchers.any(Long.class)).longValue());
        ((OperatorSubtaskState)Mockito.verify((Object)subtaskState1, (VerificationMode)Mockito.times((int)1))).registerSharedStates((SharedStateRegistry)Matchers.any(SharedStateRegistry.class));
        ((OperatorSubtaskState)Mockito.verify((Object)subtaskState2, (VerificationMode)Mockito.times((int)1))).registerSharedStates((SharedStateRegistry)Matchers.any(SharedStateRegistry.class));
        CompletedCheckpoint success = (CompletedCheckpoint)checkpointCoordinator.getSuccessfulCheckpoints().get(0);
        Assert.assertEquals((Object)jobId, (Object)success.getJobId());
        Assert.assertEquals((long)pending.getCheckpointId(), (long)success.getCheckpointID());
        Assert.assertEquals((long)2L, (long)success.getOperatorStates().size());
        savepointFuture = checkpointCoordinator.triggerSavepoint(savepointDir);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertFalse((boolean)savepointFuture.isDone());
        long checkpointIdNew = (Long)checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, checkpointIdNew), TASK_MANAGER_LOCATION_INFO);
        Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
        CompletedCheckpoint successNew = (CompletedCheckpoint)checkpointCoordinator.getSuccessfulCheckpoints().get(0);
        Assert.assertEquals((Object)jobId, (Object)successNew.getJobId());
        Assert.assertEquals((long)checkpointIdNew, (long)successNew.getCheckpointID());
        Assert.assertTrue((boolean)successNew.getOperatorStates().isEmpty());
        Assert.assertNotNull(savepointFuture.get());
        ((OperatorSubtaskState)Mockito.verify((Object)subtaskState1, (VerificationMode)Mockito.never())).discardState();
        ((OperatorSubtaskState)Mockito.verify((Object)subtaskState2, (VerificationMode)Mockito.never())).discardState();
        ((Execution)Mockito.verify((Object)vertex1.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpointIdNew), ((Long)Matchers.any(Long.class)).longValue(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
        ((Execution)Mockito.verify((Object)vertex2.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpointIdNew), ((Long)Matchers.any(Long.class)).longValue(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
        ((Execution)Mockito.verify((Object)vertex1.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).notifyCheckpointComplete(Mockito.eq((long)checkpointIdNew), ((Long)Matchers.any(Long.class)).longValue());
        ((Execution)Mockito.verify((Object)vertex2.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).notifyCheckpointComplete(Mockito.eq((long)checkpointIdNew), ((Long)Matchers.any(Long.class)).longValue());
        checkpointCoordinator.shutdown(JobStatus.FINISHED);
    }

    @Test
    public void testSavepointsAreNotSubsumed() throws Exception {
        JobID jobId = new JobID();
        ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
        ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
        ExecutionVertex vertex1 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(attemptID1);
        ExecutionVertex vertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(attemptID2);
        StandaloneCheckpointIDCounter counter = new StandaloneCheckpointIDCounter();
        CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobId).setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build()).setTasks(new ExecutionVertex[]{vertex1, vertex2}).setCheckpointIDCounter((CheckpointIDCounter)counter).setCompletedCheckpointStore((CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(10)).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        String savepointDir = this.tmpFolder.newFolder().getAbsolutePath();
        CompletableFuture savepointFuture1 = checkpointCoordinator.triggerSavepoint(savepointDir);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        long savepointId1 = counter.getLast();
        Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        CompletableFuture checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertEquals((long)2L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture1);
        CompletableFuture checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture2);
        long checkpointId2 = counter.getLast();
        Assert.assertEquals((long)3L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId2), TASK_MANAGER_LOCATION_INFO);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId2), TASK_MANAGER_LOCATION_INFO);
        Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertFalse((boolean)((PendingCheckpoint)checkpointCoordinator.getPendingCheckpoints().get(savepointId1)).isDiscarded());
        Assert.assertFalse((boolean)savepointFuture1.isDone());
        CompletableFuture checkpointFuture3 = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture3);
        Assert.assertEquals((long)2L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        CompletableFuture savepointFuture2 = checkpointCoordinator.triggerSavepoint(savepointDir);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        long savepointId2 = counter.getLast();
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)savepointFuture2);
        Assert.assertEquals((long)3L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, savepointId2), TASK_MANAGER_LOCATION_INFO);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, savepointId2), TASK_MANAGER_LOCATION_INFO);
        Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals((long)2L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertFalse((boolean)((PendingCheckpoint)checkpointCoordinator.getPendingCheckpoints().get(savepointId1)).isDiscarded());
        Assert.assertFalse((boolean)savepointFuture1.isDone());
        Assert.assertNotNull(savepointFuture2.get());
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, savepointId1), TASK_MANAGER_LOCATION_INFO);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID2, savepointId1), TASK_MANAGER_LOCATION_INFO);
        Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals((long)3L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertNotNull(savepointFuture1.get());
    }

    private void testMaxConcurrentAttempts(int maxConcurrentAttempts) {
        try {
            JobID jobId = new JobID();
            ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
            ExecutionVertex triggerVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(triggerAttemptID);
            ExecutionVertex ackVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(ackAttemptID);
            ExecutionVertex commitVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(commitAttemptID);
            AtomicInteger numCalls = new AtomicInteger();
            Execution execution = triggerVertex.getCurrentExecutionAttempt();
            ((Execution)Mockito.doAnswer(invocation -> {
                numCalls.incrementAndGet();
                return null;
            }).when((Object)execution)).triggerCheckpoint(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            ((Execution)Mockito.doAnswer(invocation -> {
                numCalls.incrementAndGet();
                return null;
            }).when((Object)execution)).notifyCheckpointComplete(Matchers.anyLong(), Matchers.anyLong());
            CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setCheckpointInterval(10L).setCheckpointTimeout(200000L).setMinPauseBetweenCheckpoints(0L).setMaxConcurrentCheckpoints(maxConcurrentAttempts).build();
            CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobId).setCheckpointCoordinatorConfiguration(chkConfig).setTasksToTrigger(new ExecutionVertex[]{triggerVertex}).setTasksToWaitFor(new ExecutionVertex[]{ackVertex}).setTasksToCommitTo(new ExecutionVertex[]{commitVertex}).setCompletedCheckpointStore((CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(2)).setTimer(this.manuallyTriggeredScheduledExecutor).build();
            checkpointCoordinator.startCheckpointScheduler();
            for (int i = 0; i < maxConcurrentAttempts; ++i) {
                this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
                this.manuallyTriggeredScheduledExecutor.triggerAll();
            }
            Assert.assertEquals((long)maxConcurrentAttempts, (long)numCalls.get());
            ((Execution)Mockito.verify((Object)triggerVertex.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)maxConcurrentAttempts))).triggerCheckpoint(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID, 1L), TASK_MANAGER_LOCATION_INFO);
            Collection<ScheduledFuture<?>> periodicScheduledTasks = this.manuallyTriggeredScheduledExecutor.getPeriodicScheduledTask();
            Assert.assertEquals((long)1L, (long)periodicScheduledTasks.size());
            ScheduledFuture<?> scheduledFuture = periodicScheduledTasks.iterator().next();
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertEquals((long)(maxConcurrentAttempts + 1), (long)numCalls.get());
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertEquals((long)(maxConcurrentAttempts + 1), (long)numCalls.get());
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testMaxConcurrentAttempsWithSubsumption() {
        try {
            int maxConcurrentAttempts = 2;
            JobID jobId = new JobID();
            ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
            ExecutionVertex triggerVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(triggerAttemptID);
            ExecutionVertex ackVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(ackAttemptID);
            ExecutionVertex commitVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(commitAttemptID);
            CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setCheckpointInterval(10L).setCheckpointTimeout(200000L).setMinPauseBetweenCheckpoints(0L).setMaxConcurrentCheckpoints(2).build();
            CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobId).setCheckpointCoordinatorConfiguration(chkConfig).setTasksToTrigger(new ExecutionVertex[]{triggerVertex}).setTasksToWaitFor(new ExecutionVertex[]{ackVertex}).setTasksToCommitTo(new ExecutionVertex[]{commitVertex}).setCompletedCheckpointStore((CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(2)).setTimer(this.manuallyTriggeredScheduledExecutor).build();
            checkpointCoordinator.startCheckpointScheduler();
            do {
                this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
                this.manuallyTriggeredScheduledExecutor.triggerAll();
            } while (checkpointCoordinator.getNumberOfPendingCheckpoints() < 2);
            Assert.assertEquals((long)2L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(1L));
            Assert.assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(2L));
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptID, 2L), TASK_MANAGER_LOCATION_INFO);
            do {
                this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
                this.manuallyTriggeredScheduledExecutor.triggerAll();
            } while (checkpointCoordinator.getNumberOfPendingCheckpoints() < 2);
            Assert.assertEquals((long)2L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(3L));
            Assert.assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(4L));
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testPeriodicSchedulingWithInactiveTasks() {
        try {
            JobID jobId = new JobID();
            ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
            ExecutionVertex triggerVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(triggerAttemptID);
            ExecutionVertex ackVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(ackAttemptID);
            ExecutionVertex commitVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(commitAttemptID);
            AtomicReference<ExecutionState> currentState = new AtomicReference<ExecutionState>(ExecutionState.CREATED);
            Mockito.when((Object)triggerVertex.getCurrentExecutionAttempt().getState()).thenAnswer(invocation -> (ExecutionState)currentState.get());
            CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setCheckpointInterval(10L).setCheckpointTimeout(200000L).setMinPauseBetweenCheckpoints(0L).setMaxConcurrentCheckpoints(2).build();
            CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobId).setCheckpointCoordinatorConfiguration(chkConfig).setTasksToTrigger(new ExecutionVertex[]{triggerVertex}).setTasksToWaitFor(new ExecutionVertex[]{ackVertex}).setTasksToCommitTo(new ExecutionVertex[]{commitVertex}).setCompletedCheckpointStore((CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(2)).setTimer(this.manuallyTriggeredScheduledExecutor).build();
            checkpointCoordinator.startCheckpointScheduler();
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
            currentState.set(ExecutionState.RUNNING);
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertTrue((checkpointCoordinator.getNumberOfPendingCheckpoints() > 0 ? 1 : 0) != 0);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testConcurrentSavepoints() throws Exception {
        JobID jobId = new JobID();
        int numSavepoints = 5;
        ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
        ExecutionVertex vertex1 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(attemptID1);
        StandaloneCheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter();
        CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setMaxConcurrentCheckpoints(1).build();
        CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobId).setCheckpointCoordinatorConfiguration(chkConfig).setTasks(new ExecutionVertex[]{vertex1}).setCheckpointIDCounter((CheckpointIDCounter)checkpointIDCounter).setCompletedCheckpointStore((CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(2)).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        ArrayList<CompletableFuture> savepointFutures = new ArrayList<CompletableFuture>();
        String savepointDir = this.tmpFolder.newFolder().getAbsolutePath();
        for (int i = 0; i < numSavepoints; ++i) {
            savepointFutures.add(checkpointCoordinator.triggerSavepoint(savepointDir));
        }
        for (CompletableFuture savepointFuture : savepointFutures) {
            Assert.assertFalse((boolean)savepointFuture.isDone());
        }
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        long checkpointId = checkpointIDCounter.getLast();
        int i = 0;
        while (i < numSavepoints) {
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO);
            ++i;
            --checkpointId;
        }
        for (CompletableFuture savepointFuture : savepointFutures) {
            Assert.assertNotNull(savepointFuture.get());
        }
    }

    @Test
    public void testMinDelayBetweenSavepoints() throws Exception {
        CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setMinPauseBetweenCheckpoints(100000000L).setMaxConcurrentCheckpoints(1).build();
        CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCheckpointCoordinatorConfiguration(chkConfig).setCompletedCheckpointStore((CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(2)).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        String savepointDir = this.tmpFolder.newFolder().getAbsolutePath();
        CompletableFuture savepoint0 = checkpointCoordinator.triggerSavepoint(savepointDir);
        Assert.assertFalse((String)"Did not trigger savepoint", (boolean)savepoint0.isDone());
        CompletableFuture savepoint1 = checkpointCoordinator.triggerSavepoint(savepointDir);
        Assert.assertFalse((String)"Did not trigger savepoint", (boolean)savepoint1.isDone());
    }

    @Test
    public void testExternalizedCheckpoints() throws Exception {
        try {
            CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setCheckpointRetentionPolicy(CheckpointRetentionPolicy.RETAIN_ON_FAILURE).build();
            CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCheckpointCoordinatorConfiguration(chkConfig).setTimer(this.manuallyTriggeredScheduledExecutor).build();
            CompletableFuture checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture);
            for (PendingCheckpoint checkpoint : checkpointCoordinator.getPendingCheckpoints().values()) {
                CheckpointProperties props = checkpoint.getProps();
                CheckpointProperties expected = CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.RETAIN_ON_FAILURE);
                Assert.assertEquals((Object)expected, (Object)props);
            }
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testCreateKeyGroupPartitions() {
        this.testCreateKeyGroupPartitions(1, 1);
        this.testCreateKeyGroupPartitions(13, 1);
        this.testCreateKeyGroupPartitions(13, 2);
        this.testCreateKeyGroupPartitions(Short.MAX_VALUE, 1);
        this.testCreateKeyGroupPartitions(Short.MAX_VALUE, 13);
        this.testCreateKeyGroupPartitions(Short.MAX_VALUE, Short.MAX_VALUE);
        Random r = new Random(1234L);
        for (int k = 0; k < 1000; ++k) {
            int maxParallelism = 1 + r.nextInt(32766);
            int parallelism = 1 + r.nextInt(maxParallelism);
            this.testCreateKeyGroupPartitions(maxParallelism, parallelism);
        }
    }

    private void testCreateKeyGroupPartitions(int maxParallelism, int parallelism) {
        List ranges = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism, (int)parallelism);
        for (int i = 0; i < maxParallelism; ++i) {
            KeyGroupRange range = (KeyGroupRange)ranges.get(KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup((int)maxParallelism, (int)parallelism, (int)i));
            if (range.contains(i)) continue;
            Assert.fail((String)("Could not find expected key-group " + i + " in range " + range));
        }
    }

    @Test
    public void testPartitionableStateRepartitioning() {
        Random r = new Random(42L);
        for (int run = 0; run < 10000; ++run) {
            int oldParallelism = 1 + r.nextInt(9);
            int newParallelism = 1 + r.nextInt(9);
            int numNamedStates = 1 + r.nextInt(9);
            int maxPartitionsPerState = 1 + r.nextInt(9);
            this.doTestPartitionableStateRepartitioning(r, oldParallelism, newParallelism, numNamedStates, maxPartitionsPerState);
        }
    }

    /*
     * WARNING - void declaration
     */
    private void doTestPartitionableStateRepartitioning(Random r, int oldParallelism, int newParallelism, int numNamedStates, int maxPartitionsPerState) {
        ArrayList<List<OperatorStreamStateHandle>> previousParallelOpInstanceStates = new ArrayList<List<OperatorStreamStateHandle>>(oldParallelism);
        for (int i = 0; i < oldParallelism; ++i) {
            void var11_16;
            Path fakePath = new Path("/fake-" + i);
            HashMap<String, OperatorStateHandle.StateMetaInfo> namedStatesToOffsets = new HashMap<String, OperatorStateHandle.StateMetaInfo>();
            int off = 0;
            boolean bl = false;
            while (var11_16 < numNamedStates - 1) {
                Object offs = new long[1 + r.nextInt(maxPartitionsPerState)];
                for (int o = 0; o < ((Object)offs).length; ++o) {
                    offs[o] = (long)off;
                    ++off;
                }
                OperatorStateHandle.Mode mode = r.nextInt(10) == 0 ? OperatorStateHandle.Mode.UNION : OperatorStateHandle.Mode.SPLIT_DISTRIBUTE;
                namedStatesToOffsets.put("State-" + (int)var11_16, new OperatorStateHandle.StateMetaInfo((long[])offs, mode));
                ++var11_16;
            }
            if (numNamedStates % 2 == 0) {
                long[] lArray = new long[]{off + 1, off + 2, off + 3, off + 4};
                namedStatesToOffsets.put("State-" + (numNamedStates - 1), new OperatorStateHandle.StateMetaInfo(lArray, OperatorStateHandle.Mode.BROADCAST));
            }
            previousParallelOpInstanceStates.add(Collections.singletonList(new OperatorStreamStateHandle(namedStatesToOffsets, (StreamStateHandle)new FileStateHandle(fakePath, -1L))));
        }
        HashMap expected = new HashMap();
        int taskIndex = 0;
        int expectedTotalPartitions = 0;
        for (List list : previousParallelOpInstanceStates) {
            Assert.assertEquals((long)1L, (long)list.size());
            for (OperatorStateHandle psh : list) {
                Map offsMap = psh.getStateNameToPartitionOffsets();
                HashMap offsMapWithList = new HashMap(offsMap.size());
                for (Map.Entry e : offsMap.entrySet()) {
                    int replication;
                    long[] offs = ((OperatorStateHandle.StateMetaInfo)e.getValue()).getOffsets();
                    switch (((OperatorStateHandle.StateMetaInfo)e.getValue()).getDistributionMode()) {
                        case UNION: {
                            replication = newParallelism;
                            break;
                        }
                        case BROADCAST: {
                            int extra = taskIndex < newParallelism % oldParallelism ? 1 : 0;
                            replication = newParallelism / oldParallelism + extra;
                            break;
                        }
                        case SPLIT_DISTRIBUTE: {
                            replication = 1;
                            break;
                        }
                        default: {
                            throw new RuntimeException("Unknown distribution mode " + ((OperatorStateHandle.StateMetaInfo)e.getValue()).getDistributionMode());
                        }
                    }
                    if (replication <= 0) continue;
                    expectedTotalPartitions += replication * offs.length;
                    ArrayList<Long> offsList = new ArrayList<Long>(offs.length);
                    for (Object off : (Object)offs) {
                        for (int p = 0; p < replication; ++p) {
                            offsList.add((long)off);
                        }
                    }
                    offsMapWithList.put(e.getKey(), offsList);
                }
                if (!offsMapWithList.isEmpty()) {
                    expected.put(psh.getDelegateStateHandle(), offsMapWithList);
                }
                ++taskIndex;
            }
        }
        OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
        List list = repartitioner.repartitionState(previousParallelOpInstanceStates, oldParallelism, newParallelism);
        HashMap actual = new HashMap();
        int minCount = Integer.MAX_VALUE;
        int maxCount = 0;
        int actualTotalPartitions = 0;
        for (int p = 0; p < newParallelism; ++p) {
            int partitionCount = 0;
            Collection pshc = (Collection)list.get(p);
            for (OperatorStateHandle sh : pshc) {
                for (Map.Entry namedState : sh.getStateNameToPartitionOffsets().entrySet()) {
                    long[] add;
                    ArrayList<Long> actualOffs;
                    HashMap stateToOffsets = (HashMap)actual.get(sh.getDelegateStateHandle());
                    if (stateToOffsets == null) {
                        stateToOffsets = new HashMap();
                        actual.put(sh.getDelegateStateHandle(), stateToOffsets);
                    }
                    if ((actualOffs = (ArrayList<Long>)stateToOffsets.get(namedState.getKey())) == null) {
                        actualOffs = new ArrayList<Long>();
                        stateToOffsets.put(namedState.getKey(), actualOffs);
                    }
                    for (long l : add = ((OperatorStateHandle.StateMetaInfo)namedState.getValue()).getOffsets()) {
                        actualOffs.add(l);
                    }
                    partitionCount += ((OperatorStateHandle.StateMetaInfo)namedState.getValue()).getOffsets().length;
                }
            }
            minCount = Math.min(minCount, partitionCount);
            maxCount = Math.max(maxCount, partitionCount);
            actualTotalPartitions += partitionCount;
        }
        for (Map v : actual.values()) {
            for (List l : v.values()) {
                Collections.sort(l);
            }
        }
        if (oldParallelism != newParallelism) {
            int maxLoadDiff = maxCount - minCount;
            Assert.assertTrue((String)("Difference in partition load is > 1 : " + maxLoadDiff), (maxLoadDiff <= 1 ? 1 : 0) != 0);
        }
        Assert.assertEquals((long)expectedTotalPartitions, (long)actualTotalPartitions);
        Assert.assertEquals(expected, actual);
    }

    @Test
    public void testCheckpointStatsTrackerPendingCheckpointCallback() throws Exception {
        CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setTimer(this.manuallyTriggeredScheduledExecutor).build();
        CheckpointStatsTracker tracker = (CheckpointStatsTracker)Mockito.mock(CheckpointStatsTracker.class);
        checkpointCoordinator.setCheckpointStatsTracker(tracker);
        Mockito.when((Object)tracker.reportPendingCheckpoint(Matchers.anyLong(), Matchers.anyLong(), (CheckpointProperties)Matchers.any(CheckpointProperties.class))).thenReturn(Mockito.mock(PendingCheckpointStats.class));
        CompletableFuture checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture);
        ((CheckpointStatsTracker)Mockito.verify((Object)tracker, (VerificationMode)Mockito.times((int)1))).reportPendingCheckpoint(Mockito.eq((long)1L), ((Long)Matchers.any(Long.class)).longValue(), (CheckpointProperties)Mockito.eq((Object)CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)));
    }

    @Test
    public void testCheckpointStatsTrackerRestoreCallback() throws Exception {
        StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(1);
        CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCompletedCheckpointStore((CompletedCheckpointStore)store).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        store.addCheckpoint(new CompletedCheckpoint(new JobID(), 0L, 0L, 0L, Collections.emptyMap(), Collections.emptyList(), CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), (CompletedCheckpointStorageLocation)new TestCompletedCheckpointStorageLocation()));
        CheckpointStatsTracker tracker = (CheckpointStatsTracker)Mockito.mock(CheckpointStatsTracker.class);
        checkpointCoordinator.setCheckpointStatsTracker(tracker);
        Assert.assertTrue((boolean)checkpointCoordinator.restoreLatestCheckpointedStateToAll(Collections.emptySet(), true));
        ((CheckpointStatsTracker)Mockito.verify((Object)tracker, (VerificationMode)Mockito.times((int)1))).reportRestoredCheckpoint((RestoredCheckpointStats)Matchers.any(RestoredCheckpointStats.class));
    }

    @Test
    public void testSharedStateRegistrationOnRestore() throws Exception {
        JobID jobId = new JobID();
        JobVertexID jobVertexID1 = new JobVertexID();
        int parallelism1 = 2;
        int maxParallelism1 = 4;
        ExecutionJobVertex jobVertex1 = CheckpointCoordinatorTestingUtils.mockExecutionJobVertex(jobVertexID1, parallelism1, maxParallelism1);
        ArrayList<ExecutionVertex> allExecutionVertices = new ArrayList<ExecutionVertex>(parallelism1);
        allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
        ExecutionVertex[] arrayExecutionVertices = allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
        RecoverableCompletedCheckpointStore store = new RecoverableCompletedCheckpointStore(10);
        ArrayList createdSharedStateRegistries = new ArrayList(2);
        CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobId).setTasks(arrayExecutionVertices).setCompletedCheckpointStore(store).setTimer(this.manuallyTriggeredScheduledExecutor).setSharedStateRegistryFactory(deleteExecutor -> {
            SharedStateRegistry instance = new SharedStateRegistry(deleteExecutor);
            createdSharedStateRegistries.add(instance);
            return instance;
        }).build();
        int numCheckpoints = 3;
        List keyGroupPartitions1 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism1, (int)parallelism1);
        for (int i = 0; i < 3; ++i) {
            this.performIncrementalCheckpoint(jobId, checkpointCoordinator, jobVertex1, keyGroupPartitions1, i);
        }
        List completedCheckpoints = checkpointCoordinator.getSuccessfulCheckpoints();
        Assert.assertEquals((long)3L, (long)completedCheckpoints.size());
        int sharedHandleCount = 0;
        ArrayList sharedHandlesByCheckpoint = new ArrayList(3);
        for (int i = 0; i < 3; ++i) {
            sharedHandlesByCheckpoint.add(new HashMap(2));
        }
        int cp = 0;
        for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
            for (OperatorState taskState : completedCheckpoint.getOperatorStates().values()) {
                for (Object subtaskState : taskState.getStates()) {
                    for (KeyedStateHandle keyedStateHandle : subtaskState.getManagedKeyedState()) {
                        ((KeyedStateHandle)Mockito.verify((Object)keyedStateHandle, (VerificationMode)Mockito.times((int)1))).registerSharedStates((SharedStateRegistry)createdSharedStateRegistries.get(0));
                        IncrementalRemoteKeyedStateHandle incrementalKeyedStateHandle = (IncrementalRemoteKeyedStateHandle)keyedStateHandle;
                        ((Map)sharedHandlesByCheckpoint.get(cp)).putAll(incrementalKeyedStateHandle.getSharedState());
                        for (StreamStateHandle streamStateHandle : incrementalKeyedStateHandle.getSharedState().values()) {
                            Assert.assertTrue((!(streamStateHandle instanceof PlaceholderStreamStateHandle) ? 1 : 0) != 0);
                            ((StreamStateHandle)Mockito.verify((Object)streamStateHandle, (VerificationMode)Mockito.never())).discardState();
                            ++sharedHandleCount;
                        }
                        for (StreamStateHandle streamStateHandle : incrementalKeyedStateHandle.getPrivateState().values()) {
                            ((StreamStateHandle)Mockito.verify((Object)streamStateHandle, (VerificationMode)Mockito.never())).discardState();
                        }
                        ((StreamStateHandle)Mockito.verify((Object)incrementalKeyedStateHandle.getMetaStateHandle(), (VerificationMode)Mockito.never())).discardState();
                    }
                    ((OperatorSubtaskState)Mockito.verify((Object)subtaskState, (VerificationMode)Mockito.never())).discardState();
                }
            }
            ++cp;
        }
        Assert.assertEquals((long)10L, (long)sharedHandleCount);
        store.removeOldestCheckpoint();
        for (Map map : sharedHandlesByCheckpoint) {
            for (StreamStateHandle streamStateHandle : map.values()) {
                ((StreamStateHandle)Mockito.verify((Object)streamStateHandle, (VerificationMode)Mockito.never())).discardState();
            }
        }
        store.shutdown(JobStatus.SUSPENDED);
        HashSet<ExecutionJobVertex> tasks = new HashSet<ExecutionJobVertex>();
        tasks.add(jobVertex1);
        Assert.assertTrue((boolean)checkpointCoordinator.restoreLatestCheckpointedStateToAll(tasks, false));
        cp = 0;
        for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
            for (OperatorState operatorState : completedCheckpoint.getOperatorStates().values()) {
                for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
                    for (KeyedStateHandle keyedStateHandle : subtaskState.getManagedKeyedState()) {
                        VerificationMode verificationMode = cp > 0 ? Mockito.times((int)1) : Mockito.never();
                        ((KeyedStateHandle)Mockito.verify((Object)keyedStateHandle, (VerificationMode)verificationMode)).registerSharedStates((SharedStateRegistry)createdSharedStateRegistries.get(1));
                    }
                }
            }
            ++cp;
        }
        store.removeOldestCheckpoint();
        for (Map map : sharedHandlesByCheckpoint) {
            for (Map.Entry entry : map.entrySet()) {
                String key = ((StateHandleID)entry.getKey()).getKeyString();
                int belongToCP = Integer.parseInt(String.valueOf(key.charAt(key.length() - 1)));
                if (belongToCP == 0) {
                    ((StreamStateHandle)Mockito.verify(entry.getValue(), (VerificationMode)Mockito.times((int)1))).discardState();
                    continue;
                }
                ((StreamStateHandle)Mockito.verify(entry.getValue(), (VerificationMode)Mockito.never())).discardState();
            }
        }
        store.removeOldestCheckpoint();
        for (Map map : sharedHandlesByCheckpoint) {
            for (StreamStateHandle streamStateHandle : map.values()) {
                ((StreamStateHandle)Mockito.verify((Object)streamStateHandle, (VerificationMode)Mockito.times((int)1))).discardState();
            }
        }
    }

    @Test
    public void jobFailsIfInFlightSynchronousSavepointIsDiscarded() throws Exception {
        final Tuple2 invocationCounterAndException = Tuple2.of((Object)0, null);
        IOException expectedRootCause = new IOException("Custom-Exception");
        JobID jobId = new JobID();
        ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
        ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
        ExecutionVertex vertex1 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(attemptID1);
        ExecutionVertex vertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(attemptID2);
        CheckpointCoordinator coordinator = this.getCheckpointCoordinator(jobId, vertex1, vertex2, new CheckpointFailureManager(0, new CheckpointFailureManager.FailJobCallback(){

            public void failJob(Throwable cause) {
                invocationCounterAndException.f0 = (Integer)invocationCounterAndException.f0 + 1;
                invocationCounterAndException.f1 = cause;
            }

            public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {
                throw new AssertionError((Object)"This method should not be called for the test.");
            }
        }));
        CompletableFuture savepointFuture = coordinator.triggerSynchronousSavepoint(false, "test-dir");
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        PendingCheckpoint syncSavepoint = this.declineSynchronousSavepoint(jobId, coordinator, attemptID1, expectedRootCause);
        Assert.assertTrue((boolean)syncSavepoint.isDiscarded());
        try {
            savepointFuture.get();
            Assert.fail((String)"Expected Exception not found.");
        }
        catch (ExecutionException e) {
            Throwable cause = ExceptionUtils.stripExecutionException((Throwable)e);
            Assert.assertTrue((boolean)(cause instanceof CheckpointException));
            Assert.assertEquals((Object)expectedRootCause.getMessage(), (Object)cause.getCause().getCause().getMessage());
        }
        Assert.assertEquals((long)1L, (long)((Integer)invocationCounterAndException.f0).intValue());
        Assert.assertTrue((invocationCounterAndException.f1 instanceof CheckpointException && ((Throwable)invocationCounterAndException.f1).getCause().getCause().getMessage().equals(expectedRootCause.getMessage()) ? 1 : 0) != 0);
        coordinator.shutdown(JobStatus.FAILING);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTriggerCheckpointAfterCancel() throws Exception {
        TestingCheckpointIDCounter idCounter = new TestingCheckpointIDCounter();
        CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCheckpointIDCounter((CheckpointIDCounter)idCounter).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        idCounter.setOwner(checkpointCoordinator);
        try {
            checkpointCoordinator.startCheckpointScheduler();
            CompletableFuture onCompletionPromise = checkpointCoordinator.triggerCheckpoint(CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), null, true);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            try {
                onCompletionPromise.get();
                Assert.fail((String)"should not trigger periodic checkpoint after stop the coordinator.");
            }
            catch (ExecutionException e) {
                Optional checkpointExceptionOptional = ExceptionUtils.findThrowable((Throwable)e, CheckpointException.class);
                Assert.assertTrue((boolean)checkpointExceptionOptional.isPresent());
                Assert.assertEquals((Object)CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, (Object)((CheckpointException)((Object)checkpointExceptionOptional.get())).getCheckpointFailureReason());
            }
        }
        finally {
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSavepointScheduledInUnalignedMode() throws Exception {
        int maxConcurrentCheckpoints = 1;
        int checkpointRequestsToSend = 10;
        JobID jobId = new JobID();
        CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setUnalignedCheckpointsEnabled(true).setMaxConcurrentCheckpoints(maxConcurrentCheckpoints).build()).setJobId(jobId).setTimer(this.manuallyTriggeredScheduledExecutor).build();
        try {
            int activeRequests;
            ArrayList<CompletableFuture> checkpointFutures = new ArrayList<CompletableFuture>(checkpointRequestsToSend);
            coordinator.startCheckpointScheduler();
            for (activeRequests = 0; activeRequests < checkpointRequestsToSend; ++activeRequests) {
                checkpointFutures.add(coordinator.triggerCheckpoint(true));
            }
            Assert.assertEquals((long)(activeRequests - maxConcurrentCheckpoints), (long)coordinator.getNumQueuedRequests());
            CompletableFuture savepointFuture = coordinator.triggerSavepoint("/tmp");
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertEquals((long)(++activeRequests - maxConcurrentCheckpoints), (long)coordinator.getNumQueuedRequests());
            coordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, new ExecutionAttemptID(), 1L, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)), "none");
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.assertEquals((long)(--activeRequests - maxConcurrentCheckpoints), (long)coordinator.getNumQueuedRequests());
            Assert.assertEquals((long)1L, (long)checkpointFutures.stream().filter(Future::isDone).count());
            Assert.assertFalse((boolean)savepointFuture.isDone());
            Assert.assertEquals((long)maxConcurrentCheckpoints, (long)coordinator.getNumberOfPendingCheckpoints());
            CheckpointProperties props = ((PendingCheckpoint)coordinator.getPendingCheckpoints().values().iterator().next()).getProps();
            Assert.assertTrue((boolean)props.isSavepoint());
            Assert.assertFalse((boolean)props.forceCheckpoint());
        }
        finally {
            coordinator.shutdown(JobStatus.FINISHED);
        }
    }

    @Test
    public void testExternallyInducedSourceWithOperatorCoordinator() throws Exception {
        final JobID jobId = new JobID();
        final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
        final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
        ExecutionVertex vertex1 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(attemptID1, (executionAttemptID, jid, checkpointId, timestamp, checkpointOptions) -> {});
        ExecutionVertex vertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(attemptID2, (executionAttemptID, jid, checkpointId, timestamp, checkpointOptions) -> {});
        OperatorID opID1 = OperatorID.fromJobVertexID((JobVertexID)vertex1.getJobvertexId());
        OperatorID opID2 = OperatorID.fromJobVertexID((JobVertexID)vertex2.getJobvertexId());
        final TaskStateSnapshot taskOperatorSubtaskStates1 = new TaskStateSnapshot();
        final TaskStateSnapshot taskOperatorSubtaskStates2 = new TaskStateSnapshot();
        OperatorSubtaskState subtaskState1 = new OperatorSubtaskState();
        OperatorSubtaskState subtaskState2 = new OperatorSubtaskState();
        taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID1, subtaskState1);
        taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID2, subtaskState2);
        final AtomicBoolean coordCheckpointDone = new AtomicBoolean(false);
        CheckpointCoordinatorTestingUtils.MockOperatorCoordinatorCheckpointContext coordinatorCheckpointContext = new CheckpointCoordinatorTestingUtils.MockOperatorCheckpointCoordinatorContextBuilder().setOnCallingCheckpointCoordinator((checkpointId, result) -> {
            coordCheckpointDone.set(true);
            result.complete(new byte[0]);
        }).setOperatorID(opID1).build();
        final CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobId).setTasks(new ExecutionVertex[]{vertex1, vertex2}).setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build()).setTimer(this.manuallyTriggeredScheduledExecutor).setCoordinatorsToCheckpoint(Collections.singleton(coordinatorCheckpointContext)).build();
        final AtomicReference checkpointIdRef = new AtomicReference();
        checkpointCoordinator.addMasterHook((MasterTriggerRestoreHook)new MasterTriggerRestoreHook<Integer>(){

            public String getIdentifier() {
                return "anything";
            }

            @Nullable
            public CompletableFuture<Integer> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
                Assert.assertTrue((String)"The coordinator checkpoint should have finished.", (boolean)coordCheckpointDone.get());
                checkpointIdRef.set(checkpointId);
                AcknowledgeCheckpoint acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1);
                AcknowledgeCheckpoint acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
                checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, CheckpointCoordinatorTest.TASK_MANAGER_LOCATION_INFO);
                checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint2, CheckpointCoordinatorTest.TASK_MANAGER_LOCATION_INFO);
                return null;
            }

            public void restoreCheckpoint(long checkpointId, Integer checkpointData) throws Exception {
            }

            public SimpleVersionedSerializer<Integer> createCheckpointDataSerializer() {
                return new SimpleVersionedSerializer<Integer>(){

                    public int getVersion() {
                        return 0;
                    }

                    public byte[] serialize(Integer obj) throws IOException {
                        return new byte[0];
                    }

                    public Integer deserialize(int version, byte[] serialized) throws IOException {
                        return 1;
                    }
                };
            }
        });
        Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals((long)0L, (long)this.manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
        CompletableFuture checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture);
        Assert.assertEquals((long)1L, (long)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals((long)0L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals((long)0L, (long)this.manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
        long checkpointId2 = (Long)checkpointIdRef.get();
        ((Execution)Mockito.verify((Object)vertex1.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpointId2), ((Long)Matchers.any(Long.class)).longValue(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
        ((Execution)Mockito.verify((Object)vertex2.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpointId2), ((Long)Matchers.any(Long.class)).longValue(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
        CompletedCheckpoint success = (CompletedCheckpoint)checkpointCoordinator.getSuccessfulCheckpoints().get(0);
        Assert.assertEquals((Object)jobId, (Object)success.getJobId());
        Assert.assertEquals((long)2L, (long)success.getOperatorStates().size());
        checkpointCoordinator.shutdown(JobStatus.FINISHED);
    }

    @Test
    public void testCompleteCheckpointFailureWithExternallyInducedSource() throws Exception {
        final JobID jobId = new JobID();
        final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
        final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
        ExecutionVertex vertex1 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(attemptID1, (executionAttemptID, jid, checkpointId, timestamp, checkpointOptions) -> {});
        ExecutionVertex vertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(attemptID2, (executionAttemptID, jid, checkpointId, timestamp, checkpointOptions) -> {});
        OperatorID opID1 = OperatorID.fromJobVertexID((JobVertexID)vertex1.getJobvertexId());
        OperatorID opID2 = OperatorID.fromJobVertexID((JobVertexID)vertex2.getJobvertexId());
        final TaskStateSnapshot taskOperatorSubtaskStates1 = new TaskStateSnapshot();
        final TaskStateSnapshot taskOperatorSubtaskStates2 = new TaskStateSnapshot();
        OperatorSubtaskState subtaskState1 = new OperatorSubtaskState();
        OperatorSubtaskState subtaskState2 = new OperatorSubtaskState();
        taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID1, subtaskState1);
        taskOperatorSubtaskStates2.putSubtaskStateByOperatorID(opID2, subtaskState2);
        final AtomicBoolean coordCheckpointDone = new AtomicBoolean(false);
        CheckpointCoordinatorTestingUtils.MockOperatorCoordinatorCheckpointContext coordinatorCheckpointContext = new CheckpointCoordinatorTestingUtils.MockOperatorCheckpointCoordinatorContextBuilder().setOnCallingCheckpointCoordinator((checkpointId, result) -> {
            coordCheckpointDone.set(true);
            result.complete(new byte[0]);
        }).setOperatorID(opID1).build();
        final CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobId).setTasks(new ExecutionVertex[]{vertex1, vertex2}).setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build()).setTimer(this.manuallyTriggeredScheduledExecutor).setCoordinatorsToCheckpoint(Collections.singleton(coordinatorCheckpointContext)).setStateBackEnd((StateBackend)new MemoryStateBackend(){
            private static final long serialVersionUID = 8134582566514272546L;

            public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
                return new MemoryBackendCheckpointStorage(jobId, null, null, 100){

                    public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
                        return new NonPersistentMetadataCheckpointStorageLocation(1000){

                            public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException {
                                throw new IOException("Artificial Exception");
                            }
                        };
                    }
                };
            }
        }).build();
        final AtomicReference checkpointIdRef = new AtomicReference();
        checkpointCoordinator.addMasterHook((MasterTriggerRestoreHook)new MasterTriggerRestoreHook<Integer>(){

            public String getIdentifier() {
                return "anything";
            }

            @Nullable
            public CompletableFuture<Integer> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
                Assert.assertTrue((String)"The coordinator checkpoint should have finished.", (boolean)coordCheckpointDone.get());
                checkpointIdRef.set(checkpointId);
                AcknowledgeCheckpoint acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1);
                AcknowledgeCheckpoint acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(jobId, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
                checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, CheckpointCoordinatorTest.TASK_MANAGER_LOCATION_INFO);
                checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint2, CheckpointCoordinatorTest.TASK_MANAGER_LOCATION_INFO);
                return null;
            }

            public void restoreCheckpoint(long checkpointId, Integer checkpointData) throws Exception {
            }

            public SimpleVersionedSerializer<Integer> createCheckpointDataSerializer() {
                return new SimpleVersionedSerializer<Integer>(){

                    public int getVersion() {
                        return 0;
                    }

                    public byte[] serialize(Integer obj) throws IOException {
                        return new byte[0];
                    }

                    public Integer deserialize(int version, byte[] serialized) throws IOException {
                        return 1;
                    }
                };
            }
        });
        CompletableFuture checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertTrue((boolean)checkpointFuture.isCompletedExceptionally());
        Assert.assertTrue((boolean)checkpointCoordinator.getSuccessfulCheckpoints().isEmpty());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testNotifyCheckpointAbortionInOperatorCoordinator() throws Exception {
        JobID jobId = new JobID();
        ExecutionAttemptID attemptID = new ExecutionAttemptID();
        ExecutionVertex executionVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(attemptID);
        CheckpointCoordinatorTestingUtils.MockOperatorCoordinatorCheckpointContext context = new CheckpointCoordinatorTestingUtils.MockOperatorCheckpointCoordinatorContextBuilder().setOperatorID(new OperatorID()).setOnCallingCheckpointCoordinator((ignored, future) -> future.complete(new byte[0])).build();
        CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobId).setTasks(new ExecutionVertex[]{executionVertex}).setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build()).setTimer(this.manuallyTriggeredScheduledExecutor).setCoordinatorsToCheckpoint(Collections.singleton(context)).build();
        try {
            checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            long checkpointId1 = (Long)Collections.max(checkpointCoordinator.getPendingCheckpoints().keySet());
            checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            long checkpointId2 = (Long)Collections.max(checkpointCoordinator.getPendingCheckpoints().keySet());
            AcknowledgeCheckpoint acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(jobId, attemptID, checkpointId2, new CheckpointMetrics(), null);
            checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, "");
            Assert.assertEquals(Collections.singletonList(1L), context.getAbortedCheckpoints());
            Assert.assertEquals(Collections.singletonList(2L), context.getCompletedCheckpoints());
        }
        finally {
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
        }
    }

    private CheckpointCoordinator getCheckpointCoordinator(JobID jobId, ExecutionVertex vertex1, ExecutionVertex vertex2) {
        return new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobId).setTasks(new ExecutionVertex[]{vertex1, vertex2}).setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build()).setTimer(this.manuallyTriggeredScheduledExecutor).build();
    }

    private CheckpointCoordinator getCheckpointCoordinator(JobID jobId, ExecutionVertex vertex1, ExecutionVertex vertex2, CheckpointFailureManager failureManager) {
        return new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setJobId(jobId).setTasks(new ExecutionVertex[]{vertex1, vertex2}).setTimer(this.manuallyTriggeredScheduledExecutor).setFailureManager(failureManager).build();
    }

    private CheckpointCoordinator getCheckpointCoordinator() {
        ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
        ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
        ExecutionVertex triggerVertex1 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(triggerAttemptID1);
        JobVertexID jobVertexID2 = new JobVertexID();
        ExecutionVertex triggerVertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(triggerAttemptID2, jobVertexID2, Collections.singletonList(OperatorID.fromJobVertexID((JobVertexID)jobVertexID2)), 1, 1, ExecutionState.FINISHED, new ExecutionState[0]);
        ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
        ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
        ExecutionVertex ackVertex1 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(ackAttemptID1);
        ExecutionVertex ackVertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(ackAttemptID2);
        return new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setTasksToTrigger(new ExecutionVertex[]{triggerVertex1, triggerVertex2}).setTasksToWaitFor(new ExecutionVertex[]{ackVertex1, ackVertex2}).setTasksToCommitTo(new ExecutionVertex[0]).setTimer(this.manuallyTriggeredScheduledExecutor).build();
    }

    private CheckpointFailureManager getCheckpointFailureManager(final String errorMsg) {
        return new CheckpointFailureManager(0, new CheckpointFailureManager.FailJobCallback(){

            public void failJob(Throwable cause) {
                throw new RuntimeException(errorMsg);
            }

            public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {
                throw new RuntimeException(errorMsg);
            }
        });
    }

    private PendingCheckpoint declineSynchronousSavepoint(JobID jobId, CheckpointCoordinator coordinator, ExecutionAttemptID attemptID, Throwable reason) {
        long checkpointId = (Long)coordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
        PendingCheckpoint checkpoint = (PendingCheckpoint)coordinator.getPendingCheckpoints().get(checkpointId);
        coordinator.receiveDeclineMessage(new DeclineCheckpoint(jobId, attemptID, checkpointId, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED, reason)), TASK_MANAGER_LOCATION_INFO);
        return checkpoint;
    }

    private void performIncrementalCheckpoint(JobID jobId, CheckpointCoordinator checkpointCoordinator, ExecutionJobVertex jobVertex1, List<KeyGroupRange> keyGroupPartitions1, int cpSequenceNumber) throws Exception {
        checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertEquals((long)1L, (long)checkpointCoordinator.getPendingCheckpoints().size());
        long checkpointId = (Long)Iterables.getOnlyElement(checkpointCoordinator.getPendingCheckpoints().keySet());
        for (int index = 0; index < jobVertex1.getParallelism(); ++index) {
            KeyGroupRange keyGroupRange = keyGroupPartitions1.get(index);
            HashMap<StateHandleID, Object> privateState = new HashMap<StateHandleID, Object>();
            privateState.put(new StateHandleID("private-1"), Mockito.spy((Object)new ByteStreamStateHandle("private-1", new byte[]{112})));
            HashMap<StateHandleID, Object> sharedState = new HashMap<StateHandleID, Object>();
            if (cpSequenceNumber > 0) {
                sharedState.put(new StateHandleID("shared-" + (cpSequenceNumber - 1)), Mockito.spy((Object)new PlaceholderStreamStateHandle()));
            }
            sharedState.put(new StateHandleID("shared-" + cpSequenceNumber), Mockito.spy((Object)new ByteStreamStateHandle("shared-" + cpSequenceNumber + "-" + keyGroupRange, new byte[]{115})));
            IncrementalRemoteKeyedStateHandle managedState = (IncrementalRemoteKeyedStateHandle)Mockito.spy((Object)new IncrementalRemoteKeyedStateHandle(new UUID(42L, 42L), keyGroupRange, checkpointId, sharedState, privateState, (StreamStateHandle)Mockito.spy((Object)new ByteStreamStateHandle("meta", new byte[]{109}))));
            OperatorSubtaskState operatorSubtaskState = (OperatorSubtaskState)Mockito.spy((Object)new OperatorSubtaskState(StateObjectCollection.empty(), StateObjectCollection.empty(), StateObjectCollection.singleton((StateObject)managedState), StateObjectCollection.empty()));
            HashMap<OperatorID, OperatorSubtaskState> opStates = new HashMap<OperatorID, OperatorSubtaskState>();
            opStates.put(((OperatorIDPair)jobVertex1.getOperatorIDs().get(0)).getGeneratedOperatorID(), operatorSubtaskState);
            TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(opStates);
            AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(jobId, jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), checkpointId, new CheckpointMetrics(), taskStateSnapshot);
            checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
        }
    }

    private static class TestFailJobCallback
    implements CheckpointFailureManager.FailJobCallback {
        private int invokeCounter = 0;

        private TestFailJobCallback() {
        }

        public void failJob(Throwable cause) {
            ++this.invokeCounter;
        }

        public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID executionAttemptID) {
            ++this.invokeCounter;
        }

        public int getInvokeCounter() {
            return this.invokeCounter;
        }
    }

    private static class TestingCheckpointIDCounter
    extends StandaloneCheckpointIDCounter {
        private CheckpointCoordinator owner;

        private TestingCheckpointIDCounter() {
        }

        public long getAndIncrement() throws Exception {
            Preconditions.checkNotNull((Object)this.owner);
            this.owner.stopCheckpointScheduler();
            return super.getAndIncrement();
        }

        void setOwner(CheckpointCoordinator coordinator) {
            this.owner = (CheckpointCoordinator)Preconditions.checkNotNull((Object)coordinator);
        }
    }
}

