/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import junit.framework.TestCase;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionUtils;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Assert;
import org.junit.Test;

public class DefaultExecutionGraphDeploymentTest
extends TestLogger {
    /**
     * Blob writer supplied via {@code setBlobWriter(...)} when tests in this class build an
     * execution graph or scheduler; initialised to the {@link VoidBlobWriter} singleton.
     */
    protected BlobWriter blobWriter = VoidBlobWriter.getInstance();
    /**
     * Blob service handed to {@code TaskDeploymentDescriptor#loadBigData} by the submit consumer
     * in {@code testBuildDeploymentDescriptor}; {@code null} in this class.
     * NOTE(review): the field is protected, presumably so subclasses can install a real cache —
     * confirm.
     */
    protected PermanentBlobService blobCache = null;

    /**
     * Asserts that the graph's {@code getJobInformationOrBlobKey()} holds its left alternative.
     *
     * <p>NOTE(review): "left" presumably means the job information is kept inline rather than
     * offloaded as a blob key — confirm against {@link DefaultExecutionGraph}.
     *
     * @param eg execution graph to inspect
     * @throws Exception if the check cannot be performed
     */
    protected void checkJobOffloaded(DefaultExecutionGraph eg) throws Exception {
        final boolean heldAsLeft = eg.getJobInformationOrBlobKey().isLeft();
        TestCase.assertTrue(heldAsLeft);
    }

    /**
     * Asserts that the task information of the given job vertex is held as the left alternative
     * of {@code getTaskInformationOrBlobKey()}.
     *
     * <p>NOTE(review): "left" presumably means the task information is kept inline rather than
     * offloaded as a blob key — confirm against {@link ExecutionJobVertex}.
     *
     * @param eg execution graph that owns the vertex
     * @param jobVertexId id of the job vertex to inspect
     * @throws Exception if the check cannot be performed
     */
    protected void checkTaskOffloaded(ExecutionGraph eg, JobVertexID jobVertexId) throws Exception {
        final ExecutionJobVertex jobVertex = eg.getJobVertex(jobVertexId);
        final boolean heldAsLeft = jobVertex.getTaskInformationOrBlobKey().isLeft();
        TestCase.assertTrue(heldAsLeft);
    }

    /**
     * Builds a four-vertex batch job (v1 -> v2 -> {v3, v4}, all-to-all, pipelined, parallelism
     * 10), deploys subtask 3 of v2 into a testing slot and verifies the
     * {@link TaskDeploymentDescriptor} that reaches the task manager gateway.
     */
    @Test
    public void testBuildDeploymentDescriptor() throws Exception {
        final JobVertexID jid1 = new JobVertexID();
        final JobVertexID jid2 = new JobVertexID();
        final JobVertexID jid3 = new JobVertexID();
        final JobVertexID jid4 = new JobVertexID();

        final JobVertex v1 = new JobVertex("v1", jid1);
        final JobVertex v2 = new JobVertex("v2", jid2);
        final JobVertex v3 = new JobVertex("v3", jid3);
        final JobVertex v4 = new JobVertex("v4", jid4);

        for (JobVertex jobVertex : Arrays.asList(v1, v2, v3, v4)) {
            jobVertex.setParallelism(10);
        }
        for (JobVertex jobVertex : Arrays.asList(v1, v2, v3, v4)) {
            jobVertex.setInvokableClass(BatchTask.class);
        }

        v2.connectNewDataSetAsInput(
                v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v3.connectNewDataSetAsInput(
                v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(
                v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(v1, v2, v3, v4);
        final JobID jobId = jobGraph.getJobID();

        final DirectScheduledExecutorService executor = new DirectScheduledExecutorService();
        final DefaultExecutionGraph eg =
                TestingDefaultExecutionGraphBuilder.newBuilder()
                        .setJobGraph(jobGraph)
                        .setFutureExecutor(executor)
                        .setIoExecutor(executor)
                        .setBlobWriter(blobWriter)
                        .build();
        eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        checkJobOffloaded(eg);

        final ExecutionJobVertex ejv = eg.getAllVertices().get(jid2);
        final ExecutionVertex vertex = ejv.getTaskVertices()[3];

        // Capture the descriptor handed to the gateway on submission.
        final CompletableFuture<TaskDeploymentDescriptor> tdd = new CompletableFuture<>();
        final SimpleAckingTaskManagerGateway taskManagerGateway =
                new SimpleAckingTaskManagerGateway();
        taskManagerGateway.setSubmitConsumer(
                FunctionUtils.uncheckedConsumer(
                        submitted -> {
                            submitted.loadBigData(blobCache);
                            tdd.complete(submitted);
                        }));

        final TestingLogicalSlot slot =
                new TestingLogicalSlotBuilder()
                        .setTaskManagerGateway(taskManagerGateway)
                        .createTestingLogicalSlot();

        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());

        vertex.getCurrentExecutionAttempt().transitionState(ExecutionState.SCHEDULED);
        vertex.getCurrentExecutionAttempt()
                .registerProducedPartitions(slot.getTaskManagerLocation(), true)
                .get();
        vertex.deployToSlot(slot);

        Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
        checkTaskOffloaded(eg, vertex.getJobvertexId());

        final TaskDeploymentDescriptor descr = tdd.get();
        Assert.assertNotNull(descr);

        final ClassLoader classLoader = getClass().getClassLoader();
        final JobInformation jobInformation =
                descr.getSerializedJobInformation().deserializeValue(classLoader);
        final TaskInformation taskInformation =
                descr.getSerializedTaskInformation().deserializeValue(classLoader);

        Assert.assertEquals(jobId, descr.getJobId());
        Assert.assertEquals(jobId, jobInformation.getJobId());
        Assert.assertEquals(jid2, taskInformation.getJobVertexId());
        Assert.assertEquals(3, descr.getSubtaskIndex());
        Assert.assertEquals(10, taskInformation.getNumberOfSubtasks());
        Assert.assertEquals(BatchTask.class.getName(), taskInformation.getInvokableClassName());
        Assert.assertEquals("v2", taskInformation.getTaskName());

        final List<ResultPartitionDeploymentDescriptor> producedPartitions =
                descr.getProducedPartitions();
        final List<InputGateDeploymentDescriptor> inputGates = descr.getInputGates();

        Assert.assertEquals(2, producedPartitions.size());
        Assert.assertEquals(1, inputGates.size());

        // Exactly two produced partitions (asserted above): check each of them.
        for (ResultPartitionDeploymentDescriptor produced : producedPartitions) {
            Assert.assertEquals(10, produced.getNumberOfSubpartitions());
        }

        final ShuffleDescriptor[] shuffleDescriptors =
                inputGates.iterator().next().getShuffleDescriptors();
        Assert.assertEquals(10, shuffleDescriptors.length);

        // The shuffle descriptors must follow the order of the first consumed partition group.
        final ConsumedPartitionGroup firstConsumedGroup =
                vertex.getAllConsumedPartitionGroups().iterator().next();
        int position = 0;
        for (IntermediateResultPartitionID expectedPartitionId : firstConsumedGroup) {
            Assert.assertEquals(
                    expectedPartitionId,
                    shuffleDescriptors[position].getResultPartitionID().getPartitionId());
            position++;
        }
    }

    /**
     * Schedules two vertices (parallelism 7650 and 2350), marks every registered execution as
     * finished and verifies that no execution remains registered afterwards.
     */
    @Test
    public void testRegistrationOfExecutionsFinishing() {
        try {
            final JobVertex v1 = new JobVertex("v1", new JobVertexID());
            final JobVertex v2 = new JobVertex("v2", new JobVertexID());

            final SchedulerBase scheduler = setupScheduler(v1, 7650, v2, 2350);
            final ExecutionGraph graph = scheduler.getExecutionGraph();

            // Work on a typed snapshot: the registry is expected to shrink while the
            // executions terminate (see the final assertion), so it must not be iterated live.
            final List<Execution> executions =
                    new ArrayList<>(graph.getRegisteredExecutions().values());
            for (Execution execution : executions) {
                execution.markFinished();
            }

            Assert.assertEquals(0, graph.getRegisteredExecutions().size());
        } catch (Exception e) {
            // Keep the original exception as cause so the failure report shows its stack trace.
            throw new AssertionError("Unexpected exception: " + e.getMessage(), e);
        }
    }

    /**
     * Schedules two vertices (parallelism 7 and 6), marks every registered execution as failed
     * (with a {@code null} cause) and verifies that no execution remains registered afterwards.
     */
    @Test
    public void testRegistrationOfExecutionsFailing() {
        try {
            final JobVertex v1 = new JobVertex("v1", new JobVertexID());
            final JobVertex v2 = new JobVertex("v2", new JobVertexID());

            final SchedulerBase scheduler = setupScheduler(v1, 7, v2, 6);
            final ExecutionGraph graph = scheduler.getExecutionGraph();

            // Work on a typed snapshot: the registry is expected to shrink while the
            // executions terminate (see the final assertion), so it must not be iterated live.
            final List<Execution> executions =
                    new ArrayList<>(graph.getRegisteredExecutions().values());
            for (Execution execution : executions) {
                execution.markFailed(null);
            }

            Assert.assertEquals(0, graph.getRegisteredExecutions().size());
        } catch (Exception e) {
            // Keep the original exception as cause so the failure report shows its stack trace.
            throw new AssertionError("Unexpected exception: " + e.getMessage(), e);
        }
    }

    /**
     * Schedules two vertices (parallelism 7 and 6), calls {@code fail(null)} on every registered
     * execution and verifies that no execution remains registered afterwards.
     */
    @Test
    public void testRegistrationOfExecutionsFailedExternally() {
        try {
            final JobVertex v1 = new JobVertex("v1", new JobVertexID());
            final JobVertex v2 = new JobVertex("v2", new JobVertexID());

            final SchedulerBase scheduler = setupScheduler(v1, 7, v2, 6);
            final ExecutionGraph graph = scheduler.getExecutionGraph();

            // Work on a typed snapshot: the registry is expected to shrink while the
            // executions terminate (see the final assertion), so it must not be iterated live.
            final List<Execution> executions =
                    new ArrayList<>(graph.getRegisteredExecutions().values());
            for (Execution execution : executions) {
                execution.fail(null);
            }

            Assert.assertEquals(0, graph.getRegisteredExecutions().size());
        } catch (Exception e) {
            // Keep the original exception as cause so the failure report shows its stack trace.
            throw new AssertionError("Unexpected exception: " + e.getMessage(), e);
        }
    }

    /**
     * Verifies that the accumulators and I/O metrics carried by a {@link TaskExecutionState}
     * update are stored on the execution, both for a CANCELED and for a FAILED update.
     */
    @Test
    public void testAccumulatorsAndMetricsForwarding() throws Exception {
        final JobVertex v1 = new JobVertex("v1", new JobVertexID());
        final JobVertex v2 = new JobVertex("v2", new JobVertexID());

        final SchedulerBase scheduler = setupScheduler(v1, 1, v2, 1);
        final ExecutionGraph graph = scheduler.getExecutionGraph();
        final Map<ExecutionAttemptID, Execution> executions = graph.getRegisteredExecutions();

        // --- canceled execution ---
        final Execution execution1 = executions.values().iterator().next();

        final IOMetrics ioMetrics = new IOMetrics(0L, 0L, 0L, 0L);
        // Declared with the wildcard accumulator type: a Map<String, IntCounter> is not
        // assignable to the Map<String, Accumulator<?, ?>> taken by AccumulatorSnapshot.
        final Map<String, Accumulator<?, ?>> accumulators = new HashMap<>();
        accumulators.put("acc", new IntCounter(4));
        final AccumulatorSnapshot accumulatorSnapshot =
                new AccumulatorSnapshot(graph.getJobID(), execution1.getAttemptId(), accumulators);

        final TaskExecutionState state =
                new TaskExecutionState(
                        execution1.getAttemptId(),
                        ExecutionState.CANCELED,
                        null,
                        accumulatorSnapshot,
                        ioMetrics);
        scheduler.updateTaskExecutionState(state);

        Assert.assertEquals(ioMetrics, execution1.getIOMetrics());
        Assert.assertNotNull(execution1.getUserAccumulators());
        final Accumulator<?, ?> storedAcc1 = execution1.getUserAccumulators().get("acc");
        Assert.assertEquals(Integer.valueOf(4), storedAcc1.getLocalValue());

        // --- failed execution ---
        // NOTE(review): read from the same live view; presumably execution1 is no longer
        // registered at this point, so this yields the other execution — confirm.
        final Execution execution2 = executions.values().iterator().next();

        final IOMetrics ioMetrics2 = new IOMetrics(0L, 0L, 0L, 0L);
        final Map<String, Accumulator<?, ?>> accumulators2 = new HashMap<>();
        accumulators2.put("acc", new IntCounter(8));
        final AccumulatorSnapshot accumulatorSnapshot2 =
                new AccumulatorSnapshot(
                        graph.getJobID(), execution2.getAttemptId(), accumulators2);

        final TaskExecutionState state2 =
                new TaskExecutionState(
                        execution2.getAttemptId(),
                        ExecutionState.FAILED,
                        null,
                        accumulatorSnapshot2,
                        ioMetrics2);
        scheduler.updateTaskExecutionState(state2);

        Assert.assertEquals(ioMetrics2, execution2.getIOMetrics());
        Assert.assertNotNull(execution2.getUserAccumulators());
        final Accumulator<?, ?> storedAcc2 = execution2.getUserAccumulators().get("acc");
        Assert.assertEquals(Integer.valueOf(8), storedAcc2.getLocalValue());
    }

    /**
     * Verifies that the accumulators and I/O metrics passed directly to
     * {@code completeCancelling(...)} and {@code markFailed(...)} are stored on the execution.
     */
    @Test
    public void testAccumulatorsAndMetricsStorage() throws Exception {
        final JobVertex v1 = new JobVertex("v1", new JobVertexID());
        final JobVertex v2 = new JobVertex("v2", new JobVertexID());

        final SchedulerBase scheduler = setupScheduler(v1, 1, v2, 1);
        final Map<ExecutionAttemptID, Execution> registered =
                scheduler.getExecutionGraph().getRegisteredExecutions();

        final IOMetrics ioMetrics = new IOMetrics(0L, 0L, 0L, 0L);
        final Map<String, Accumulator<?, ?>> accumulators = Collections.emptyMap();

        // Cancellation path.
        final Execution canceledExecution = registered.values().iterator().next();
        canceledExecution.cancel();
        canceledExecution.completeCancelling(accumulators, ioMetrics, false);

        Assert.assertEquals(ioMetrics, canceledExecution.getIOMetrics());
        Assert.assertEquals(accumulators, canceledExecution.getUserAccumulators());

        // Failure path, using whatever execution the live view offers first now.
        final Execution failedExecution = registered.values().iterator().next();
        failedExecution.markFailed(new Throwable(), false, accumulators, ioMetrics, false, true);

        Assert.assertEquals(ioMetrics, failedExecution.getIOMetrics());
        Assert.assertEquals(accumulators, failedExecution.getUserAccumulators());
    }

    /**
     * Schedules two vertices (parallelism 19 and 37), cancels every registered execution and
     * completes the cancellation, then verifies that no execution remains registered.
     */
    @Test
    public void testRegistrationOfExecutionsCanceled() {
        try {
            final JobVertex v1 = new JobVertex("v1", new JobVertexID());
            final JobVertex v2 = new JobVertex("v2", new JobVertexID());

            final SchedulerBase scheduler = setupScheduler(v1, 19, v2, 37);
            final ExecutionGraph graph = scheduler.getExecutionGraph();

            // Work on a typed snapshot: the registry is expected to shrink while the
            // executions terminate (see the final assertion), so it must not be iterated live.
            final List<Execution> executions =
                    new ArrayList<>(graph.getRegisteredExecutions().values());
            for (Execution execution : executions) {
                execution.cancel();
                execution.completeCancelling();
            }

            Assert.assertEquals(0, graph.getRegisteredExecutions().size());
        } catch (Exception e) {
            // Keep the original exception as cause so the failure report shows its stack trace.
            throw new AssertionError("Unexpected exception: " + e.getMessage(), e);
        }
    }

    /**
     * Runs a source -> sink batch job (parallelism 2 each, pointwise, blocking) against a slot
     * provider limited to a single physical slot, reports the first source subtask as RUNNING
     * and then FINISHED, and expects the job to end up in {@link JobStatus#FAILED}.
     */
    @Test
    public void testNoResourceAvailableFailure() throws Exception {
        final int parallelism = 2;

        final JobVertex source = new JobVertex("source");
        final JobVertex sink = new JobVertex("sink");
        source.setParallelism(parallelism);
        sink.setParallelism(parallelism);
        source.setInvokableClass(BatchTask.class);
        sink.setInvokableClass(BatchTask.class);
        sink.connectNewDataSetAsInput(
                source, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);

        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(source, sink);
        final DirectScheduledExecutorService futureExecutor = new DirectScheduledExecutorService();

        final DefaultScheduler scheduler =
                SchedulerTestingUtils.newSchedulerBuilder(
                                jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread())
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                        TestingPhysicalSlotProvider
                                                .createWithLimitedAmountOfPhysicalSlots(1)))
                        .setFutureExecutor(futureExecutor)
                        .setBlobWriter(blobWriter)
                        .build();

        final ExecutionGraph executionGraph = scheduler.getExecutionGraph();
        checkJobOffloaded((DefaultExecutionGraph) executionGraph);

        scheduler.startScheduling();

        // Drive the first source subtask through RUNNING to FINISHED.
        final ExecutionVertex firstSourceSubtask =
                executionGraph.getJobVertex(source.getID()).getTaskVertices()[0];
        final ExecutionAttemptID attemptId =
                firstSourceSubtask.getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(
                new TaskExecutionState(attemptId, ExecutionState.RUNNING));
        scheduler.updateTaskExecutionState(
                new TaskExecutionState(attemptId, ExecutionState.FINISHED, null));

        Assert.assertEquals(JobStatus.FAILED, executionGraph.getState());
    }

    /**
     * With an empty configuration the checkpoint store must report the default value of
     * {@link CheckpointingOptions#MAX_RETAINED_CHECKPOINTS}.
     */
    @Test
    public void testSettingDefaultMaxNumberOfCheckpointsToRetain() throws Exception {
        final ExecutionGraph executionGraph = createExecutionGraph(new Configuration());

        final int expected = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue();
        final int actual =
                executionGraph
                        .getCheckpointCoordinator()
                        .getCheckpointStore()
                        .getMaxNumberOfRetainedCheckpoints();

        Assert.assertEquals(expected, actual);
    }

    /**
     * Configures both vertices as {@link BatchTask}s with the given parallelism, builds a
     * scheduler for a streaming job graph made of them, starts scheduling and asserts that
     * {@code dop1 + dop2} executions are registered.
     *
     * @param v1 first job vertex
     * @param dop1 parallelism applied to {@code v1}
     * @param v2 second job vertex
     * @param dop2 parallelism applied to {@code v2}
     * @return the started scheduler
     * @throws Exception if building or starting the scheduler fails
     */
    private SchedulerBase setupScheduler(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception {
        v1.setParallelism(dop1);
        v2.setParallelism(dop2);
        v1.setInvokableClass(BatchTask.class);
        v2.setInvokableClass(BatchTask.class);

        final DirectScheduledExecutorService futureExecutor = new DirectScheduledExecutorService();
        final DefaultScheduler scheduler =
                SchedulerTestingUtils.newSchedulerBuilder(
                                JobGraphTestUtils.streamingJobGraph(v1, v2),
                                ComponentMainThreadExecutorServiceAdapter.forMainThread())
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory())
                        .setFutureExecutor(futureExecutor)
                        .setBlobWriter(blobWriter)
                        .build();

        final ExecutionGraph executionGraph = scheduler.getExecutionGraph();
        checkJobOffloaded((DefaultExecutionGraph) executionGraph);

        scheduler.startScheduling();

        final int expectedExecutions = dop1 + dop2;
        Assert.assertEquals(expectedExecutions, executionGraph.getRegisteredExecutions().size());
        return scheduler;
    }

    /**
     * Configuring a negative {@link CheckpointingOptions#MAX_RETAINED_CHECKPOINTS} must not be
     * honoured: the checkpoint store is expected to report the option's default value instead.
     */
    @Test
    public void testSettingIllegalMaxNumberOfCheckpointsToRetain() throws Exception {
        final int negativeMaxNumberOfCheckpointsToRetain = -10;

        final Configuration jobManagerConfig = new Configuration();
        jobManagerConfig.setInteger(
                CheckpointingOptions.MAX_RETAINED_CHECKPOINTS,
                negativeMaxNumberOfCheckpointsToRetain);

        final ExecutionGraph executionGraph = createExecutionGraph(jobManagerConfig);

        Assert.assertNotEquals(
                negativeMaxNumberOfCheckpointsToRetain,
                executionGraph
                        .getCheckpointCoordinator()
                        .getCheckpointStore()
                        .getMaxNumberOfRetainedCheckpoints());
        Assert.assertEquals(
                CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(),
                executionGraph
                        .getCheckpointCoordinator()
                        .getCheckpointStore()
                        .getMaxNumberOfRetainedCheckpoints());
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testExecutionGraphIsDeployedInTopologicalOrder() throws Exception {
        void var17_19;
        int sourceParallelism = 2;
        boolean sinkParallelism = true;
        JobVertex sourceVertex = new JobVertex("source");
        sourceVertex.setInvokableClass(NoOpInvokable.class);
        sourceVertex.setParallelism(2);
        JobVertex sinkVertex = new JobVertex("sink");
        sinkVertex.setInvokableClass(NoOpInvokable.class);
        sinkVertex.setParallelism(1);
        sinkVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        int numberTasks = 3;
        ArrayBlockingQueue submittedTasksQueue = new ArrayBlockingQueue(3);
        TestingTaskExecutorGatewayBuilder testingTaskExecutorGatewayBuilder = new TestingTaskExecutorGatewayBuilder();
        testingTaskExecutorGatewayBuilder.setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
            submittedTasksQueue.offer(taskDeploymentDescriptor.getExecutionAttemptId());
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        TestingTaskExecutorGateway taskExecutorGateway = testingTaskExecutorGatewayBuilder.createTestingTaskExecutorGateway();
        RpcTaskManagerGateway taskManagerGateway = new RpcTaskManagerGateway((TaskExecutorGateway)taskExecutorGateway, JobMasterId.generate());
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sourceVertex, sinkVertex);
        TestingPhysicalSlotProvider physicalSlotProvider = TestingPhysicalSlotProvider.createWithoutImmediatePhysicalSlotCreation();
        DefaultScheduler scheduler = SchedulerTestingUtils.newSchedulerBuilder(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread()).setExecutionSlotAllocatorFactory((ExecutionSlotAllocatorFactory)SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(physicalSlotProvider)).setFutureExecutor(new DirectScheduledExecutorService()).build();
        ExecutionGraph executionGraph = scheduler.getExecutionGraph();
        scheduler.startScheduling();
        ArrayList<CompletableFuture<TestingPhysicalSlot>> shuffledFutures = new ArrayList<CompletableFuture<TestingPhysicalSlot>>(physicalSlotProvider.getResponses().values());
        Collections.shuffle(shuffledFutures);
        for (CompletableFuture completableFuture : shuffledFutures) {
            completableFuture.complete(TestingPhysicalSlot.builder().withTaskManagerLocation(taskManagerLocation).withTaskManagerGateway((TaskManagerGateway)taskManagerGateway).build());
        }
        ArrayList submittedTasks = new ArrayList(3);
        boolean bl = false;
        while (var17_19 < 3) {
            submittedTasks.add(submittedTasksQueue.take());
            ++var17_19;
        }
        ArrayList<ExecutionAttemptID> arrayList = new ArrayList<ExecutionAttemptID>(2);
        for (ExecutionVertex taskVertex : executionGraph.getJobVertex(sourceVertex.getID()).getTaskVertices()) {
            arrayList.add(taskVertex.getCurrentExecutionAttempt().getAttemptId());
        }
        ArrayList<ExecutionAttemptID> secondStage = new ArrayList<ExecutionAttemptID>(1);
        for (ExecutionVertex taskVertex : executionGraph.getJobVertex(sinkVertex.getID()).getTaskVertices()) {
            secondStage.add(taskVertex.getCurrentExecutionAttempt().getAttemptId());
        }
        Assert.assertThat(submittedTasks, (Matcher)new ExecutionStageMatcher(Arrays.asList(arrayList, secondStage)));
    }

    /**
     * Builds an execution graph for an empty job graph that has checkpointing settings attached,
     * using the given configuration as job master configuration.
     *
     * <p>NOTE(review): the positional arguments of {@link CheckpointCoordinatorConfiguration}
     * are reproduced verbatim; their individual meaning should be checked against that class.
     *
     * @param configuration job master configuration handed to the graph builder
     * @return the built execution graph
     * @throws Exception if the graph cannot be built
     */
    private ExecutionGraph createExecutionGraph(Configuration configuration) throws Exception {
        final JobGraph emptyJobGraph = JobGraphTestUtils.emptyJobGraph();

        final CheckpointCoordinatorConfiguration coordinatorConfiguration =
                new CheckpointCoordinatorConfiguration(
                        100L,
                        600000L,
                        0L,
                        1,
                        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                        false,
                        false,
                        0,
                        0L);
        emptyJobGraph.setSnapshotSettings(
                new JobCheckpointingSettings(coordinatorConfiguration, null));

        return TestingDefaultExecutionGraphBuilder.newBuilder()
                .setJobGraph(emptyJobGraph)
                .setJobMasterConfig(configuration)
                .setBlobWriter(blobWriter)
                .build();
    }

    /**
     * Matcher for a submission order that respects a sequence of execution stages: all attempts
     * of a stage must appear (in any order) before any attempt of the following stage, and the
     * submission order must not contain anything beyond the listed stages.
     */
    private static final class ExecutionStageMatcher
            extends TypeSafeMatcher<List<ExecutionAttemptID>> {
        /** Expected stages, in the order in which they must be submitted. */
        private final List<Collection<ExecutionAttemptID>> executionStages;

        private ExecutionStageMatcher(List<Collection<ExecutionAttemptID>> executionStages) {
            this.executionStages = executionStages;
        }

        /**
         * Consumes the submission order stage by stage.
         *
         * @param submissionOrder attempt ids in the order they were submitted
         * @return {@code true} if every stage is fully covered in sequence and no submission is
         *     left over
         */
        protected boolean matchesSafely(List<ExecutionAttemptID> submissionOrder) {
            final Iterator<ExecutionAttemptID> submitted = submissionOrder.iterator();

            for (Collection<ExecutionAttemptID> stage : executionStages) {
                // Mutable copy: matched attempts are crossed off until the stage is complete.
                final List<ExecutionAttemptID> outstanding = new ArrayList<>(stage);

                while (!outstanding.isEmpty()) {
                    if (!submitted.hasNext()) {
                        // Submissions ran out before the stage was complete.
                        return false;
                    }
                    if (!outstanding.remove(submitted.next())) {
                        // The next submission does not belong to the current stage.
                        return false;
                    }
                }
            }

            // Any remaining submission belongs to no stage.
            return !submitted.hasNext();
        }

        public void describeTo(Description description) {
            description.appendValueList("<[", ", ", "]>", executionStages);
        }
    }
}

