/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.stopwithsavepoint;

import java.util.Collections;
import java.util.HashMap;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.TestingCheckpointScheduling;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.TestingSchedulerNG;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationHandler;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationHandlerImpl;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;

public class StopWithSavepointTerminationHandlerImplTest
extends TestLogger {
    private static final JobID JOB_ID = new JobID();
    private final TestingCheckpointScheduling checkpointScheduling = new TestingCheckpointScheduling(false);

    private StopWithSavepointTerminationHandlerImpl createTestInstanceFailingOnGlobalFailOver() {
        return this.createTestInstance(throwableCausingGlobalFailOver -> Assert.fail((String)"No global failover should be triggered."));
    }

    private StopWithSavepointTerminationHandlerImpl createTestInstance(Consumer<Throwable> handleGlobalFailureConsumer) {
        this.checkpointScheduling.stopCheckpointScheduler();
        TestingSchedulerNG scheduler = TestingSchedulerNG.newBuilder().setHandleGlobalFailureConsumer(handleGlobalFailureConsumer).build();
        return new StopWithSavepointTerminationHandlerImpl(JOB_ID, (SchedulerNG)scheduler, (CheckpointScheduling)this.checkpointScheduling, this.log);
    }

    @Test
    public void testHappyPath() throws ExecutionException, InterruptedException {
        StopWithSavepointTerminationHandlerImpl testInstance = this.createTestInstanceFailingOnGlobalFailOver();
        EmptyStreamStateHandle streamStateHandle = new EmptyStreamStateHandle();
        CompletedCheckpoint completedSavepoint = StopWithSavepointTerminationHandlerImplTest.createCompletedSavepoint(streamStateHandle);
        testInstance.handleSavepointCreation(completedSavepoint, null);
        testInstance.handleExecutionsTermination(Collections.singleton(ExecutionState.FINISHED));
        Assert.assertThat(testInstance.getSavepointPath().get(), (Matcher)CoreMatchers.is((Object)completedSavepoint.getExternalPointer()));
        Assert.assertFalse((String)"The savepoint should not have been discarded.", (boolean)streamStateHandle.isDisposed());
        Assert.assertFalse((String)"Checkpoint scheduling should be disabled.", (boolean)this.checkpointScheduling.isEnabled());
    }

    @Test
    public void testSavepointCreationFailureWithoutExecutionTermination() {
        this.assertSavepointCreationFailure(testInstance -> {});
    }

    @Test
    public void testSavepointCreationFailureWithFailingExecutions() {
        this.assertSavepointCreationFailure(testInstance -> testInstance.handleExecutionsTermination(Collections.singletonList(ExecutionState.FAILED)));
    }

    @Test
    public void testSavepointCreationFailureWithFinishingExecutions() {
        this.assertSavepointCreationFailure(testInstance -> testInstance.handleExecutionsTermination(Collections.singletonList(ExecutionState.FINISHED)));
    }

    public void assertSavepointCreationFailure(Consumer<StopWithSavepointTerminationHandler> handleExecutionsTermination) {
        StopWithSavepointTerminationHandlerImpl testInstance = this.createTestInstanceFailingOnGlobalFailOver();
        String expectedErrorMessage = "Expected exception during savepoint creation.";
        testInstance.handleSavepointCreation(null, (Throwable)new Exception("Expected exception during savepoint creation."));
        handleExecutionsTermination.accept((StopWithSavepointTerminationHandler)testInstance);
        try {
            testInstance.getSavepointPath().get();
            Assert.fail((String)"An ExecutionException is expected.");
        }
        catch (Throwable e) {
            Optional actualException = ExceptionUtils.findThrowableWithMessage((Throwable)e, (String)"Expected exception during savepoint creation.");
            Assert.assertTrue((String)"An exception with the expected error message should have been thrown.", (boolean)actualException.isPresent());
        }
        Assert.assertTrue((String)"Checkpoint scheduling should be enabled.", (boolean)this.checkpointScheduling.isEnabled());
    }

    @Test
    public void testFailedTerminationHandling() throws ExecutionException, InterruptedException {
        CompletableFuture globalFailOverTriggered = new CompletableFuture();
        StopWithSavepointTerminationHandlerImpl testInstance = this.createTestInstance(globalFailOverTriggered::complete);
        ExecutionState expectedNonFinishedState = ExecutionState.FAILED;
        String expectedErrorMessage = String.format("Inconsistent execution state after stopping with savepoint. At least one execution is still in one of the following states: %s. A global fail-over is triggered to recover the job %s.", expectedNonFinishedState, JOB_ID);
        EmptyStreamStateHandle streamStateHandle = new EmptyStreamStateHandle();
        CompletedCheckpoint completedSavepoint = StopWithSavepointTerminationHandlerImplTest.createCompletedSavepoint(streamStateHandle);
        testInstance.handleSavepointCreation(completedSavepoint, null);
        testInstance.handleExecutionsTermination(Collections.singletonList(expectedNonFinishedState));
        try {
            testInstance.getSavepointPath().get();
            Assert.fail((String)"An ExecutionException is expected.");
        }
        catch (Throwable e) {
            Optional actualFlinkException = ExceptionUtils.findThrowable((Throwable)e, FlinkException.class);
            Assert.assertTrue((String)"A FlinkException should have been thrown.", (boolean)actualFlinkException.isPresent());
            Assert.assertThat((Object)((FlinkException)actualFlinkException.get()).getMessage(), (Matcher)CoreMatchers.is((Object)expectedErrorMessage));
        }
        Assert.assertTrue((String)"Global fail-over was not triggered.", (boolean)globalFailOverTriggered.isDone());
        Assert.assertThat((Object)((Throwable)globalFailOverTriggered.get()).getMessage(), (Matcher)CoreMatchers.is((Object)expectedErrorMessage));
        Assert.assertFalse((String)"Savepoint should not be discarded.", (boolean)streamStateHandle.isDisposed());
        Assert.assertFalse((String)"Checkpoint scheduling should not be enabled in case of failure.", (boolean)this.checkpointScheduling.isEnabled());
    }

    @Test(expected=UnsupportedOperationException.class)
    public void testInvalidExecutionTerminationCall() {
        this.createTestInstanceFailingOnGlobalFailOver().handleExecutionsTermination(Collections.singletonList(ExecutionState.FINISHED));
    }

    @Test(expected=NullPointerException.class)
    public void testSavepointCreationParameterBothNull() {
        this.createTestInstanceFailingOnGlobalFailOver().handleSavepointCreation(null, null);
    }

    @Test(expected=IllegalArgumentException.class)
    public void testSavepointCreationParameterBothSet() {
        this.createTestInstanceFailingOnGlobalFailOver().handleSavepointCreation(StopWithSavepointTerminationHandlerImplTest.createCompletedSavepoint(new EmptyStreamStateHandle()), (Throwable)new Exception("No exception should be passed if a savepoint is available."));
    }

    @Test(expected=NullPointerException.class)
    public void testExecutionTerminationWithNull() {
        this.createTestInstanceFailingOnGlobalFailOver().handleExecutionsTermination(null);
    }

    private static CompletedCheckpoint createCompletedSavepoint(StreamStateHandle streamStateHandle) {
        return new CompletedCheckpoint(JOB_ID, 0L, 0L, 0L, new HashMap(), null, CheckpointProperties.forSavepoint((boolean)true), (CompletedCheckpointStorageLocation)new TestCompletedCheckpointStorageLocation(streamStateHandle, "savepoint-path"));
    }
}

