/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.util.SerializableObject;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.hamcrest.MockitoHamcrest;
import org.mockito.verification.VerificationMode;

/**
 * Tests for restoring state from completed checkpoints via
 * {@code CheckpointCoordinator#restoreLatestCheckpointedStateToAll}.
 *
 * <p>All execution graph components ({@link Execution}, {@link ExecutionVertex},
 * {@link ExecutionJobVertex}) are Mockito mocks built by the private helpers at the bottom of
 * this class.
 */
public class CheckpointStateRestoreTest {
    private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";

    /**
     * Completes one checkpoint in which the three subtasks of one vertex acknowledge with a
     * {@link TaskStateSnapshot} and the two subtasks of another vertex acknowledge without
     * state, then restores and verifies that {@code setInitialState} is invoked exactly once
     * with that snapshot on the former executions and never on the latter.
     *
     * @throws Exception if triggering, acknowledging or restoring the checkpoint fails; the
     *     exception is propagated so the test report keeps the original cause and stack trace
     */
    @Test
    public void testSetState() throws Exception {
        KeyGroupRange keyGroupRange = KeyGroupRange.of(0, 0);
        List<SerializableObject> testStates = Collections.singletonList(new SerializableObject());
        KeyGroupsStateHandle serializedKeyGroupStates =
                CheckpointCoordinatorTestingUtils.generateKeyGroupState(keyGroupRange, testStates);

        JobID jid = new JobID();
        JobVertexID statefulId = new JobVertexID();
        JobVertexID statelessId = new JobVertexID();

        Execution statefulExec1 = this.mockExecution();
        Execution statefulExec2 = this.mockExecution();
        Execution statefulExec3 = this.mockExecution();
        Execution statelessExec1 = this.mockExecution();
        Execution statelessExec2 = this.mockExecution();

        ExecutionVertex stateful1 = this.mockExecutionVertex(statefulExec1, statefulId, 0, 3);
        ExecutionVertex stateful2 = this.mockExecutionVertex(statefulExec2, statefulId, 1, 3);
        ExecutionVertex stateful3 = this.mockExecutionVertex(statefulExec3, statefulId, 2, 3);
        ExecutionVertex stateless1 = this.mockExecutionVertex(statelessExec1, statelessId, 0, 2);
        ExecutionVertex stateless2 = this.mockExecutionVertex(statelessExec2, statelessId, 1, 2);

        ExecutionJobVertex stateful =
                this.mockExecutionJobVertex(
                        statefulId, new ExecutionVertex[] {stateful1, stateful2, stateful3});
        ExecutionJobVertex stateless =
                this.mockExecutionJobVertex(
                        statelessId, new ExecutionVertex[] {stateless1, stateless2});

        HashSet<ExecutionJobVertex> tasks = new HashSet<>();
        tasks.add(stateful);
        tasks.add(stateless);

        ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor =
                new ManuallyTriggeredScheduledExecutor();
        CheckpointCoordinator coord =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setJobId(jid)
                        .setTasksToTrigger(
                                new ExecutionVertex[] {
                                    stateful1, stateful2, stateful3, stateless1, stateless2
                                })
                        .setTasksToWaitFor(
                                new ExecutionVertex[] {
                                    stateful1, stateful2, stateful3, stateless1, stateless2
                                })
                        .setTasksToCommitTo(new ExecutionVertex[0])
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build();

        // Trigger a checkpoint and run the work queued on the manually driven timer so that a
        // pending checkpoint can be fetched below.
        coord.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();

        PendingCheckpoint pending = coord.getPendingCheckpoints().values().iterator().next();
        long checkpointId = pending.getCheckpointId();

        final TaskStateSnapshot subtaskStates = new TaskStateSnapshot();
        subtaskStates.putSubtaskStateByOperatorID(
                OperatorID.fromJobVertexID(statefulId),
                new OperatorSubtaskState(
                        StateObjectCollection.empty(),
                        StateObjectCollection.empty(),
                        StateObjectCollection.singleton(serializedKeyGroupStates),
                        StateObjectCollection.empty()));

        // Stateful subtasks acknowledge with the snapshot, stateless ones without any state.
        coord.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        jid,
                        statefulExec1.getAttemptId(),
                        checkpointId,
                        new CheckpointMetrics(),
                        subtaskStates),
                TASK_MANAGER_LOCATION_INFO);
        coord.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        jid,
                        statefulExec2.getAttemptId(),
                        checkpointId,
                        new CheckpointMetrics(),
                        subtaskStates),
                TASK_MANAGER_LOCATION_INFO);
        coord.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        jid,
                        statefulExec3.getAttemptId(),
                        checkpointId,
                        new CheckpointMetrics(),
                        subtaskStates),
                TASK_MANAGER_LOCATION_INFO);
        coord.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId),
                TASK_MANAGER_LOCATION_INFO);
        coord.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId),
                TASK_MANAGER_LOCATION_INFO);

        Assert.assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals(0, coord.getNumberOfPendingCheckpoints());

        Assert.assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));

        // Matches a JobManagerTaskRestore whose snapshot equals the one acknowledged above.
        BaseMatcher<JobManagerTaskRestore> matcher =
                new BaseMatcher<JobManagerTaskRestore>() {

                    @Override
                    public boolean matches(Object o) {
                        if (o instanceof JobManagerTaskRestore) {
                            JobManagerTaskRestore taskRestore = (JobManagerTaskRestore) o;
                            return Objects.equals(
                                    taskRestore.getTaskStateSnapshot(), subtaskStates);
                        }
                        return false;
                    }

                    @Override
                    public void describeTo(Description description) {
                        description.appendValue(subtaskStates);
                    }
                };

        Mockito.verify(statefulExec1, Mockito.times(1))
                .setInitialState(MockitoHamcrest.argThat(matcher));
        Mockito.verify(statefulExec2, Mockito.times(1))
                .setInitialState(MockitoHamcrest.argThat(matcher));
        Mockito.verify(statefulExec3, Mockito.times(1))
                .setInitialState(MockitoHamcrest.argThat(matcher));
        Mockito.verify(statelessExec1, Mockito.times(0))
                .setInitialState((JobManagerTaskRestore) Mockito.any());
        Mockito.verify(statelessExec2, Mockito.times(0))
                .setInitialState((JobManagerTaskRestore) Mockito.any());
    }

    /**
     * Verifies that restoring on a freshly built coordinator, to which this test has added no
     * checkpoint, returns {@code false}.
     *
     * @throws Exception if building the coordinator or the restore call fails unexpectedly
     */
    @Test
    public void testNoCheckpointAvailable() throws Exception {
        CheckpointCoordinator coord =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().build();

        boolean restored = coord.restoreLatestCheckpointedStateToAll(Collections.emptySet(), false);

        Assert.assertFalse(restored);
    }

    /**
     * Exercises restoring from checkpoints whose operator states do or do not all map onto the
     * given tasks.
     *
     * <p>First a checkpoint containing state only for an operator of {@code jobVertex1} is added
     * and restored successfully with the boolean flag both {@code false} and {@code true}. Then a
     * checkpoint that additionally carries state for an operator ID belonging to none of the
     * tasks is added: restoring with {@code true} succeeds, while restoring with {@code false}
     * is expected to throw {@link IllegalStateException}. (The flag presumably means "allow
     * non-restored state" - confirm against {@code CheckpointCoordinator}.)
     *
     * @throws Exception if adding a checkpoint or a restore call fails unexpectedly
     */
    @Test
    public void testNonRestoredState() throws Exception {
        JobVertexID jobVertexId1 = new JobVertexID();
        JobVertexID jobVertexId2 = new JobVertexID();
        OperatorID operatorId1 = OperatorID.fromJobVertexID(jobVertexId1);

        ExecutionVertex vertex11 = this.mockExecutionVertex(this.mockExecution(), jobVertexId1, 0, 3);
        ExecutionVertex vertex12 = this.mockExecutionVertex(this.mockExecution(), jobVertexId1, 1, 3);
        ExecutionVertex vertex13 = this.mockExecutionVertex(this.mockExecution(), jobVertexId1, 2, 3);
        ExecutionVertex vertex21 = this.mockExecutionVertex(this.mockExecution(), jobVertexId2, 0, 2);
        ExecutionVertex vertex22 = this.mockExecutionVertex(this.mockExecution(), jobVertexId2, 1, 2);

        ExecutionJobVertex jobVertex1 =
                this.mockExecutionJobVertex(
                        jobVertexId1, new ExecutionVertex[] {vertex11, vertex12, vertex13});
        ExecutionJobVertex jobVertex2 =
                this.mockExecutionJobVertex(
                        jobVertexId2, new ExecutionVertex[] {vertex21, vertex22});

        HashSet<ExecutionJobVertex> tasks = new HashSet<>();
        tasks.add(jobVertex1);
        tasks.add(jobVertex2);

        CheckpointCoordinator coord =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().build();

        // --- (1) Checkpoint whose only operator state belongs to jobVertex1 ---
        HashMap<OperatorID, OperatorState> checkpointTaskStates = new HashMap<>();
        OperatorState taskState = new OperatorState(operatorId1, 3, 3);
        taskState.putState(0, new OperatorSubtaskState());
        taskState.putState(1, new OperatorSubtaskState());
        taskState.putState(2, new OperatorSubtaskState());
        checkpointTaskStates.put(operatorId1, taskState);

        // The checkpoint receives a copy of the map, so the additions made for step (2) do not
        // leak into this first checkpoint.
        CompletedCheckpoint checkpoint =
                new CompletedCheckpoint(
                        new JobID(),
                        0L,
                        1L,
                        2L,
                        new HashMap<>(checkpointTaskStates),
                        Collections.emptyList(),
                        CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                        new TestCompletedCheckpointStorageLocation());
        coord.getCheckpointStore().addCheckpoint(checkpoint);

        Assert.assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
        Assert.assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, true));

        // --- (2) Checkpoint with extra state for an operator that none of the tasks contain ---
        JobVertexID newJobVertexID = new JobVertexID();
        OperatorID newOperatorID = OperatorID.fromJobVertexID(newJobVertexID);
        OperatorState taskState2 = new OperatorState(newOperatorID, 1, 1);
        taskState2.putState(0, new OperatorSubtaskState());
        checkpointTaskStates.put(newOperatorID, taskState2);

        checkpoint =
                new CompletedCheckpoint(
                        new JobID(),
                        1L,
                        2L,
                        3L,
                        new HashMap<>(checkpointTaskStates),
                        Collections.emptyList(),
                        CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                        new TestCompletedCheckpointStorageLocation());
        coord.getCheckpointStore().addCheckpoint(checkpoint);

        boolean restored = coord.restoreLatestCheckpointedStateToAll(tasks, true);
        Assert.assertTrue(restored);

        try {
            coord.restoreLatestCheckpointedStateToAll(tasks, false);
            Assert.fail("Did not throw the expected Exception.");
        } catch (IllegalStateException expected) {
            // Expected: this is the outcome the test asserts, so there is nothing to handle.
        }
    }

    /**
     * Creates a mocked {@link Execution} that reports {@link ExecutionState#RUNNING}.
     *
     * @return the mocked execution
     */
    private Execution mockExecution() {
        return this.mockExecution(ExecutionState.RUNNING);
    }

    /**
     * Creates a mocked {@link Execution} with a fresh {@link ExecutionAttemptID}.
     *
     * @param state the value the mock returns from {@code getState()}
     * @return the mocked execution
     */
    private Execution mockExecution(ExecutionState state) {
        Execution mock = Mockito.mock(Execution.class);
        Mockito.when(mock.getAttemptId()).thenReturn(new ExecutionAttemptID());
        Mockito.when(mock.getState()).thenReturn(state);
        return mock;
    }

    /**
     * Creates a mocked {@link ExecutionVertex}.
     *
     * @param execution returned from {@code getCurrentExecutionAttempt()}
     * @param vertexId returned from {@code getJobvertexId()}
     * @param subtask returned from {@code getParallelSubtaskIndex()}
     * @param parallelism returned from both {@code getTotalNumberOfParallelSubtasks()} and
     *     {@code getMaxParallelism()}
     * @return the mocked execution vertex
     */
    private ExecutionVertex mockExecutionVertex(Execution execution, JobVertexID vertexId, int subtask, int parallelism) {
        ExecutionVertex mock = Mockito.mock(ExecutionVertex.class);
        Mockito.when(mock.getJobvertexId()).thenReturn(vertexId);
        Mockito.when(mock.getParallelSubtaskIndex()).thenReturn(subtask);
        Mockito.when(mock.getCurrentExecutionAttempt()).thenReturn(execution);
        Mockito.when(mock.getTotalNumberOfParallelSubtasks()).thenReturn(parallelism);
        Mockito.when(mock.getMaxParallelism()).thenReturn(parallelism);
        return mock;
    }

    /**
     * Creates a mocked {@link ExecutionJobVertex} over the given vertices and stubs each of
     * those vertices to return the new mock from {@code getJobVertex()}.
     *
     * <p>The mock reports {@code vertices.length} as both parallelism and max parallelism, and
     * a single operator ID derived from {@code id}.
     *
     * @param id returned from {@code getJobVertexId()} and used to derive the operator ID
     * @param vertices mocked vertices returned from {@code getTaskVertices()}; each is stubbed
     *     as a side effect, so they must be Mockito mocks
     * @return the mocked execution job vertex
     */
    private ExecutionJobVertex mockExecutionJobVertex(JobVertexID id, ExecutionVertex[] vertices) {
        ExecutionJobVertex vertex = Mockito.mock(ExecutionJobVertex.class);
        Mockito.when(vertex.getParallelism()).thenReturn(vertices.length);
        Mockito.when(vertex.getMaxParallelism()).thenReturn(vertices.length);
        Mockito.when(vertex.getJobVertexId()).thenReturn(id);
        Mockito.when(vertex.getTaskVertices()).thenReturn(vertices);
        Mockito.when(vertex.getOperatorIDs())
                .thenReturn(
                        Collections.singletonList(
                                OperatorIDPair.generatedIDOnly(OperatorID.fromJobVertexID(id))));

        for (ExecutionVertex v : vertices) {
            Mockito.when(v.getJobVertex()).thenReturn(vertex);
        }
        return vertex;
    }
}

