/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.TestingSlotProvider;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.ThrowingRunnable;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsEqual;
import org.junit.Test;
import org.slf4j.Logger;

/**
 * Tests which partitions an {@link ExecutionGraph} asks its {@link JobMasterPartitionTracker} to
 * stop tracking and release as the executions of a job reach {@link ExecutionState#FINISHED}.
 *
 * <p>Every interaction with the execution graph is run through {@link #mainThreadExecutor}.
 */
public class ExecutionGraphPartitionReleaseTest
extends TestLogger {
    // NOTE(review): nothing in this class shuts this executor down, so its worker thread
    // outlives the tests; consider adding a class-level teardown hook that calls shutdownNow().
    private static final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    private static final TestingComponentMainThreadExecutor mainThreadExecutor = new TestingComponentMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(scheduledExecutorService));

    /**
     * Finishes the vertices of a {@code source -> operator -> sink} chain one after another and
     * verifies that a producer's partition is reported as released exactly when its consumer
     * finishes.
     */
    @Test
    public void testStrategyNotifiedOfFinishedVerticesAndResultsRespected() throws Exception {
        // Three vertices of parallelism 1, chained through blocking, pointwise edges.
        JobVertex sourceVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        JobVertex operatorVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        JobVertex sinkVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        operatorVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        sinkVertex.connectNewDataSetAsInput(operatorVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);

        // Record the first partition ID of every release call, in call order.
        TestingJobMasterPartitionTracker partitionTracker = new TestingJobMasterPartitionTracker();
        ArrayDeque<ResultPartitionID> releasedPartitions = new ArrayDeque<>();
        partitionTracker.setStopTrackingAndReleasePartitionsConsumer(partitionIds -> releasedPartitions.add(partitionIds.iterator().next()));

        ExecutionGraph executionGraph = createExecutionGraph(partitionTracker, sourceVertex, operatorVertex, sinkVertex);

        // Finish the source: its consumer (the operator) is still running, so nothing is released.
        mainThreadExecutor.execute(() -> {
            Execution sourceExecution = getCurrentExecution(sourceVertex, executionGraph);
            finishExecution(executionGraph, sourceExecution);
            MatcherAssert.assertThat(releasedPartitions, Matchers.empty());
        });

        // Finish the operator: exactly the source's partition is expected to be released.
        mainThreadExecutor.execute(() -> {
            Execution sourceExecution = getCurrentExecution(sourceVertex, executionGraph);
            Execution operatorExecution = getCurrentExecution(operatorVertex, executionGraph);
            finishExecution(executionGraph, operatorExecution);
            MatcherAssert.assertThat(releasedPartitions, Matchers.hasSize(1));
            MatcherAssert.assertThat(releasedPartitions.remove(), IsEqual.equalTo(firstProducedPartitionOf(sourceExecution)));
        });

        // Finish the sink: exactly the operator's partition is expected to be released.
        mainThreadExecutor.execute(() -> {
            Execution operatorExecution = getCurrentExecution(operatorVertex, executionGraph);
            Execution sinkExecution = getCurrentExecution(sinkVertex, executionGraph);
            finishExecution(executionGraph, sinkExecution);
            MatcherAssert.assertThat(releasedPartitions, Matchers.hasSize(1));
            MatcherAssert.assertThat(releasedPartitions.remove(), IsEqual.equalTo(firstProducedPartitionOf(operatorExecution)));
        });
    }

    /**
     * Verifies that no partition is reported as released in a topology where one of two consumers
     * of {@code operator1} is reset for a new execution after it had already finished.
     */
    @Test
    public void testStrategyNotifiedOfUnFinishedVertices() throws Exception {
        // source -(blocking, pointwise)-> operator1 -(pipelined, all-to-all)-> operator2 and operator3
        JobVertex sourceVertex = ExecutionGraphTestUtils.createNoOpVertex("source", 1);
        JobVertex operator1Vertex = ExecutionGraphTestUtils.createNoOpVertex("operator1", 1);
        JobVertex operator2Vertex = ExecutionGraphTestUtils.createNoOpVertex("operator2", 1);
        JobVertex operator3Vertex = ExecutionGraphTestUtils.createNoOpVertex("operator3", 1);
        operator1Vertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        operator2Vertex.connectNewDataSetAsInput(operator1Vertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        operator3Vertex.connectNewDataSetAsInput(operator1Vertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        // Record the first partition ID of every release call, in call order.
        TestingJobMasterPartitionTracker partitionTracker = new TestingJobMasterPartitionTracker();
        ArrayDeque<ResultPartitionID> releasedPartitions = new ArrayDeque<>();
        partitionTracker.setStopTrackingAndReleasePartitionsConsumer(partitionIds -> releasedPartitions.add(partitionIds.iterator().next()));

        ExecutionGraph executionGraph = createExecutionGraph(partitionTracker, sourceVertex, operator1Vertex, operator2Vertex, operator3Vertex);

        // Finish the source while operator1 is unfinished: no release expected.
        mainThreadExecutor.execute(() -> {
            Execution sourceExecution = getCurrentExecution(sourceVertex, executionGraph);
            finishExecution(executionGraph, sourceExecution);
            MatcherAssert.assertThat(releasedPartitions, Matchers.empty());
        });

        // Announce each of operator1's produced partitions to the graph, then finish operator1:
        // no release expected at this point.
        mainThreadExecutor.execute(() -> {
            Execution operator1Execution = getCurrentExecution(operator1Vertex, executionGraph);
            for (IntermediateResultPartitionID partitionId : operator1Execution.getVertex().getProducedPartitions().keySet()) {
                executionGraph.scheduleOrUpdateConsumers(new ResultPartitionID(partitionId, operator1Execution.getAttemptId()));
            }
            finishExecution(executionGraph, operator1Execution);
            MatcherAssert.assertThat(releasedPartitions, Matchers.empty());
        });

        // Finish operator2 while operator3 is unfinished: no release expected.
        mainThreadExecutor.execute(() -> {
            Execution operator2Execution = getCurrentExecution(operator2Vertex, executionGraph);
            finishExecution(executionGraph, operator2Execution);
            MatcherAssert.assertThat(releasedPartitions, Matchers.empty());
        });

        // Reset operator2's vertex for a new execution: no release expected.
        mainThreadExecutor.execute(() -> {
            Execution operator2Execution = getCurrentExecution(operator2Vertex, executionGraph);
            operator2Execution.getVertex().resetForNewExecution(0L, 1L);
            MatcherAssert.assertThat(releasedPartitions, Matchers.empty());
        });

        // Finish operator3 after operator2 was reset: still no release expected.
        mainThreadExecutor.execute(() -> {
            Execution operator3Execution = getCurrentExecution(operator3Vertex, executionGraph);
            finishExecution(executionGraph, operator3Execution);
            MatcherAssert.assertThat(releasedPartitions, Matchers.empty());
        });
    }

    /**
     * Returns the current execution attempt of the first task vertex of {@code jobVertex}.
     *
     * @param jobVertex job vertex whose execution is looked up by its ID
     * @param executionGraph graph that contains the vertex
     * @return current execution attempt of subtask 0
     */
    private static Execution getCurrentExecution(JobVertex jobVertex, ExecutionGraph executionGraph) {
        return executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[0].getCurrentExecutionAttempt();
    }

    /** Reports {@code execution} to {@code executionGraph} as {@link ExecutionState#FINISHED}. */
    private static void finishExecution(ExecutionGraph executionGraph, Execution execution) {
        executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), execution.getAttemptId(), ExecutionState.FINISHED));
    }

    /**
     * Builds the {@link ResultPartitionID} made of the first partition produced by the vertex of
     * {@code execution} and the attempt ID of {@code execution}.
     */
    private static ResultPartitionID firstProducedPartitionOf(Execution execution) {
        IntermediateResultPartitionID partitionId = execution.getVertex().getProducedPartitions().keySet().iterator().next();
        return new ResultPartitionID(partitionId, execution.getAttemptId());
    }

    /**
     * Builds an execution graph for a job consisting of {@code vertices}, starts it on the main
     * thread executor and schedules it for execution.
     *
     * @param partitionTracker tracker handed to the graph builder
     * @param vertices job vertices that make up the test job
     * @return the started and scheduled execution graph
     * @throws Exception if building the graph fails
     */
    private ExecutionGraph createExecutionGraph(JobMasterPartitionTracker partitionTracker, JobVertex ... vertices) throws Exception {
        ExecutionGraph executionGraph = ExecutionGraphBuilder.buildGraph(
            null,
            new JobGraph(new JobID(), "test job", vertices),
            new Configuration(),
            scheduledExecutorService,
            mainThreadExecutor.getMainThreadExecutor(),
            // Every slot request is answered immediately with a fresh testing slot.
            new TestingSlotProvider(ignored -> CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot())),
            ExecutionGraphPartitionReleaseTest.class.getClassLoader(),
            new StandaloneCheckpointRecoveryFactory(),
            AkkaUtils.getDefaultTimeout(),
            new NoRestartStrategy(),
            new UnregisteredMetricsGroup(),
            VoidBlobWriter.getInstance(),
            AkkaUtils.getDefaultTimeout(),
            log,
            NettyShuffleMaster.INSTANCE,
            partitionTracker);
        executionGraph.start(mainThreadExecutor.getMainThreadExecutor());
        mainThreadExecutor.execute(() -> executionGraph.scheduleForExecution());
        return executionGraph;
    }
}

