/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.coordination;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartAllFailoverStrategy;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.runtime.operators.coordination.TestingCoordinationRequestHandler;
import org.apache.flink.runtime.operators.coordination.TestingOperatorCoordinator;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestingCheckpointStorageCoordinatorView;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

/**
 * Tests how the scheduler drives {@link OperatorCoordinator} instances: lifecycle (start/close),
 * task-failure notifications, checkpoint and savepoint handling, restore behavior on global and
 * local failover, and delivery of client coordination requests.
 *
 * <p>Every test builds a job with a single vertex of parallelism 2 that hosts one coordinator.
 * The scheduler is wired to a {@link ManuallyTriggeredScheduledExecutorService}, which the tests
 * step explicitly through {@code triggerAll()} and {@code triggerScheduledTasks()}.
 */
public class OperatorCoordinatorSchedulerTest extends TestLogger {

    /** ID of the single job vertex that hosts the coordinator. */
    private final JobVertexID testVertexId = new JobVertexID();

    /** Operator ID under which the default test coordinator is registered and looked up. */
    private final OperatorID testOperatorId = new OperatorID();

    /** Executor handed to the scheduler builder and to the main-thread executor adapter. */
    private final ManuallyTriggeredScheduledExecutorService executor =
            new ManuallyTriggeredScheduledExecutorService();

    /** The scheduler most recently created by a test; suspended after each test. */
    private DefaultScheduler createdScheduler;

    /** Suspends the scheduler created by the test, if any. */
    @After
    public void shutdownScheduler() throws Exception {
        if (createdScheduler != null) {
            createdScheduler.suspend(new Exception("shutdown"));
        }
    }

    // ------------------------------------------------------------------------
    //  lifecycle
    // ------------------------------------------------------------------------

    /** The coordinator reports itself as started once the scheduler started scheduling. */
    @Test
    public void testCoordinatorStartedWhenSchedulerStarts() throws Exception {
        DefaultScheduler scheduler = createAndStartScheduler();
        TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        Assert.assertTrue(coordinator.isStarted());
    }

    /** Suspending the scheduler closes the coordinator. */
    @Test
    public void testCoordinatorDisposedWhenSchedulerStops() throws Exception {
        DefaultScheduler scheduler = createAndStartScheduler();
        TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        scheduler.suspend(new Exception("test suspend"));

        Assert.assertTrue(coordinator.isClosed());
    }

    /** A coordinator whose {@code start()} throws makes {@code startScheduling()} throw. */
    @Test
    public void testFailureToStartPropagatesExceptions() throws Exception {
        TestingOperatorCoordinator.Provider failingCoordinatorProvider =
                new TestingOperatorCoordinator.Provider(testOperatorId, CoordinatorThatFailsInStart::new);
        DefaultScheduler scheduler = createScheduler(failingCoordinatorProvider);

        try {
            scheduler.startScheduling();
            Assert.fail("expected an exception");
        } catch (Exception ignored) {
            // Expected. Assert.fail raises an AssertionError, which is not an Exception and
            // therefore still fails the test when startScheduling() returns normally.
        }
    }

    /** A coordinator whose {@code start()} throws is closed afterwards. */
    @Test
    public void testFailureToStartClosesCoordinator() throws Exception {
        TestingOperatorCoordinator.Provider failingCoordinatorProvider =
                new TestingOperatorCoordinator.Provider(testOperatorId, CoordinatorThatFailsInStart::new);
        DefaultScheduler scheduler = createScheduler(failingCoordinatorProvider);
        TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        try {
            scheduler.startScheduling();
        } catch (Exception ignored) {
            // Whether the failure propagates is checked by
            // testFailureToStartPropagatesExceptions; here only the close matters.
        }

        Assert.assertTrue(coordinator.isClosed());
    }

    // ------------------------------------------------------------------------
    //  task failure / cancellation notifications
    // ------------------------------------------------------------------------

    /** Failing subtask 1 while it is DEPLOYING notifies the coordinator about subtask 1 only. */
    @Test
    public void deployingTaskFailureNotifiesCoordinator() throws Exception {
        DefaultScheduler scheduler = createAndStartScheduler();
        TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        failTask(scheduler, 1);

        Assert.assertEquals(1L, coordinator.getFailedTasks().size());
        assertMatches(coordinator.getFailedTasks(), Matchers.contains(1));
        assertMatches(coordinator.getFailedTasks(), Matchers.not(Matchers.contains(0)));
    }

    /** Failing subtask 1 while it is RUNNING notifies the coordinator about subtask 1 only. */
    @Test
    public void runningTaskFailureNotifiesCoordinator() throws Exception {
        DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        failTask(scheduler, 1);

        Assert.assertEquals(1L, coordinator.getFailedTasks().size());
        assertMatches(coordinator.getFailedTasks(), Matchers.contains(1));
        assertMatches(coordinator.getFailedTasks(), Matchers.not(Matchers.contains(0)));
    }

    /**
     * With the restart-all failover strategy, failing subtask 1 results in failure notifications
     * for both subtasks.
     */
    @Test
    public void cancellationAsPartOfFailoverNotifiesCoordinator() throws Exception {
        DefaultScheduler scheduler = createSchedulerWithAllRestartOnFailureAndDeployTasks();
        TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        failTask(scheduler, 1);

        Assert.assertEquals(2L, coordinator.getFailedTasks().size());
        assertMatches(coordinator.getFailedTasks(), Matchers.containsInAnyOrder(0, 1));
    }

    /** Failing and restarting the same subtask twice yields two failure notifications for it. */
    @Test
    public void taskRepeatedFailureNotifyCoordinator() throws Exception {
        DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        failAndRestartTask(scheduler, 0);
        failAndRestartTask(scheduler, 0);

        Assert.assertEquals(2L, coordinator.getFailedTasks().size());
        assertMatches(coordinator.getFailedTasks(), Matchers.contains(0, 0));
    }

    /** Sending an event to a subtask that is not yet running fails the returned future. */
    @Test
    public void taskExceptionWhenTasksNotRunning() throws Exception {
        DefaultScheduler scheduler = createAndStartScheduler();
        OperatorCoordinator.Context context = getCoordinator(scheduler).getContext();

        CompletableFuture<?> result = context.sendEvent(new TestOperatorEvent(), 0);
        executor.triggerAll();

        assertMatches(result, FlinkMatchers.futureFailedWith(TaskNotRunningException.class));
    }

    /** A failure returned by the task executor gateway is surfaced on the send-event future. */
    @Test
    public void taskTaskManagerFailuresAreReportedBack() throws Exception {
        DefaultScheduler scheduler =
                createSchedulerAndDeployTasks(new FailingTaskExecutorOperatorEventGateway());
        OperatorCoordinator.Context context = getCoordinator(scheduler).getContext();

        CompletableFuture<?> result = context.sendEvent(new TestOperatorEvent(), 0);
        executor.triggerAll();

        assertMatches(result, FlinkMatchers.futureFailedWith(TestException.class));
    }

    /**
     * Cancelling subtask 1 while DEPLOYING should notify the coordinator about subtask 1 only.
     * Disabled via {@code @Ignore}; the reason is not recorded in this file.
     */
    @Ignore
    @Test
    public void deployingTaskCancellationNotifiesCoordinator() throws Exception {
        DefaultScheduler scheduler = createAndStartScheduler();
        TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        cancelTask(scheduler, 1);

        Assert.assertEquals(1L, coordinator.getFailedTasks().size());
        assertMatches(coordinator.getFailedTasks(), Matchers.contains(1));
        assertMatches(coordinator.getFailedTasks(), Matchers.not(Matchers.contains(0)));
    }

    /**
     * Cancelling subtask 0 while RUNNING should notify the coordinator about subtask 0 only.
     * Disabled via {@code @Ignore}; the reason is not recorded in this file.
     */
    @Ignore
    @Test
    public void runningTaskCancellationNotifiesCoordinator() throws Exception {
        DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        cancelTask(scheduler, 0);

        Assert.assertEquals(1L, coordinator.getFailedTasks().size());
        assertMatches(coordinator.getFailedTasks(), Matchers.contains(0));
        assertMatches(coordinator.getFailedTasks(), Matchers.not(Matchers.contains(1)));
    }

    // ------------------------------------------------------------------------
    //  checkpointing and restore
    // ------------------------------------------------------------------------

    /** The bytes the coordinator completes its snapshot with end up in the completed checkpoint. */
    @Test
    public void testTakeCheckpoint() throws Exception {
        byte[] checkpointData = new byte[656];
        new Random().nextBytes(checkpointData);

        DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        CompletableFuture<CompletedCheckpoint> checkpointFuture = triggerCheckpoint(scheduler);
        coordinator.getLastTriggeredCheckpoint().complete(checkpointData);
        acknowledgeCurrentCheckpoint(scheduler);

        OperatorState state =
                (OperatorState) checkpointFuture.get().getOperatorStates().get(testOperatorId);
        Assert.assertArrayEquals(
                checkpointData,
                getStateHandleContents((StreamStateHandle) state.getCoordinatorState()));
    }

    /** A coordinator that throws synchronously while snapshotting fails the checkpoint. */
    @Test
    public void testSnapshotSyncFailureFailsCheckpoint() throws Exception {
        TestingOperatorCoordinator.Provider failingCoordinatorProvider =
                new TestingOperatorCoordinator.Provider(
                        testOperatorId, CoordinatorThatFailsCheckpointing::new);
        DefaultScheduler scheduler = createSchedulerAndDeployTasks(failingCoordinatorProvider);

        CompletableFuture<CompletedCheckpoint> checkpointFuture = triggerCheckpoint(scheduler);

        Assert.assertThat(checkpointFuture, futureWillCompleteWithTestException());
    }

    /** Completing the coordinator's snapshot future exceptionally fails the checkpoint. */
    @Test
    public void testSnapshotAsyncFailureFailsCheckpoint() throws Exception {
        DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        CompletableFuture<CompletedCheckpoint> checkpointFuture = triggerCheckpoint(scheduler);
        CompletableFuture<byte[]> coordinatorStateFuture = coordinator.getLastTriggeredCheckpoint();
        coordinatorStateFuture.completeExceptionally(new TestException());

        Assert.assertThat(checkpointFuture, futureWillCompleteWithTestException());
    }

    /** Starting from a savepoint hands the stored coordinator state to the coordinator. */
    @Test
    public void testSavepointRestoresCoordinator() throws Exception {
        byte[] testCoordinatorState = new byte[123];
        new Random().nextBytes(testCoordinatorState);

        DefaultScheduler scheduler = createSchedulerWithRestoredSavepoint(testCoordinatorState);
        TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        byte[] restoredState = coordinator.getLastRestoredCheckpointState();
        Assert.assertArrayEquals(testCoordinatorState, restoredState);
    }

    /** After a global failure the coordinator is restored with the last checkpointed state. */
    @Test
    public void testGlobalFailureResetsToCheckpoint() throws Exception {
        DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        byte[] coordinatorState = new byte[] {7, 11, 3, 5};
        takeCompleteCheckpoint(scheduler, coordinator, coordinatorState);
        failGlobalAndRestart(scheduler, new TestException());

        Assert.assertArrayEquals(
                "coordinator should have a restored checkpoint",
                coordinatorState,
                coordinator.getLastRestoredCheckpointState());
    }

    /**
     * A global failure before any checkpoint restores the coordinator with the
     * {@code NULL_RESTORE_VALUE} sentinel and checkpoint ID -1.
     */
    @Test
    public void testGlobalFailureBeforeCheckpointResetsToEmptyState() throws Exception {
        DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        failGlobalAndRestart(scheduler, new TestException());

        Assert.assertSame(
                "coordinator should have null restored state",
                TestingOperatorCoordinator.NULL_RESTORE_VALUE,
                coordinator.getLastRestoredCheckpointState());
        Assert.assertEquals(-1L, (long) coordinator.getLastRestoredCheckpointId());
    }

    /** A global failover does not produce per-subtask restore notifications. */
    @Test
    public void testGlobalFailoverDoesNotNotifyLocalRestore() throws Exception {
        DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        takeCompleteCheckpoint(scheduler, coordinator, new byte[0]);
        failGlobalAndRestart(scheduler, new TestException());

        assertMatches(coordinator.getRestoredTasks(), Matchers.empty());
    }

    /** A local failover notifies the coordinator of the subtask and the checkpoint restored. */
    @Test
    public void testLocalFailoverResetsTask() throws Exception {
        DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        long checkpointId = takeCompleteCheckpoint(scheduler, coordinator, new byte[0]);
        failAndRestartTask(scheduler, 1);

        Assert.assertEquals(1L, coordinator.getRestoredTasks().size());
        TestingOperatorCoordinator.SubtaskAndCheckpoint restoredTask =
                coordinator.getRestoredTasks().get(0);
        Assert.assertEquals(1L, (long) restoredTask.subtaskIndex);
        Assert.assertEquals(checkpointId, (long) restoredTask.checkpointId);
    }

    /** A local failover before any checkpoint reports checkpoint ID -1 for the subtask. */
    @Test
    public void testLocalFailoverBeforeCheckpointResetsTask() throws Exception {
        DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        failAndRestartTask(scheduler, 1);

        Assert.assertEquals(1L, coordinator.getRestoredTasks().size());
        TestingOperatorCoordinator.SubtaskAndCheckpoint restoredTask =
                coordinator.getRestoredTasks().get(0);
        Assert.assertEquals(1L, (long) restoredTask.subtaskIndex);
        Assert.assertEquals(-1L, (long) restoredTask.checkpointId);
    }

    /** A local failover does not restore the coordinator's own checkpointed state. */
    @Test
    public void testLocalFailoverDoesNotResetToCheckpoint() throws Exception {
        DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        takeCompleteCheckpoint(scheduler, coordinator, new byte[] {37, 11, 83, 4});
        failAndRestartTask(scheduler, 0);

        Assert.assertNull(
                "coordinator should not have a restored checkpoint",
                coordinator.getLastRestoredCheckpointState());
    }

    /** The coordinator is told the ID of a checkpoint once that checkpoint completed. */
    @Test
    public void testConfirmCheckpointComplete() throws Exception {
        DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        long checkpointId =
                takeCompleteCheckpoint(scheduler, coordinator, new byte[] {37, 11, 83, 4});

        Assert.assertEquals(
                "coordinator should be notified of completed checkpoint",
                checkpointId,
                (long) coordinator.getLastCheckpointComplete());
    }

    // ------------------------------------------------------------------------
    //  failover without checkpointing ("batch")
    // ------------------------------------------------------------------------

    /**
     * Without checkpointing, a global failure restores the coordinator with the
     * {@code NULL_RESTORE_VALUE} sentinel and checkpoint ID -1.
     */
    @Test
    public void testBatchGlobalFailureResetsToEmptyState() throws Exception {
        DefaultScheduler scheduler = createSchedulerWithoutCheckpointingAndDeployTasks();
        TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        failGlobalAndRestart(scheduler, new TestException());

        Assert.assertSame(
                "coordinator should have null restored state",
                TestingOperatorCoordinator.NULL_RESTORE_VALUE,
                coordinator.getLastRestoredCheckpointState());
        Assert.assertEquals(-1L, (long) coordinator.getLastRestoredCheckpointId());
    }

    /** Without checkpointing, a global failover produces no per-subtask restore notifications. */
    @Test
    public void testBatchGlobalFailoverDoesNotNotifyLocalRestore() throws Exception {
        DefaultScheduler scheduler = createSchedulerWithoutCheckpointingAndDeployTasks();
        TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        failGlobalAndRestart(scheduler, new TestException());

        assertMatches(coordinator.getRestoredTasks(), Matchers.empty());
    }

    /** Without checkpointing, a local failover reports the subtask with checkpoint ID -1. */
    @Test
    public void testBatchLocalFailoverResetsTask() throws Exception {
        DefaultScheduler scheduler = createSchedulerWithoutCheckpointingAndDeployTasks();
        TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        failAndRestartTask(scheduler, 1);

        Assert.assertEquals(1L, coordinator.getRestoredTasks().size());
        TestingOperatorCoordinator.SubtaskAndCheckpoint restoredTask =
                coordinator.getRestoredTasks().get(0);
        Assert.assertEquals(1L, (long) restoredTask.subtaskIndex);
        Assert.assertEquals(-1L, (long) restoredTask.checkpointId);
    }

    /** Without checkpointing, a local failover does not restore coordinator state. */
    @Test
    public void testBatchLocalFailoverDoesNotResetToCheckpoint() throws Exception {
        DefaultScheduler scheduler = createSchedulerWithoutCheckpointingAndDeployTasks();
        TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        failAndRestartTask(scheduler, 0);

        Assert.assertNull(
                "coordinator should not have a restored checkpoint",
                coordinator.getLastRestoredCheckpointState());
    }

    // ------------------------------------------------------------------------
    //  client coordination requests
    // ------------------------------------------------------------------------

    /** A request sent to a request-handling coordinator comes back with the same payload. */
    @Test
    public void testDeliveringClientRequestToRequestHandler() throws Exception {
        TestingCoordinationRequestHandler.Provider provider =
                new TestingCoordinationRequestHandler.Provider(testOperatorId);
        DefaultScheduler scheduler = createScheduler(provider);

        String payload = "testing payload";
        TestingCoordinationRequestHandler.Request<String> request =
                new TestingCoordinationRequestHandler.Request<>(payload);
        TestingCoordinationRequestHandler.Response response =
                (TestingCoordinationRequestHandler.Response)
                        scheduler.deliverCoordinationRequestToCoordinator(testOperatorId, request)
                                .get();

        Assert.assertEquals(payload, response.getPayload());
    }

    /** A request sent to a coordinator that is not a request handler is rejected. */
    @Test
    public void testDeliveringClientRequestToNonRequestHandler() throws Exception {
        TestingOperatorCoordinator.Provider provider =
                new TestingOperatorCoordinator.Provider(testOperatorId);
        DefaultScheduler scheduler = createScheduler(provider);

        String payload = "testing payload";
        TestingCoordinationRequestHandler.Request<String> request =
                new TestingCoordinationRequestHandler.Request<>(payload);

        CommonTestUtils.assertThrows(
                "cannot handle client event",
                FlinkException.class,
                () -> scheduler.deliverCoordinationRequestToCoordinator(testOperatorId, request));
    }

    /** A request addressed to an unknown operator ID is rejected. */
    @Test
    public void testDeliveringClientRequestToNonExistingCoordinator() throws Exception {
        TestingOperatorCoordinator.Provider provider =
                new TestingOperatorCoordinator.Provider(testOperatorId);
        DefaultScheduler scheduler = createScheduler(provider);

        String payload = "testing payload";
        TestingCoordinationRequestHandler.Request<String> request =
                new TestingCoordinationRequestHandler.Request<>(payload);

        CommonTestUtils.assertThrows(
                "does not exist",
                FlinkException.class,
                () -> scheduler.deliverCoordinationRequestToCoordinator(new OperatorID(), request));
    }

    // ------------------------------------------------------------------------
    //  scheduler setup
    // ------------------------------------------------------------------------

    /** Creates a scheduler for the given coordinator provider without starting it. */
    private DefaultScheduler createScheduler(OperatorCoordinator.Provider provider) throws Exception {
        return setupTestJobAndScheduler(provider);
    }

    /**
     * Creates a scheduler with the default test coordinator, starts scheduling, runs the pending
     * executor tasks, and checks that subtask 0 is DEPLOYING.
     */
    private DefaultScheduler createAndStartScheduler() throws Exception {
        DefaultScheduler scheduler =
                setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId));
        scheduler.startScheduling();
        executor.triggerAll();

        Assert.assertEquals(
                ExecutionState.DEPLOYING,
                SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0));
        return scheduler;
    }

    /** Creates a scheduler with the default test coordinator and brings all tasks to RUNNING. */
    private DefaultScheduler createSchedulerAndDeployTasks() throws Exception {
        return createSchedulerAndDeployTasks(new TestingOperatorCoordinator.Provider(testOperatorId));
    }

    /** Like {@link #createSchedulerAndDeployTasks()}, but with the restart-all failover strategy. */
    private DefaultScheduler createSchedulerWithAllRestartOnFailureAndDeployTasks() throws Exception {
        DefaultScheduler scheduler =
                setupTestJobAndScheduler(
                        new TestingOperatorCoordinator.Provider(testOperatorId), null, null, true);
        scheduleAllTasksToRunning(scheduler);
        return scheduler;
    }

    /**
     * Like {@link #createSchedulerAndDeployTasks()}, but clears the job's snapshot settings and
     * checks that the execution graph has no checkpoint coordinator.
     */
    private DefaultScheduler createSchedulerWithoutCheckpointingAndDeployTasks() throws Exception {
        Consumer<JobGraph> noCheckpoints = jobGraph -> jobGraph.setSnapshotSettings(null);
        DefaultScheduler scheduler =
                setupTestJobAndScheduler(
                        new TestingOperatorCoordinator.Provider(testOperatorId),
                        null,
                        noCheckpoints,
                        false);

        Assert.assertNull(scheduler.getExecutionGraph().getCheckpointCoordinator());
        scheduleAllTasksToRunning(scheduler);
        return scheduler;
    }

    /** Creates a scheduler for the given provider and brings all tasks to RUNNING. */
    private DefaultScheduler createSchedulerAndDeployTasks(OperatorCoordinator.Provider provider)
            throws Exception {
        DefaultScheduler scheduler = setupTestJobAndScheduler(provider);
        scheduleAllTasksToRunning(scheduler);
        return scheduler;
    }

    /** Creates a scheduler that uses the given event gateway and brings all tasks to RUNNING. */
    private DefaultScheduler createSchedulerAndDeployTasks(TaskExecutorOperatorEventGateway gateway)
            throws Exception {
        DefaultScheduler scheduler =
                setupTestJobAndScheduler(
                        new TestingOperatorCoordinator.Provider(testOperatorId), gateway, null, false);
        scheduleAllTasksToRunning(scheduler);
        return scheduler;
    }

    /**
     * Creates and starts a scheduler whose job restores from a savepoint that carries the given
     * coordinator state.
     *
     * @param coordinatorState bytes stored as coordinator state in the registered savepoint
     */
    private DefaultScheduler createSchedulerWithRestoredSavepoint(byte[] coordinatorState)
            throws Exception {
        byte[] savepointMetadata = serializeAsCheckpointMetadata(testOperatorId, coordinatorState);
        String savepointPointer = "testingSavepointPointer";

        TestingCheckpointStorageCoordinatorView storage = new TestingCheckpointStorageCoordinatorView();
        storage.registerSavepoint(savepointPointer, savepointMetadata);

        Consumer<JobGraph> savepointConfigurer =
                jobGraph -> {
                    SchedulerTestingUtils.enableCheckpointing(jobGraph, storage.asStateBackend());
                    jobGraph.setSavepointRestoreSettings(
                            SavepointRestoreSettings.forPath(savepointPointer));
                };

        DefaultScheduler scheduler =
                setupTestJobAndScheduler(
                        new TestingOperatorCoordinator.Provider(testOperatorId),
                        null,
                        savepointConfigurer,
                        false);
        scheduler.startScheduling();
        return scheduler;
    }

    /** Shorthand for the full setup with default gateway, no pre-processing, default failover. */
    private DefaultScheduler setupTestJobAndScheduler(OperatorCoordinator.Provider provider)
            throws Exception {
        return setupTestJobAndScheduler(provider, null, null, false);
    }

    /**
     * Builds the test job (one vertex, parallelism 2, checkpointing enabled, one coordinator) and
     * a scheduler for it, and remembers the scheduler for shutdown after the test.
     *
     * @param provider provider of the coordinator attached to the vertex
     * @param taskExecutorOperatorEventGateway gateway passed to the scheduler builder, or {@code
     *     null} to use the builder variant without an explicit gateway
     * @param jobGraphPreProcessing optional hook applied to the job graph after checkpointing was
     *     enabled and before the scheduler is built
     * @param restartAllOnFailover whether to install the restart-all failover strategy
     */
    private DefaultScheduler setupTestJobAndScheduler(
            OperatorCoordinator.Provider provider,
            @Nullable TaskExecutorOperatorEventGateway taskExecutorOperatorEventGateway,
            @Nullable Consumer<JobGraph> jobGraphPreProcessing,
            boolean restartAllOnFailover)
            throws Exception {
        OperatorIDPair opIds = OperatorIDPair.of(new OperatorID(), provider.getOperatorId());
        JobVertex vertex =
                new JobVertex(
                        "Vertex with OperatorCoordinator",
                        testVertexId,
                        Collections.singletonList(opIds));
        vertex.setInvokableClass(NoOpInvokable.class);
        vertex.addOperatorCoordinator(new SerializedValue<>(provider));
        vertex.setParallelism(2);

        JobGraph jobGraph =
                new JobGraph("test job with OperatorCoordinator", new JobVertex[] {vertex});
        SchedulerTestingUtils.enableCheckpointing(jobGraph);
        if (jobGraphPreProcessing != null) {
            // runs after the default checkpointing setup so the hook can override it
            jobGraphPreProcessing.accept(jobGraph);
        }

        SchedulerTestingUtils.DefaultSchedulerBuilder schedulerBuilder =
                taskExecutorOperatorEventGateway == null
                        ? SchedulerTestingUtils.createSchedulerBuilder(jobGraph, executor)
                        : SchedulerTestingUtils.createSchedulerBuilder(
                                jobGraph, executor, taskExecutorOperatorEventGateway);
        if (restartAllOnFailover) {
            schedulerBuilder.setFailoverStrategyFactory(new RestartAllFailoverStrategy.Factory());
        }

        DefaultScheduler scheduler = schedulerBuilder.build();
        ComponentMainThreadExecutorServiceAdapter mainThreadExecutor =
                new ComponentMainThreadExecutorServiceAdapter(executor, Thread.currentThread());
        scheduler.setMainThreadExecutor(mainThreadExecutor);

        createdScheduler = scheduler;
        return scheduler;
    }

    // ------------------------------------------------------------------------
    //  driving the scheduler
    // ------------------------------------------------------------------------

    /** Starts scheduling, runs pending and scheduled tasks, and switches all executions to RUNNING. */
    private void scheduleAllTasksToRunning(DefaultScheduler scheduler) {
        scheduler.startScheduling();
        executor.triggerAll();
        executor.triggerScheduledTasks();
        SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);

        Assert.assertEquals(
                ExecutionState.RUNNING,
                SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0));
    }

    /**
     * Looks up the coordinator registered under the test operator ID on the test vertex and
     * asserts that it is a {@link TestingOperatorCoordinator}.
     */
    private TestingOperatorCoordinator getCoordinator(DefaultScheduler scheduler) {
        ExecutionJobVertex vertexWithCoordinator = getJobVertex(scheduler, testVertexId);
        Assert.assertNotNull("vertex for coordinator not found", vertexWithCoordinator);

        Optional<OperatorCoordinatorHolder> coordinatorOptional =
                vertexWithCoordinator.getOperatorCoordinators().stream()
                        .filter(holder -> holder.operatorId().equals(testOperatorId))
                        .findFirst();
        Assert.assertTrue("vertex does not contain coordinator", coordinatorOptional.isPresent());

        OperatorCoordinator coordinator = coordinatorOptional.get().coordinator();
        assertMatches(coordinator, Matchers.instanceOf(TestingOperatorCoordinator.class));
        return (TestingOperatorCoordinator) coordinator;
    }

    /** Fails the given subtask, runs pending tasks, and asserts the subtask is FAILED. */
    private void failTask(DefaultScheduler scheduler, int subtask) {
        SchedulerTestingUtils.failExecution(scheduler, testVertexId, subtask);
        executor.triggerAll();

        Assert.assertEquals(
                ExecutionState.FAILED,
                SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, subtask));
    }

    /** Fails the given subtask and steps the executor until the subtask is DEPLOYING again. */
    private void failAndRedeployTask(DefaultScheduler scheduler, int subtask) {
        failTask(scheduler, subtask);

        executor.triggerAll();
        executor.triggerScheduledTasks();
        executor.triggerAll();

        Assert.assertEquals(
                ExecutionState.DEPLOYING,
                SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, subtask));
    }

    /** Fails and redeploys the given subtask, then switches it to RUNNING. */
    private void failAndRestartTask(DefaultScheduler scheduler, int subtask) {
        failAndRedeployTask(scheduler, subtask);
        SchedulerTestingUtils.setExecutionToRunning(scheduler, testVertexId, subtask);

        Assert.assertEquals(
                ExecutionState.RUNNING,
                SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, subtask));
    }

    /**
     * Reports a global failure, marks all executions cancelled, steps the executor through the
     * restart, and switches all executions back to RUNNING.
     */
    private void failGlobalAndRestart(DefaultScheduler scheduler, Throwable reason) {
        scheduler.handleGlobalFailure(reason);
        SchedulerTestingUtils.setAllExecutionsToCancelled(scheduler);

        executor.triggerAll();
        executor.triggerScheduledTasks();
        executor.triggerAll();

        SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
        executor.triggerAll();

        Assert.assertEquals(
                ExecutionState.RUNNING,
                SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0));
    }

    /** Cancels the given subtask, runs pending tasks, and asserts the subtask is CANCELED. */
    private void cancelTask(DefaultScheduler scheduler, int subtask) {
        SchedulerTestingUtils.canceledExecution(scheduler, testVertexId, subtask);
        executor.triggerAll();

        Assert.assertEquals(
                ExecutionState.CANCELED,
                SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, subtask));
    }

    /**
     * Triggers a checkpoint and keeps running pending executor tasks until the coordinator has
     * seen the trigger or the checkpoint future is done.
     *
     * <p>NOTE: the wait has no deadline; if neither condition ever holds, this method does not
     * return.
     *
     * @return the future of the triggered checkpoint
     */
    private CompletableFuture<CompletedCheckpoint> triggerCheckpoint(DefaultScheduler scheduler)
            throws Exception {
        CompletableFuture<CompletedCheckpoint> future =
                SchedulerTestingUtils.triggerCheckpoint(scheduler);
        TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        while (!coordinator.hasTriggeredCheckpoint() && !future.isDone()) {
            executor.triggerAll();
            Thread.sleep(1L);
        }
        return future;
    }

    /** Acknowledges the current checkpoint, running pending executor tasks before and after. */
    private void acknowledgeCurrentCheckpoint(DefaultScheduler scheduler) {
        executor.triggerAll();
        SchedulerTestingUtils.acknowledgeCurrentCheckpoint(scheduler);
        executor.triggerAll();
    }

    /**
     * Takes a checkpoint with the given coordinator state and waits until the coordinator has
     * been notified of a completed checkpoint.
     *
     * <p>NOTE: both the {@code get()} on the checkpoint future and the notification wait are
     * unbounded.
     *
     * @return the ID of the completed checkpoint
     */
    private long takeCompleteCheckpoint(
            DefaultScheduler scheduler,
            TestingOperatorCoordinator testingOperatorCoordinator,
            byte[] coordinatorState)
            throws Exception {
        CompletableFuture<CompletedCheckpoint> checkpointFuture = triggerCheckpoint(scheduler);
        testingOperatorCoordinator.getLastTriggeredCheckpoint().complete(coordinatorState);
        acknowledgeCurrentCheckpoint(scheduler);

        long checkpointId = checkpointFuture.get().getCheckpointID();

        while (!testingOperatorCoordinator.hasCompleteCheckpoint()) {
            executor.triggerAll();
            Thread.sleep(1L);
        }
        return checkpointId;
    }

    // ------------------------------------------------------------------------
    //  static utilities
    // ------------------------------------------------------------------------

    /**
     * Runs {@code Assert.assertThat} with a matcher whose element type is not statically known.
     *
     * <p>Keeps the single unchecked conversion in one place instead of a raw {@code Matcher} cast
     * at every call site; a matcher of the wrong type still fails at assertion time.
     */
    @SuppressWarnings("unchecked")
    private static void assertMatches(Object actual, Matcher<?> matcher) {
        Assert.assertThat(actual, (Matcher<Object>) matcher);
    }

    /** Returns the job vertex that subtask 0 of the given vertex ID belongs to. */
    private static ExecutionJobVertex getJobVertex(DefaultScheduler scheduler, JobVertexID jobVertexId) {
        ExecutionVertexID id = new ExecutionVertexID(jobVertexId, 0);
        return scheduler.getExecutionVertex(id).getJobVertex();
    }

    /** Creates an operator state for the given ID holding the bytes as coordinator state. */
    private static OperatorState createOperatorState(OperatorID id, byte[] coordinatorState) {
        OperatorState state = new OperatorState(id, 10, 16384);
        state.setCoordinatorState(new ByteStreamStateHandle("name", coordinatorState));
        return state;
    }

    /**
     * Serializes checkpoint metadata (checkpoint ID 1337) that contains a single operator state
     * with the given coordinator state.
     */
    private static byte[] serializeAsCheckpointMetadata(OperatorID id, byte[] coordinatorState)
            throws IOException {
        OperatorState state = createOperatorState(id, coordinatorState);
        CheckpointMetadata metadata =
                new CheckpointMetadata(
                        1337L, Collections.singletonList(state), Collections.emptyList());

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Checkpoints.storeCheckpointMetadata(metadata, out);
        return out.toByteArray();
    }

    /**
     * Matcher for a future that completes exceptionally within 10 seconds with a
     * {@link TestException} somewhere in the cause chain.
     */
    private static <T> Matcher<CompletableFuture<T>> futureWillCompleteWithTestException() {
        return FlinkMatchers.futureWillCompleteExceptionally(
                e ->
                        ExceptionUtils.findThrowableSerializedAware(
                                        e,
                                        TestException.class,
                                        OperatorCoordinatorSchedulerTest.class.getClassLoader())
                                .isPresent(),
                Duration.ofSeconds(10L),
                "A TestException in the cause chain");
    }

    /** Returns the bytes of a {@link ByteStreamStateHandle}; fails the test for other handles. */
    private static byte[] getStateHandleContents(StreamStateHandle stateHandle) {
        if (stateHandle instanceof ByteStreamStateHandle) {
            return ((ByteStreamStateHandle) stateHandle).getData();
        }
        Assert.fail("other state handles not implemented");
        return null; // only reached if Assert.fail returns, needed to satisfy the compiler
    }

    // ------------------------------------------------------------------------
    //  test implementations
    // ------------------------------------------------------------------------

    /** Event gateway whose send always returns a future failed with a {@link TestException}. */
    private static final class FailingTaskExecutorOperatorEventGateway
            implements TaskExecutorOperatorEventGateway {

        @Override
        public CompletableFuture<Acknowledge> sendOperatorEventToTask(
                ExecutionAttemptID task, OperatorID operator, SerializedValue<OperatorEvent> evt) {
            return FutureUtils.completedExceptionally(new TestException());
        }
    }

    /** Coordinator that throws synchronously when asked to take a checkpoint. */
    private static final class CoordinatorThatFailsCheckpointing extends TestingOperatorCoordinator {

        public CoordinatorThatFailsCheckpointing(OperatorCoordinator.Context context) {
            super(context);
        }

        @Override
        public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
            // wrapped in an Error so that it can be thrown without a throws clause
            throw new Error(new TestException());
        }
    }

    /** Coordinator whose {@code start()} always throws. */
    private static final class CoordinatorThatFailsInStart extends TestingOperatorCoordinator {

        public CoordinatorThatFailsInStart(OperatorCoordinator.Context context) {
            super(context);
        }

        @Override
        public void start() throws Exception {
            throw new Exception("test failure");
        }
    }

    /** Marker exception used to recognize failures injected by these tests. */
    private static final class TestException extends Exception {}

    /** Empty operator event used as a payload for send-event tests. */
    private static final class TestOperatorEvent implements OperatorEvent {}
}

