/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.Canceling;
import org.apache.flink.runtime.scheduler.adaptive.Executing;
import org.apache.flink.runtime.scheduler.adaptive.ExecutingTest;
import org.apache.flink.runtime.scheduler.adaptive.Failing;
import org.apache.flink.runtime.scheduler.adaptive.FailureResult;
import org.apache.flink.runtime.scheduler.adaptive.MockStateWithExecutionGraphContext;
import org.apache.flink.runtime.scheduler.adaptive.Restarting;
import org.apache.flink.runtime.scheduler.adaptive.State;
import org.apache.flink.runtime.scheduler.adaptive.StateTrackingMockExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.StateValidator;
import org.apache.flink.runtime.scheduler.adaptive.StopWithSavepoint;
import org.apache.flink.runtime.scheduler.adaptive.TestingOperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.exceptionhistory.TestingAccessExecution;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;

public class StopWithSavepointTest
extends TestLogger {
    /**
     * Savepoint location the tests use to complete the savepoint future and as the expected
     * result of the stop-with-savepoint operation future.
     */
    private static final String SAVEPOINT_PATH = "test://savepoint/path";

    /**
     * Completes the execution graph's termination future with {@link JobStatus#FINISHED} and the
     * savepoint future with {@link #SAVEPOINT_PATH}, triggers the executors, and then checks that
     * the operation future yields that path. A non-null expectation for the finished transition
     * is registered on the mock context beforehand.
     */
    @Test
    public void testFinishedOnSuccessfulStopWithSavepoint() throws Exception {
        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
            CompletableFuture<String> savepointResult = new CompletableFuture<>();
            StateTrackingMockExecutionGraph graph = new StateTrackingMockExecutionGraph();
            StopWithSavepoint stopWithSavepoint =
                    createStopWithSavepoint(ctx, graph, savepointResult);
            ctx.setExpectFinished(WaitingForResourcesTest.assertNonNull());
            ctx.setStopWithSavepoint(stopWithSavepoint);

            graph.completeTerminationFuture(JobStatus.FINISHED);
            savepointResult.complete(SAVEPOINT_PATH);
            ctx.triggerExecutors();

            Assert.assertThat(
                    stopWithSavepoint.getOperationFuture().get(), CoreMatchers.is(SAVEPOINT_PATH));
        }
    }

    /**
     * Completes the termination future with {@link JobStatus#FAILED} while the context answers
     * failures with {@link FailureResult#canNotRestart}. The failing-state arguments must carry
     * an execution graph in state FAILED and a cause containing a {@link FlinkException}; the
     * operation future must end up completed exceptionally.
     */
    @Test
    public void testJobFailed() throws Exception {
        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
            StateTrackingMockExecutionGraph graph = new StateTrackingMockExecutionGraph();
            StopWithSavepoint stopWithSavepoint = createStopWithSavepoint(ctx, graph);
            ctx.setStopWithSavepoint(stopWithSavepoint);
            ctx.setExpectFailing(
                    args -> {
                        Assert.assertThat(
                                args.getExecutionGraph().getState(),
                                CoreMatchers.is(JobStatus.FAILED));
                        Assert.assertThat(
                                args.getFailureCause(),
                                FlinkMatchers.containsCause(FlinkException.class));
                    });
            ctx.setHowToHandleFailure(FailureResult::canNotRestart);

            graph.completeTerminationFuture(JobStatus.FAILED);
            ctx.triggerExecutors();

            Assert.assertThat(
                    stopWithSavepoint.getOperationFuture().isCompletedExceptionally(),
                    CoreMatchers.is(true));
        }
    }

    /**
     * Same scenario as a failed job, but the savepoint future is additionally completed
     * exceptionally after the termination future was completed with {@link JobStatus#FAILED}.
     * The failing-state arguments must carry a FAILED execution graph and a cause containing a
     * {@link FlinkException}; the operation future must be completed exceptionally.
     */
    @Test
    public void testJobFailedAndSavepointOperationFails() throws Exception {
        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
            CompletableFuture<String> savepointResult = new CompletableFuture<>();
            StateTrackingMockExecutionGraph graph = new StateTrackingMockExecutionGraph();
            StopWithSavepoint stopWithSavepoint =
                    createStopWithSavepoint(ctx, graph, savepointResult);
            ctx.setStopWithSavepoint(stopWithSavepoint);
            ctx.setExpectFailing(
                    args -> {
                        Assert.assertThat(
                                args.getExecutionGraph().getState(),
                                CoreMatchers.is(JobStatus.FAILED));
                        Assert.assertThat(
                                args.getFailureCause(),
                                FlinkMatchers.containsCause(FlinkException.class));
                    });
            ctx.setHowToHandleFailure(FailureResult::canNotRestart);

            graph.completeTerminationFuture(JobStatus.FAILED);
            savepointResult.completeExceptionally(new RuntimeException());
            ctx.triggerExecutors();

            Assert.assertThat(
                    stopWithSavepoint.getOperationFuture().isCompletedExceptionally(),
                    CoreMatchers.is(true));
        }
    }

    /**
     * Completes the termination future with {@link JobStatus#FINISHED} first and the savepoint
     * future second, triggers the executors once, and checks that the operation future yields
     * {@link #SAVEPOINT_PATH}.
     *
     * <p>NOTE(review): these are the same steps as in
     * {@code testFinishedOnSuccessfulStopWithSavepoint}; the executors only run after both
     * futures are complete. Confirm whether an executor trigger between the two completions was
     * intended to exercise the "finished before savepoint" ordering.
     */
    @Test
    public void testJobFinishedBeforeSavepointFuture() throws Exception {
        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
            CompletableFuture<String> savepointResult = new CompletableFuture<>();
            StateTrackingMockExecutionGraph graph = new StateTrackingMockExecutionGraph();
            StopWithSavepoint stopWithSavepoint =
                    createStopWithSavepoint(ctx, graph, savepointResult);
            ctx.setExpectFinished(WaitingForResourcesTest.assertNonNull());
            ctx.setStopWithSavepoint(stopWithSavepoint);

            // Job termination is signalled before the savepoint result becomes available.
            graph.completeTerminationFuture(JobStatus.FINISHED);
            savepointResult.complete(SAVEPOINT_PATH);
            ctx.triggerExecutors();

            Assert.assertThat(
                    stopWithSavepoint.getOperationFuture().get(), CoreMatchers.is(SAVEPOINT_PATH));
        }
    }

    /**
     * Calls {@code cancel()} on the state after registering a non-null expectation for the
     * cancelling transition on the mock context.
     */
    @Test
    public void testTransitionToCancellingOnCancel() throws Exception {
        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
            StopWithSavepoint stopWithSavepoint = createStopWithSavepoint(ctx);
            ctx.setExpectCancelling(WaitingForResourcesTest.assertNonNull());
            ctx.setStopWithSavepoint(stopWithSavepoint);

            stopWithSavepoint.cancel();
        }
    }

    /**
     * Suspends the state with a {@link RuntimeException} and expects the finished transition to
     * receive an archived execution graph in state {@link JobStatus#SUSPENDED}.
     */
    @Test
    public void testTransitionToFinishedOnSuspend() throws Exception {
        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
            StopWithSavepoint stopWithSavepoint = createStopWithSavepoint(ctx);
            ctx.setExpectFinished(
                    archived ->
                            Assert.assertThat(
                                    archived.getState(), CoreMatchers.is(JobStatus.SUSPENDED)));

            stopWithSavepoint.suspend(new RuntimeException());
        }
    }

    /**
     * Reports a global failure while the context answers failures with a restart (zero backoff)
     * and a non-null expectation for the restarting transition is registered.
     */
    @Test
    public void testRestartOnGlobalFailureIfRestartConfigured() throws Exception {
        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
            StopWithSavepoint stopWithSavepoint = createStopWithSavepoint(ctx);
            ctx.setStopWithSavepoint(stopWithSavepoint);
            ctx.setExpectRestarting(WaitingForResourcesTest.assertNonNull());
            ctx.setHowToHandleFailure(cause -> FailureResult.canRestart(cause, Duration.ZERO));

            stopWithSavepoint.handleGlobalFailure(new RuntimeException());
        }
    }

    /**
     * Reports a global failure while the context answers failures with
     * {@link FailureResult#canNotRestart}; the failing transition must receive a cause that
     * contains a {@link RuntimeException}.
     */
    @Test
    public void testFailingOnGlobalFailureIfNoRestartConfigured() throws Exception {
        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
            StopWithSavepoint stopWithSavepoint = createStopWithSavepoint(ctx);
            ctx.setStopWithSavepoint(stopWithSavepoint);
            ctx.setExpectFailing(
                    args ->
                            Assert.assertThat(
                                    args.getFailureCause(),
                                    FlinkMatchers.containsCause(RuntimeException.class)));
            ctx.setHowToHandleFailure(FailureResult::canNotRestart);

            stopWithSavepoint.handleGlobalFailure(new RuntimeException());
        }
    }

    /**
     * Registers a FAILED execution on the mock execution graph and feeds the matching failing
     * task state transition into the state while the context answers failures with
     * {@link FailureResult#canNotRestart}. The update must return {@code true} and the failing
     * transition must receive a cause that contains a {@link RuntimeException}.
     */
    @Test
    public void testFailingOnUpdateTaskExecutionStateWithNoRestart() throws Exception {
        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
            StateTrackingMockExecutionGraph graph = new StateTrackingMockExecutionGraph();
            StopWithSavepoint stopWithSavepoint = createStopWithSavepoint(ctx, graph);
            ctx.setStopWithSavepoint(stopWithSavepoint);
            ctx.setExpectFailing(
                    args ->
                            Assert.assertThat(
                                    args.getFailureCause(),
                                    FlinkMatchers.containsCause(RuntimeException.class)));
            ctx.setHowToHandleFailure(FailureResult::canNotRestart);

            RuntimeException taskFailure = new RuntimeException();
            ErrorInfo errorInfo = new ErrorInfo(taskFailure, System.currentTimeMillis());
            TestingAccessExecution failedExecution =
                    TestingAccessExecution.newBuilder()
                            .withExecutionState(ExecutionState.FAILED)
                            .withErrorInfo(errorInfo)
                            .build();
            graph.registerExecution(failedExecution);
            TaskExecutionStateTransition transition =
                    ExecutingTest.createFailingStateTransition(
                            failedExecution.getAttemptId(), taskFailure);

            boolean updated = stopWithSavepoint.updateTaskExecutionState(transition);
            Assert.assertThat(updated, CoreMatchers.is(true));
        }
    }

    /**
     * Registers a FAILED execution on the mock execution graph and feeds the matching failing
     * task state transition into the state while the context answers failures with a restart
     * (zero backoff). The update must return {@code true}; a non-null expectation for the
     * restarting transition is registered.
     */
    @Test
    public void testRestartingOnUpdateTaskExecutionStateWithRestart() throws Exception {
        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
            StateTrackingMockExecutionGraph graph = new StateTrackingMockExecutionGraph();
            StopWithSavepoint stopWithSavepoint = createStopWithSavepoint(ctx, graph);
            ctx.setStopWithSavepoint(stopWithSavepoint);
            ctx.setExpectRestarting(WaitingForResourcesTest.assertNonNull());
            ctx.setHowToHandleFailure(cause -> FailureResult.canRestart(cause, Duration.ZERO));

            RuntimeException taskFailure = new RuntimeException();
            ErrorInfo errorInfo = new ErrorInfo(taskFailure, System.currentTimeMillis());
            TestingAccessExecution failedExecution =
                    TestingAccessExecution.newBuilder()
                            .withExecutionState(ExecutionState.FAILED)
                            .withErrorInfo(errorInfo)
                            .build();
            graph.registerExecution(failedExecution);
            TaskExecutionStateTransition transition =
                    ExecutingTest.createFailingStateTransition(
                            failedExecution.getAttemptId(), taskFailure);

            boolean updated = stopWithSavepoint.updateTaskExecutionState(transition);
            Assert.assertThat(updated, CoreMatchers.is(true));
        }
    }

    /**
     * Leaves the state (towards {@link Canceling}) while the savepoint future is still pending
     * and checks that the operation future is completed exceptionally.
     *
     * <p>The context is managed by try-with-resources so that it is also closed when the set-up
     * throws. The assertion deliberately stays behind the try block: it must run after the
     * context has been closed, exactly as before. NOTE(review): presumably {@code close()} runs
     * the outstanding main-thread tasks — confirm in {@code MockStateWithExecutionGraphContext}.
     */
    @Test
    public void testExceptionalOperationFutureCompletionOnLeaveWhileWaitingOnSavepointCompletion() throws Exception {
        final StopWithSavepoint sws;
        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
            sws = createStopWithSavepoint(ctx);
            ctx.setStopWithSavepoint(sws);

            sws.onLeave(Canceling.class);
        }

        Assert.assertThat(
                sws.getOperationFuture().isCompletedExceptionally(), CoreMatchers.is(true));
    }

    /**
     * Completes the savepoint future exceptionally and checks that the operation future is
     * completed exceptionally as well. A non-null expectation for the executing transition is
     * registered on the mock context.
     *
     * <p>The context is managed by try-with-resources so that it is also closed when the set-up
     * throws. The assertion deliberately stays behind the try block: it must run after the
     * context has been closed, exactly as before. NOTE(review): presumably {@code close()} runs
     * the outstanding main-thread tasks — confirm in {@code MockStateWithExecutionGraphContext}.
     */
    @Test
    public void testExceptionalSavepointCompletionLeadsToExceptionalOperationFutureCompletion() throws Exception {
        final StopWithSavepoint sws;
        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
            MockCheckpointScheduling mockStopWithSavepointOperations =
                    new MockCheckpointScheduling();
            CompletableFuture<String> savepointFuture = new CompletableFuture<>();
            sws = createStopWithSavepoint(ctx, mockStopWithSavepointOperations, savepointFuture);
            ctx.setStopWithSavepoint(sws);
            ctx.setExpectExecuting(WaitingForResourcesTest.assertNonNull());

            savepointFuture.completeExceptionally(new RuntimeException("Test error"));
        }

        Assert.assertThat(
                sws.getOperationFuture().isCompletedExceptionally(), CoreMatchers.is(true));
    }

    /**
     * Completes the savepoint future exceptionally and expects the executing transition to
     * receive an execution graph in state {@link JobStatus#RUNNING}; the operation future must
     * be completed exceptionally.
     *
     * <p>The context is managed by try-with-resources so that it is also closed when the set-up
     * throws. The assertion deliberately stays behind the try block: it must run after the
     * context has been closed, exactly as before. NOTE(review): presumably {@code close()} runs
     * the outstanding main-thread tasks — confirm in {@code MockStateWithExecutionGraphContext}.
     */
    @Test
    public void testErrorCreatingSavepointLeadsToTransitionToExecutingState() throws Exception {
        final StopWithSavepoint sws;
        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
            MockCheckpointScheduling mockStopWithSavepointOperations =
                    new MockCheckpointScheduling();
            CompletableFuture<String> savepointFuture = new CompletableFuture<>();
            sws = createStopWithSavepoint(ctx, mockStopWithSavepointOperations, savepointFuture);
            ctx.setStopWithSavepoint(sws);
            ctx.setExpectExecuting(
                    executingArguments ->
                            Assert.assertThat(
                                    executingArguments.getExecutionGraph().getState(),
                                    CoreMatchers.is(JobStatus.RUNNING)));

            savepointFuture.completeExceptionally(new RuntimeException("Test error"));
        }

        Assert.assertThat(
                sws.getOperationFuture().isCompletedExceptionally(), CoreMatchers.is(true));
    }

    /**
     * Completes the savepoint future successfully, triggers the executors, and only then reports
     * a failing task state transition for a FAILED execution. The context answers failures with
     * a restart (zero backoff); the update must return {@code true} and a non-null expectation
     * for the restarting transition is registered.
     */
    @Test
    public void testRestartOnTaskFailureAfterSavepointCompletion() throws Exception {
        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
            MockCheckpointScheduling checkpointScheduling = new MockCheckpointScheduling();
            CompletableFuture<String> savepointResult = new CompletableFuture<>();
            StateTrackingMockExecutionGraph graph = new StateTrackingMockExecutionGraph();
            StopWithSavepoint stopWithSavepoint =
                    createStopWithSavepoint(ctx, checkpointScheduling, graph, savepointResult);
            ctx.setStopWithSavepoint(stopWithSavepoint);
            ctx.setExpectRestarting(WaitingForResourcesTest.assertNonNull());
            ctx.setHowToHandleFailure(cause -> FailureResult.canRestart(cause, Duration.ZERO));

            // The savepoint succeeds first ...
            savepointResult.complete(SAVEPOINT_PATH);
            ctx.triggerExecutors();

            // ... and a task fails afterwards.
            RuntimeException taskFailure = new RuntimeException();
            ErrorInfo errorInfo = new ErrorInfo(taskFailure, System.currentTimeMillis());
            TestingAccessExecution failedExecution =
                    TestingAccessExecution.newBuilder()
                            .withExecutionState(ExecutionState.FAILED)
                            .withErrorInfo(errorInfo)
                            .build();
            graph.registerExecution(failedExecution);
            TaskExecutionStateTransition transition =
                    ExecutingTest.createFailingStateTransition(
                            failedExecution.getAttemptId(), taskFailure);

            boolean updated = stopWithSavepoint.updateTaskExecutionState(transition);
            Assert.assertThat(updated, CoreMatchers.is(true));
        }
    }

    /**
     * Checks that the checkpoint-scheduling mock reports "not started" initially and "started"
     * after the savepoint future was completed exceptionally and the executors were triggered.
     * A non-null expectation for the executing transition is registered.
     */
    @Test
    public void testEnsureCheckpointSchedulerIsStartedAgain() throws Exception {
        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
            MockCheckpointScheduling checkpointScheduling = new MockCheckpointScheduling();
            Assert.assertThat(
                    checkpointScheduling.isCheckpointSchedulerStarted(), CoreMatchers.is(false));

            CompletableFuture<String> savepointResult = new CompletableFuture<>();
            StopWithSavepoint stopWithSavepoint =
                    createStopWithSavepoint(ctx, checkpointScheduling, savepointResult);
            ctx.setStopWithSavepoint(stopWithSavepoint);
            ctx.setExpectExecuting(WaitingForResourcesTest.assertNonNull());

            savepointResult.completeExceptionally(new RuntimeException("Test error"));
            ctx.triggerExecutors();

            Assert.assertThat(
                    checkpointScheduling.isCheckpointSchedulerStarted(), CoreMatchers.is(true));
        }
    }

    /**
     * Creates the state under test with a fresh checkpoint-scheduling mock, a fresh mock
     * execution graph and a savepoint future that is never completed by this helper.
     */
    private StopWithSavepoint createStopWithSavepoint(MockStopWithSavepointContext ctx) {
        return createStopWithSavepoint(
                ctx,
                new MockCheckpointScheduling(),
                new StateTrackingMockExecutionGraph(),
                new CompletableFuture<String>());
    }

    /**
     * Creates the state under test for the given execution graph and savepoint future, using a
     * fresh checkpoint-scheduling mock.
     */
    private StopWithSavepoint createStopWithSavepoint(MockStopWithSavepointContext ctx, ExecutionGraph executionGraph, CompletableFuture<String> savepointFuture) {
        return createStopWithSavepoint(
                ctx, new MockCheckpointScheduling(), executionGraph, savepointFuture);
    }

    /**
     * Creates the state under test for the given execution graph, using a fresh
     * checkpoint-scheduling mock and a savepoint future that is never completed by this helper.
     */
    private StopWithSavepoint createStopWithSavepoint(MockStopWithSavepointContext ctx, ExecutionGraph executionGraph) {
        return createStopWithSavepoint(ctx, executionGraph, new CompletableFuture<String>());
    }

    /**
     * Creates the state under test for the given checkpoint scheduling and savepoint future,
     * using a fresh mock execution graph.
     */
    private StopWithSavepoint createStopWithSavepoint(MockStopWithSavepointContext ctx, CheckpointScheduling checkpointScheduling, CompletableFuture<String> savepointFuture) {
        return createStopWithSavepoint(
                ctx, checkpointScheduling, new StateTrackingMockExecutionGraph(), savepointFuture);
    }

    /**
     * Creates the state under test from fully specified collaborators.
     *
     * <p>The execution graph is transitioned to running before the state is constructed. The
     * context's main-thread executor is passed to the {@link ExecutionGraphHandler} for both of
     * its executor arguments, and the state starts with an empty failure collection.
     *
     * @param ctx mock context the state reports its transitions to
     * @param checkpointScheduling checkpoint scheduling handed to the state
     * @param executionGraph execution graph the state operates on
     * @param savepointFuture future representing the savepoint triggered for the stop operation
     * @return the newly created {@link StopWithSavepoint} state
     */
    private StopWithSavepoint createStopWithSavepoint(MockStopWithSavepointContext ctx, CheckpointScheduling checkpointScheduling, ExecutionGraph executionGraph, CompletableFuture<String> savepointFuture) {
        ExecutionGraphHandler executionGraphHandler =
                new ExecutionGraphHandler(
                        executionGraph,
                        this.log,
                        (Executor) ctx.getMainThreadExecutor(),
                        ctx.getMainThreadExecutor());
        OperatorCoordinatorHandler operatorCoordinatorHandler =
                new TestingOperatorCoordinatorHandler();

        executionGraph.transitionToRunning();

        return new StopWithSavepoint(
                ctx,
                executionGraph,
                executionGraphHandler,
                operatorCoordinatorHandler,
                checkpointScheduling,
                this.log,
                ClassLoader.getSystemClassLoader(),
                savepointFuture,
                // Diamond instead of the raw ArrayList, so the element type is inferred.
                new ArrayList<>());
    }

    private static class MockCheckpointScheduling
    implements CheckpointScheduling {
        private boolean checkpointSchedulerStarted = false;

        private MockCheckpointScheduling() {
        }

        public void startCheckpointScheduler() {
            this.checkpointSchedulerStarted = true;
        }

        public void stopCheckpointScheduler() {
            this.checkpointSchedulerStarted = false;
        }

        boolean isCheckpointSchedulerStarted() {
            return this.checkpointSchedulerStarted;
        }
    }

    /**
     * Mock {@link StopWithSavepoint.Context} that validates the arguments of the state
     * transitions requested by the state under test.
     *
     * <p>Each {@code goTo*} method first calls {@code onLeave} on the registered
     * {@link StopWithSavepoint} instance, then hands the transition arguments to the matching
     * {@link StateValidator}, and finally records that a state transition happened. Closing the
     * context closes the parent context and all validators.
     */
    private static class MockStopWithSavepointContext
            extends MockStateWithExecutionGraphContext implements StopWithSavepoint.Context {

        /** Strategy answering {@link #howToHandleFailure(Throwable)}; unset until configured. */
        private Function<Throwable, FailureResult> howToHandleFailure;

        private final StateValidator<ExecutingTest.FailingArguments> failingStateValidator =
                new StateValidator<>("failing");
        private final StateValidator<ExecutingTest.RestartingArguments> restartingStateValidator =
                new StateValidator<>("restarting");
        private final StateValidator<ExecutingTest.CancellingArguments> cancellingStateValidator =
                new StateValidator<>("cancelling");
        // The executing transition is validated with CancellingArguments as its argument holder.
        private final StateValidator<ExecutingTest.CancellingArguments> executingStateValidator =
                new StateValidator<>("executing");

        /** State under test; required by every {@code goTo*} method of this class. */
        private StopWithSavepoint state;

        private MockStopWithSavepointContext() {}

        /** Registers the state whose {@code onLeave} is invoked on simulated transitions. */
        public void setStopWithSavepoint(StopWithSavepoint sws) {
            this.state = sws;
        }

        /** Registers the check applied to the arguments of a transition to failing. */
        public void setExpectFailing(Consumer<ExecutingTest.FailingArguments> asserter) {
            failingStateValidator.expectInput(asserter);
        }

        /** Registers the check applied to the arguments of a transition to restarting. */
        public void setExpectRestarting(Consumer<ExecutingTest.RestartingArguments> asserter) {
            restartingStateValidator.expectInput(asserter);
        }

        /** Registers the check applied to the arguments of a transition to cancelling. */
        public void setExpectCancelling(Consumer<ExecutingTest.CancellingArguments> asserter) {
            cancellingStateValidator.expectInput(asserter);
        }

        /** Registers the check applied to the arguments of a transition to executing. */
        public void setExpectExecuting(Consumer<ExecutingTest.CancellingArguments> asserter) {
            executingStateValidator.expectInput(asserter);
        }

        /** Configures how {@link #howToHandleFailure(Throwable)} answers. */
        public void setHowToHandleFailure(Function<Throwable, FailureResult> function) {
            this.howToHandleFailure = function;
        }

        /** Delegates to the function configured via {@link #setHowToHandleFailure(Function)}. */
        @Override
        public FailureResult howToHandleFailure(Throwable failure) {
            return howToHandleFailure.apply(failure);
        }

        /** Calls {@code onLeave(target)} on the registered state; fails if none is registered. */
        private void simulateTransitionToState(Class<? extends State> target) {
            Preconditions.checkNotNull(
                    state,
                    "StopWithSavepoint state must be set via setStopWithSavepoint() to call onLeave() on leaving the state");
            state.onLeave(target);
        }

        @Override
        public void goToCanceling(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, List<ExceptionHistoryEntry> failureCollection) {
            simulateTransitionToState(Canceling.class);
            cancellingStateValidator.validateInput(
                    new ExecutingTest.CancellingArguments(
                            executionGraph, executionGraphHandler, operatorCoordinatorHandler));
            hadStateTransition = true;
        }

        @Override
        public void goToRestarting(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Duration backoffTime, List<ExceptionHistoryEntry> failureCollection) {
            simulateTransitionToState(Restarting.class);
            restartingStateValidator.validateInput(
                    new ExecutingTest.RestartingArguments(
                            executionGraph,
                            executionGraphHandler,
                            operatorCoordinatorHandler,
                            backoffTime));
            hadStateTransition = true;
        }

        @Override
        public void goToFailing(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Throwable failureCause, List<ExceptionHistoryEntry> failureCollection) {
            simulateTransitionToState(Failing.class);
            failingStateValidator.validateInput(
                    new ExecutingTest.FailingArguments(
                            executionGraph,
                            executionGraphHandler,
                            operatorCoordinatorHandler,
                            failureCause));
            hadStateTransition = true;
        }

        @Override
        public void goToExecuting(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, List<ExceptionHistoryEntry> failureCollection) {
            simulateTransitionToState(Executing.class);
            executingStateValidator.validateInput(
                    new ExecutingTest.CancellingArguments(
                            executionGraph, executionGraphHandler, operatorCoordinatorHandler));
            hadStateTransition = true;
        }

        /**
         * Returns {@code true} as long as no state transition has been recorded; the argument is
         * ignored.
         */
        @Override
        public boolean isState(State expectedState) {
            return !hadStateTransition;
        }

        /**
         * Schedules {@code runnable} on the main-thread executor; it only runs if
         * {@link #isState(State)} still holds at execution time.
         *
         * @throws UnsupportedOperationException if {@code delay} is not zero
         */
        @Override
        public ScheduledFuture<?> runIfState(State state, Runnable runnable, Duration delay) {
            if (!delay.isZero()) {
                throw new UnsupportedOperationException("Currently only immediate execution is supported");
            }
            // Note: the parameter 'state' shadows the field of the same name inside this method.
            return getMainThreadExecutor()
                    .schedule(
                            () -> {
                                if (isState(state)) {
                                    runnable.run();
                                }
                            },
                            delay.toMillis(),
                            TimeUnit.MILLISECONDS);
        }

        /** Closes the parent context first, then every validator of this class. */
        @Override
        public void close() throws Exception {
            super.close();
            failingStateValidator.close();
            restartingStateValidator.close();
            cancellingStateValidator.close();
            executingStateValidator.close();
        }
    }
}

