/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;

public class ExecutionPartitionLifecycleTest
extends TestLogger {
    @ClassRule
    public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = new TestingComponentMainThreadExecutor.Resource();
    private Execution execution;
    private ResultPartitionDeploymentDescriptor descriptor;
    private ResourceID taskExecutorResourceId;
    private JobID jobId;

    @Test
    public void testPartitionReleaseOnFinishWhileCanceling() throws Exception {
        this.testPartitionReleaseOnStateTransitionsAfterRunning(Execution::cancel, Execution::markFinished);
    }

    @Test
    public void testPartitionReleaseOnCancelWhileFinished() throws Exception {
        this.testPartitionReleaseOnStateTransitionsAfterRunning(Execution::markFinished, Execution::cancel);
    }

    @Test
    public void testPartitionReleaseOnSuspendWhileFinished() throws Exception {
        this.testPartitionReleaseOnStateTransitionsAfterRunning(Execution::markFinished, Execution::suspend);
    }

    private void testPartitionReleaseOnStateTransitionsAfterRunning(Consumer<Execution> stateTransition1, Consumer<Execution> stateTransition2) throws Exception {
        SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
        CompletableFuture releasePartitionsCallFuture = new CompletableFuture();
        taskManagerGateway.setReleasePartitionsConsumer((jobID, partitionIds) -> releasePartitionsCallFuture.complete(Tuple2.of((Object)jobID, (Object)partitionIds)));
        TestingShuffleMaster testingShuffleMaster = new TestingShuffleMaster();
        this.setupExecutionGraphAndStartRunningJob(ResultPartitionType.PIPELINED, NoOpJobMasterPartitionTracker.INSTANCE, taskManagerGateway, testingShuffleMaster);
        stateTransition1.accept(this.execution);
        Assert.assertFalse((boolean)releasePartitionsCallFuture.isDone());
        stateTransition2.accept(this.execution);
        Assert.assertTrue((boolean)releasePartitionsCallFuture.isDone());
        Tuple2 releasePartitionsCall = (Tuple2)releasePartitionsCallFuture.get();
        Assert.assertEquals((Object)this.jobId, (Object)releasePartitionsCall.f0);
        Assert.assertThat((Object)releasePartitionsCall.f1, (Matcher)Matchers.contains((Object[])new ResultPartitionID[]{this.descriptor.getShuffleDescriptor().getResultPartitionID()}));
        Assert.assertEquals((long)1L, (long)testingShuffleMaster.externallyReleasedPartitions.size());
        Assert.assertEquals((Object)this.descriptor.getShuffleDescriptor(), (Object)testingShuffleMaster.externallyReleasedPartitions.poll());
    }

    @Test
    public void testPartitionTrackedAndNotReleasedWhenFinished() throws Exception {
        this.testPartitionTrackingForStateTransition(Execution::markFinished, PartitionReleaseResult.NONE);
    }

    @Test
    public void testPartitionNotTrackedAndNotReleasedWhenCanceledByTM() throws Exception {
        this.testPartitionTrackingForStateTransition(execution -> {
            execution.cancel();
            execution.completeCancelling(Collections.emptyMap(), new IOMetrics(0L, 0L, 0L, 0L), false);
        }, PartitionReleaseResult.STOP_TRACKING);
    }

    @Test
    public void testPartitionNotTrackedAndReleasedWhenCanceledByJM() throws Exception {
        this.testPartitionTrackingForStateTransition(execution -> {
            execution.cancel();
            execution.completeCancelling();
        }, PartitionReleaseResult.STOP_TRACKING_AND_RELEASE);
    }

    @Test
    public void testPartitionNotTrackedAndNotReleasedWhenFailedByTM() throws Exception {
        this.testPartitionTrackingForStateTransition(execution -> execution.markFailed((Throwable)new Exception("Test exception"), Collections.emptyMap(), new IOMetrics(0L, 0L, 0L, 0L)), PartitionReleaseResult.STOP_TRACKING);
    }

    @Test
    public void testPartitionNotTrackedAndReleasedWhenFailedByJM() throws Exception {
        this.testPartitionTrackingForStateTransition(execution -> execution.markFailed((Throwable)new Exception("Test exception")), PartitionReleaseResult.STOP_TRACKING_AND_RELEASE);
    }

    private void testPartitionTrackingForStateTransition(Consumer<Execution> stateTransition, PartitionReleaseResult partitionReleaseResult) throws Exception {
        CompletableFuture partitionStartTrackingFuture = new CompletableFuture();
        CompletableFuture partitionStopTrackingFuture = new CompletableFuture();
        CompletableFuture partitionStopTrackingAndReleaseFuture = new CompletableFuture();
        TestingJobMasterPartitionTracker partitionTracker = new TestingJobMasterPartitionTracker();
        partitionTracker.setStartTrackingPartitionsConsumer((resourceID, resultPartitionDeploymentDescriptor) -> partitionStartTrackingFuture.complete(Tuple2.of((Object)resourceID, (Object)resultPartitionDeploymentDescriptor)));
        partitionTracker.setStopTrackingPartitionsConsumer(partitionStopTrackingFuture::complete);
        partitionTracker.setStopTrackingAndReleasePartitionsConsumer(partitionStopTrackingAndReleaseFuture::complete);
        this.setupExecutionGraphAndStartRunningJob(ResultPartitionType.BLOCKING, partitionTracker, new SimpleAckingTaskManagerGateway(), (ShuffleMaster<?>)NettyShuffleMaster.INSTANCE);
        Tuple2 startTrackingCall = (Tuple2)partitionStartTrackingFuture.get();
        Assert.assertThat((Object)startTrackingCall.f0, (Matcher)Matchers.equalTo((Object)this.taskExecutorResourceId));
        Assert.assertThat((Object)startTrackingCall.f1, (Matcher)Matchers.equalTo((Object)this.descriptor));
        stateTransition.accept(this.execution);
        switch (partitionReleaseResult) {
            case NONE: {
                Assert.assertFalse((boolean)partitionStopTrackingFuture.isDone());
                Assert.assertFalse((boolean)partitionStopTrackingAndReleaseFuture.isDone());
                break;
            }
            case STOP_TRACKING: {
                Assert.assertTrue((boolean)partitionStopTrackingFuture.isDone());
                Assert.assertFalse((boolean)partitionStopTrackingAndReleaseFuture.isDone());
                Collection stopTrackingCall = (Collection)partitionStopTrackingFuture.get();
                Assert.assertEquals(Collections.singletonList(this.descriptor.getShuffleDescriptor().getResultPartitionID()), (Object)stopTrackingCall);
                break;
            }
            case STOP_TRACKING_AND_RELEASE: {
                Assert.assertFalse((boolean)partitionStopTrackingFuture.isDone());
                Assert.assertTrue((boolean)partitionStopTrackingAndReleaseFuture.isDone());
                Collection stopTrackingAndReleaseCall = (Collection)partitionStopTrackingAndReleaseFuture.get();
                Assert.assertEquals(Collections.singletonList(this.descriptor.getShuffleDescriptor().getResultPartitionID()), (Object)stopTrackingAndReleaseCall);
            }
        }
    }

    private void setupExecutionGraphAndStartRunningJob(ResultPartitionType resultPartitionType, JobMasterPartitionTracker partitionTracker, final TaskManagerGateway taskManagerGateway, ShuffleMaster<?> shuffleMaster) throws JobException, JobExecutionException {
        JobVertex producerVertex = this.createNoOpJobVertex();
        JobVertex consumerVertex = this.createNoOpJobVertex();
        consumerVertex.connectNewDataSetAsInput(producerVertex, DistributionPattern.ALL_TO_ALL, resultPartitionType);
        final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        SlotProvider slotProvider = new SlotProvider(){

            public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, Time allocationTimeout) {
                return CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().setTaskManagerLocation(taskManagerLocation).setTaskManagerGateway(taskManagerGateway).setSlotOwner(new SingleSlotTestingSlotOwner()).createTestingLogicalSlot());
            }

            public void cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
            }
        };
        ExecutionGraph executionGraph = ExecutionGraphBuilder.buildGraph(null, (JobGraph)new JobGraph(new JobID(), "test job", new JobVertex[]{producerVertex, consumerVertex}), (Configuration)new Configuration(), (ScheduledExecutorService)TestingUtils.defaultExecutor(), (Executor)TestingUtils.defaultExecutor(), (SlotProvider)slotProvider, (ClassLoader)ExecutionPartitionLifecycleTest.class.getClassLoader(), (CheckpointRecoveryFactory)new StandaloneCheckpointRecoveryFactory(), (Time)Time.seconds((long)10L), (RestartStrategy)new NoRestartStrategy(), (MetricGroup)new UnregisteredMetricsGroup(), (BlobWriter)VoidBlobWriter.getInstance(), (Time)Time.seconds((long)10L), (Logger)this.log, shuffleMaster, (JobMasterPartitionTracker)partitionTracker);
        executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(producerVertex.getID());
        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
        this.execution = executionVertex.getCurrentExecutionAttempt();
        this.execution.allocateResourcesForExecution(executionGraph.getSlotProviderStrategy(), LocationPreferenceConstraint.ALL, Collections.emptySet());
        this.execution.deploy();
        this.execution.switchToRunning();
        IntermediateResultPartitionID expectedIntermediateResultPartitionId = executionJobVertex.getProducedDataSets()[0].getPartitions()[0].getPartitionId();
        this.descriptor = (ResultPartitionDeploymentDescriptor)this.execution.getResultPartitionDeploymentDescriptor(expectedIntermediateResultPartitionId).get();
        this.taskExecutorResourceId = taskManagerLocation.getResourceID();
        this.jobId = executionGraph.getJobID();
    }

    @Nonnull
    private JobVertex createNoOpJobVertex() {
        JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID());
        jobVertex.setInvokableClass(NoOpInvokable.class);
        return jobVertex;
    }

    private static class TestingShuffleMaster
    implements ShuffleMaster<ShuffleDescriptor> {
        final Queue<ShuffleDescriptor> externallyReleasedPartitions = new ArrayBlockingQueue<ShuffleDescriptor>(4);

        private TestingShuffleMaster() {
        }

        public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(final PartitionDescriptor partitionDescriptor, final ProducerDescriptor producerDescriptor) {
            return CompletableFuture.completedFuture(new ShuffleDescriptor(){

                public ResultPartitionID getResultPartitionID() {
                    return new ResultPartitionID(partitionDescriptor.getPartitionId(), producerDescriptor.getProducerExecutionId());
                }

                public Optional<ResourceID> storesLocalResourcesOn() {
                    return Optional.of(producerDescriptor.getProducerLocation());
                }
            });
        }

        public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
            this.externallyReleasedPartitions.add(shuffleDescriptor);
        }
    }

    private static final class SingleSlotTestingSlotOwner
    implements SlotOwner {
        final CompletableFuture<LogicalSlot> returnedSlot = new CompletableFuture();

        private SingleSlotTestingSlotOwner() {
        }

        public void returnLogicalSlot(LogicalSlot logicalSlot) {
            this.returnedSlot.complete(logicalSlot);
        }
    }

    private static enum PartitionReleaseResult {
        NONE,
        STOP_TRACKING,
        STOP_TRACKING_AND_RELEASE;

    }
}

