/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTrackerTest;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests that the {@link CheckpointCoordinator} of an {@link ExecutionGraph}, together with the
 * {@link CheckpointIDCounter} and {@link CompletedCheckpointStore} handed to it, is shut down
 * with the matching {@link JobStatus} when the graph fails globally, is suspended, or finishes.
 */
public class ExecutionGraphCheckpointCoordinatorTest
extends TestLogger {

    /**
     * Failing the graph globally must shut down the coordinator and pass
     * {@link JobStatus#FAILED} to the counter and the store.
     */
    @Test
    public void testShutdownCheckpointCoordinatorOnFailure() throws Exception {
        CompletableFuture<JobStatus> counterShutdownFuture = new CompletableFuture<>();
        CheckpointIDCounter counter = new TestingCheckpointIDCounter(counterShutdownFuture);
        CompletableFuture<JobStatus> storeShutdownFuture = new CompletableFuture<>();
        CompletedCheckpointStore store = new TestingCompletedCheckpointStore(storeShutdownFuture);

        ExecutionGraph graph = this.createExecutionGraphAndEnableCheckpointing(counter, store);
        CheckpointCoordinator checkpointCoordinator = graph.getCheckpointCoordinator();
        assertCoordinatorIsRunning(checkpointCoordinator);

        graph.failGlobal(new Exception("Test Exception"));

        assertCoordinatorShutDownWith(
                checkpointCoordinator, counterShutdownFuture, storeShutdownFuture, JobStatus.FAILED);
    }

    /**
     * Suspending the graph must shut down the coordinator and pass
     * {@link JobStatus#SUSPENDED} to the counter and the store.
     */
    @Test
    public void testShutdownCheckpointCoordinatorOnSuspend() throws Exception {
        CompletableFuture<JobStatus> counterShutdownFuture = new CompletableFuture<>();
        CheckpointIDCounter counter = new TestingCheckpointIDCounter(counterShutdownFuture);
        CompletableFuture<JobStatus> storeShutdownFuture = new CompletableFuture<>();
        CompletedCheckpointStore store = new TestingCompletedCheckpointStore(storeShutdownFuture);

        ExecutionGraph graph = this.createExecutionGraphAndEnableCheckpointing(counter, store);
        CheckpointCoordinator checkpointCoordinator = graph.getCheckpointCoordinator();
        assertCoordinatorIsRunning(checkpointCoordinator);

        graph.suspend(new Exception("Test Exception"));

        assertCoordinatorShutDownWith(
                checkpointCoordinator, counterShutdownFuture, storeShutdownFuture, JobStatus.SUSPENDED);
    }

    /**
     * Driving every execution vertex to {@link ExecutionState#FINISHED} must finish the job, shut
     * down the coordinator and pass {@link JobStatus#FINISHED} to the counter and the store.
     */
    @Test
    public void testShutdownCheckpointCoordinatorOnFinished() throws Exception {
        CompletableFuture<JobStatus> counterShutdownFuture = new CompletableFuture<>();
        CheckpointIDCounter counter = new TestingCheckpointIDCounter(counterShutdownFuture);
        CompletableFuture<JobStatus> storeShutdownFuture = new CompletableFuture<>();
        CompletedCheckpointStore store = new TestingCompletedCheckpointStore(storeShutdownFuture);

        ExecutionGraph graph = this.createExecutionGraphAndEnableCheckpointing(counter, store);
        CheckpointCoordinator checkpointCoordinator = graph.getCheckpointCoordinator();
        assertCoordinatorIsRunning(checkpointCoordinator);

        graph.scheduleForExecution();

        // Report FINISHED for the current attempt of every vertex in the graph.
        for (ExecutionVertex executionVertex : graph.getAllExecutionVertices()) {
            Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();
            graph.updateState(
                    new TaskExecutionState(
                            graph.getJobID(),
                            currentExecutionAttempt.getAttemptId(),
                            ExecutionState.FINISHED));
        }

        Assert.assertThat(graph.getTerminationFuture().get(), Matchers.is(JobStatus.FINISHED));
        assertCoordinatorShutDownWith(
                checkpointCoordinator, counterShutdownFuture, storeShutdownFuture, JobStatus.FINISHED);
    }

    /**
     * Asserts the precondition shared by all tests: the graph exposes a coordinator and that
     * coordinator has not been shut down yet.
     */
    private static void assertCoordinatorIsRunning(CheckpointCoordinator checkpointCoordinator) {
        Assert.assertThat(checkpointCoordinator, Matchers.notNullValue());
        Assert.assertThat(checkpointCoordinator.isShutdown(), Matchers.is(false));
    }

    /**
     * Asserts that the coordinator reports being shut down and that both test doubles were shut
     * down with {@code expectedStatus}.
     *
     * <p>{@link CompletableFuture#get()} blocks until the corresponding test double's
     * {@code shutdown(JobStatus)} has completed the future.
     */
    private static void assertCoordinatorShutDownWith(
            CheckpointCoordinator checkpointCoordinator,
            CompletableFuture<JobStatus> counterShutdownFuture,
            CompletableFuture<JobStatus> storeShutdownFuture,
            JobStatus expectedStatus) throws Exception {
        Assert.assertThat(checkpointCoordinator.isShutdown(), Matchers.is(true));
        Assert.assertThat(counterShutdownFuture.get(), Matchers.is(expectedStatus));
        Assert.assertThat(storeShutdownFuture.get(), Matchers.is(expectedStatus));
    }

    /**
     * Builds an {@link ExecutionGraph} with a single job vertex, starts it with the main-thread
     * executor adapter and enables checkpointing with the given counter and store.
     *
     * @param counter checkpoint ID counter handed to {@code enableCheckpointing}
     * @param store completed checkpoint store handed to {@code enableCheckpointing}
     * @return the started execution graph with checkpointing enabled
     */
    private ExecutionGraph createExecutionGraphAndEnableCheckpointing(CheckpointIDCounter counter, CompletedCheckpointStore store) throws Exception {
        Time timeout = Time.days(1L);

        JobVertex jobVertex = new JobVertex("MockVertex");
        jobVertex.setInvokableClass(AbstractInvokable.class);

        ExecutionGraph executionGraph = TestingExecutionGraphBuilder.newBuilder()
                .setJobGraph(new JobGraph(new JobVertex[]{jobVertex}))
                .setRpcTimeout(timeout)
                .setAllocationTimeout(timeout)
                .build();
        executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());

        // NOTE(review): the positional arguments follow the CheckpointCoordinatorConfiguration
        // constructor of the Flink version this was decompiled from -- confirm their meaning there.
        CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
                100L,
                100L,
                100L,
                1,
                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                true,
                false,
                false,
                0);

        executionGraph.enableCheckpointing(
                chkConfig,
                Collections.emptyList(),
                Collections.emptyList(),
                Collections.emptyList(),
                Collections.emptyList(),
                counter,
                store,
                new MemoryStateBackend(),
                CheckpointStatsTrackerTest.createTestTracker());

        return executionGraph;
    }

    /**
     * {@link CompletedCheckpointStore} test double that only supports {@link #shutdown(JobStatus)}:
     * it completes the supplied future with the status it was shut down with. Every other
     * operation throws {@link UnsupportedOperationException}.
     */
    private static final class TestingCompletedCheckpointStore
    implements CompletedCheckpointStore {
        private final CompletableFuture<JobStatus> shutdownStatus;

        private TestingCompletedCheckpointStore(CompletableFuture<JobStatus> shutdownStatus) {
            this.shutdownStatus = shutdownStatus;
        }

        public void recover() {
            throw new UnsupportedOperationException("Not implemented.");
        }

        public void addCheckpoint(CompletedCheckpoint checkpoint) {
            throw new UnsupportedOperationException("Not implemented.");
        }

        public CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) {
            throw new UnsupportedOperationException("Not implemented.");
        }

        /** Records the job status the store was shut down with. */
        public void shutdown(JobStatus jobStatus) {
            this.shutdownStatus.complete(jobStatus);
        }

        public List<CompletedCheckpoint> getAllCheckpoints() {
            throw new UnsupportedOperationException("Not implemented.");
        }

        public int getNumberOfRetainedCheckpoints() {
            throw new UnsupportedOperationException("Not implemented.");
        }

        public int getMaxNumberOfRetainedCheckpoints() {
            throw new UnsupportedOperationException("Not implemented.");
        }

        public boolean requiresExternalizedCheckpoints() {
            throw new UnsupportedOperationException("Not implemented.");
        }
    }

    /**
     * {@link CheckpointIDCounter} test double: {@link #start()} is a no-op and
     * {@link #shutdown(JobStatus)} completes the supplied future with the status it was shut down
     * with. Every other operation throws {@link UnsupportedOperationException}.
     */
    private static final class TestingCheckpointIDCounter
    implements CheckpointIDCounter {
        private final CompletableFuture<JobStatus> shutdownStatus;

        private TestingCheckpointIDCounter(CompletableFuture<JobStatus> shutdownStatus) {
            this.shutdownStatus = shutdownStatus;
        }

        /** Intentionally empty: nothing to start for this test double. */
        public void start() {
        }

        /** Records the job status the counter was shut down with. */
        public void shutdown(JobStatus jobStatus) {
            this.shutdownStatus.complete(jobStatus);
        }

        public long getAndIncrement() {
            throw new UnsupportedOperationException("Not implemented.");
        }

        public long get() {
            throw new UnsupportedOperationException("Not implemented.");
        }

        public void setCount(long newId) {
            throw new UnsupportedOperationException("Not implemented.");
        }
    }
}

