package org.apache.flink.runtime.executiongraph;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.function.Function;
import junit.framework.TestCase;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionUtils;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.class */
public class ExecutionGraphDeploymentTest extends TestLogger {
    protected BlobWriter blobWriter = VoidBlobWriter.getInstance();
    protected PermanentBlobService blobCache = null;

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest$ExecutionStageMatcher.class */
    private static final class ExecutionStageMatcher extends TypeSafeMatcher<List<ExecutionAttemptID>> {
        private final List<Collection<ExecutionAttemptID>> executionStages;

        private ExecutionStageMatcher(List<Collection<ExecutionAttemptID>> list) {
            this.executionStages = list;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public boolean matchesSafely(List<ExecutionAttemptID> list) {
            Iterator<ExecutionAttemptID> it = list.iterator();
            Iterator<Collection<ExecutionAttemptID>> it2 = this.executionStages.iterator();
            while (it2.hasNext()) {
                ArrayList arrayList = new ArrayList(it2.next());
                while (!arrayList.isEmpty() && it.hasNext()) {
                    if (!arrayList.remove(it.next())) {
                        return false;
                    }
                }
                if (!arrayList.isEmpty()) {
                    return false;
                }
            }
            return !it.hasNext();
        }

        public void describeTo(Description description) {
            description.appendValueList("<[", ", ", "]>", this.executionStages);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest$IteratorTestingSlotProvider.class */
    private static final class IteratorTestingSlotProvider extends TestingSlotProvider {

        /* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest$IteratorTestingSlotProvider$IteratorSlotFutureFunction.class */
        private static class IteratorSlotFutureFunction implements Function<SlotRequestId, CompletableFuture<LogicalSlot>> {
            final Iterator<CompletableFuture<LogicalSlot>> slotIterator;

            IteratorSlotFutureFunction(Iterator<CompletableFuture<LogicalSlot>> it) {
                this.slotIterator = it;
            }

            @Override // java.util.function.Function
            public CompletableFuture<LogicalSlot> apply(SlotRequestId slotRequestId) {
                return this.slotIterator.hasNext() ? this.slotIterator.next() : FutureUtils.completedExceptionally(new FlinkException("No more slots available."));
            }
        }

        private IteratorTestingSlotProvider(Iterator<CompletableFuture<LogicalSlot>> it) {
            super(new IteratorSlotFutureFunction(it));
        }
    }

    protected void checkJobOffloaded(ExecutionGraph executionGraph) throws Exception {
        TestCase.assertTrue(executionGraph.getJobInformationOrBlobKey().isLeft());
    }

    protected void checkTaskOffloaded(ExecutionGraph executionGraph, JobVertexID jobVertexID) throws Exception {
        TestCase.assertTrue(executionGraph.getJobVertex(jobVertexID).getTaskInformationOrBlobKey().isLeft());
    }

    @Test
    public void testBuildDeploymentDescriptor() {
        try {
            JobID jobID = new JobID();
            JobVertexID jobVertexID = new JobVertexID();
            JobVertexID jobVertexID2 = new JobVertexID();
            JobVertexID jobVertexID3 = new JobVertexID();
            JobVertexID jobVertexID4 = new JobVertexID();
            JobVertex jobVertex = new JobVertex("v1", jobVertexID);
            JobVertex jobVertex2 = new JobVertex("v2", jobVertexID2);
            JobVertex jobVertex3 = new JobVertex("v3", jobVertexID3);
            JobVertex jobVertex4 = new JobVertex("v4", jobVertexID4);
            jobVertex.setParallelism(10);
            jobVertex2.setParallelism(10);
            jobVertex3.setParallelism(10);
            jobVertex4.setParallelism(10);
            jobVertex.setInvokableClass(BatchTask.class);
            jobVertex2.setInvokableClass(BatchTask.class);
            jobVertex3.setInvokableClass(BatchTask.class);
            jobVertex4.setInvokableClass(BatchTask.class);
            jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
            jobVertex3.connectNewDataSetAsInput(jobVertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
            jobVertex4.connectNewDataSetAsInput(jobVertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
            DirectScheduledExecutorService directScheduledExecutorService = new DirectScheduledExecutorService();
            ExecutionGraph build = TestingExecutionGraphBuilder.newBuilder().setJobGraph(new JobGraph(jobID, "Test Job")).setFutureExecutor(directScheduledExecutorService).setIoExecutor(directScheduledExecutorService).setSlotProvider(new TestingSlotProvider(slotRequestId -> {
                return new CompletableFuture();
            })).setBlobWriter(this.blobWriter).build();
            build.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
            checkJobOffloaded(build);
            build.attachJobGraph(Arrays.asList(jobVertex, jobVertex2, jobVertex3, jobVertex4));
            ExecutionVertex executionVertex = ((ExecutionJobVertex) build.getAllVertices().get(jobVertexID2)).getTaskVertices()[3];
            SimpleAckingTaskManagerGateway simpleAckingTaskManagerGateway = new SimpleAckingTaskManagerGateway();
            CompletableFuture completableFuture = new CompletableFuture();
            simpleAckingTaskManagerGateway.setSubmitConsumer(FunctionUtils.uncheckedConsumer(taskDeploymentDescriptor -> {
                taskDeploymentDescriptor.loadBigData(this.blobCache);
                completableFuture.complete(taskDeploymentDescriptor);
            }));
            TestingLogicalSlot createTestingLogicalSlot = new TestingLogicalSlotBuilder().setTaskManagerGateway(simpleAckingTaskManagerGateway).createTestingLogicalSlot();
            Assert.assertEquals(ExecutionState.CREATED, executionVertex.getExecutionState());
            executionVertex.getCurrentExecutionAttempt().registerProducedPartitions(createTestingLogicalSlot.getTaskManagerLocation()).get();
            executionVertex.deployToSlot(createTestingLogicalSlot);
            Assert.assertEquals(ExecutionState.DEPLOYING, executionVertex.getExecutionState());
            checkTaskOffloaded(build, executionVertex.getJobvertexId());
            TaskDeploymentDescriptor taskDeploymentDescriptor2 = (TaskDeploymentDescriptor) completableFuture.get();
            Assert.assertNotNull(taskDeploymentDescriptor2);
            JobInformation jobInformation = (JobInformation) taskDeploymentDescriptor2.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
            TaskInformation taskInformation = (TaskInformation) taskDeploymentDescriptor2.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
            Assert.assertEquals(jobID, taskDeploymentDescriptor2.getJobId());
            Assert.assertEquals(jobID, jobInformation.getJobId());
            Assert.assertEquals(jobVertexID2, taskInformation.getJobVertexId());
            Assert.assertEquals(3L, taskDeploymentDescriptor2.getSubtaskIndex());
            Assert.assertEquals(10L, taskInformation.getNumberOfSubtasks());
            Assert.assertEquals(BatchTask.class.getName(), taskInformation.getInvokableClassName());
            Assert.assertEquals("v2", taskInformation.getTaskName());
            List producedPartitions = taskDeploymentDescriptor2.getProducedPartitions();
            List inputGates = taskDeploymentDescriptor2.getInputGates();
            Assert.assertEquals(2L, producedPartitions.size());
            Assert.assertEquals(1L, inputGates.size());
            Iterator it = producedPartitions.iterator();
            Iterator it2 = inputGates.iterator();
            Assert.assertEquals(10L, ((ResultPartitionDeploymentDescriptor) it.next()).getNumberOfSubpartitions());
            Assert.assertEquals(10L, ((ResultPartitionDeploymentDescriptor) it.next()).getNumberOfSubpartitions());
            Assert.assertEquals(10L, ((InputGateDeploymentDescriptor) it2.next()).getShuffleDescriptors().length);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testRegistrationOfExecutionsFinishing() {
        try {
            Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> tuple2 = setupExecution(new JobVertex("v1", new JobVertexID()), 7650, new JobVertex("v2", new JobVertexID()), 2350);
            ExecutionGraph executionGraph = (ExecutionGraph) tuple2.f0;
            Iterator it = new ArrayList(((Map) tuple2.f1).values()).iterator();
            while (it.hasNext()) {
                ((Execution) it.next()).markFinished();
            }
            Assert.assertEquals(0L, executionGraph.getRegisteredExecutions().size());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testRegistrationOfExecutionsFailing() {
        try {
            Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> tuple2 = setupExecution(new JobVertex("v1", new JobVertexID()), 7, new JobVertex("v2", new JobVertexID()), 6);
            ExecutionGraph executionGraph = (ExecutionGraph) tuple2.f0;
            Iterator it = new ArrayList(((Map) tuple2.f1).values()).iterator();
            while (it.hasNext()) {
                ((Execution) it.next()).markFailed((Throwable) null);
            }
            Assert.assertEquals(0L, executionGraph.getRegisteredExecutions().size());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testRegistrationOfExecutionsFailedExternally() {
        try {
            Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> tuple2 = setupExecution(new JobVertex("v1", new JobVertexID()), 7, new JobVertex("v2", new JobVertexID()), 6);
            ExecutionGraph executionGraph = (ExecutionGraph) tuple2.f0;
            Iterator it = new ArrayList(((Map) tuple2.f1).values()).iterator();
            while (it.hasNext()) {
                ((Execution) it.next()).fail((Throwable) null);
            }
            Assert.assertEquals(0L, executionGraph.getRegisteredExecutions().size());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testAccumulatorsAndMetricsForwarding() throws Exception {
        Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> tuple2 = setupExecution(new JobVertex("v1", new JobVertexID()), 1, new JobVertex("v2", new JobVertexID()), 1);
        ExecutionGraph executionGraph = (ExecutionGraph) tuple2.f0;
        Execution execution = (Execution) ((Map) tuple2.f1).values().iterator().next();
        IOMetrics iOMetrics = new IOMetrics(0L, 0L, 0L, 0L);
        HashMap hashMap = new HashMap();
        hashMap.put("acc", new IntCounter(4));
        executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), execution.getAttemptId(), ExecutionState.CANCELED, (Throwable) null, new AccumulatorSnapshot(executionGraph.getJobID(), execution.getAttemptId(), hashMap), iOMetrics));
        Assert.assertEquals(iOMetrics, execution.getIOMetrics());
        Assert.assertNotNull(execution.getUserAccumulators());
        Assert.assertEquals(4, ((Accumulator) execution.getUserAccumulators().get("acc")).getLocalValue());
        Execution execution2 = (Execution) ((Map) tuple2.f1).values().iterator().next();
        IOMetrics iOMetrics2 = new IOMetrics(0L, 0L, 0L, 0L);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("acc", new IntCounter(8));
        executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), execution2.getAttemptId(), ExecutionState.FAILED, (Throwable) null, new AccumulatorSnapshot(executionGraph.getJobID(), execution2.getAttemptId(), hashMap2), iOMetrics2));
        Assert.assertEquals(iOMetrics2, execution2.getIOMetrics());
        Assert.assertNotNull(execution2.getUserAccumulators());
        Assert.assertEquals(8, ((Accumulator) execution2.getUserAccumulators().get("acc")).getLocalValue());
    }

    @Test
    public void testAccumulatorsAndMetricsStorage() throws Exception {
        Map map = (Map) setupExecution(new JobVertex("v1", new JobVertexID()), 1, new JobVertex("v2", new JobVertexID()), 1).f1;
        IOMetrics iOMetrics = new IOMetrics(0L, 0L, 0L, 0L);
        Map emptyMap = Collections.emptyMap();
        Execution execution = (Execution) map.values().iterator().next();
        execution.cancel();
        execution.completeCancelling(emptyMap, iOMetrics, false);
        Assert.assertEquals(iOMetrics, execution.getIOMetrics());
        Assert.assertEquals(emptyMap, execution.getUserAccumulators());
        Execution execution2 = (Execution) map.values().iterator().next();
        execution2.markFailed(new Throwable(), false, emptyMap, iOMetrics, false, true);
        Assert.assertEquals(iOMetrics, execution2.getIOMetrics());
        Assert.assertEquals(emptyMap, execution2.getUserAccumulators());
    }

    @Test
    public void testRegistrationOfExecutionsCanceled() {
        try {
            Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> tuple2 = setupExecution(new JobVertex("v1", new JobVertexID()), 19, new JobVertex("v2", new JobVertexID()), 37);
            ExecutionGraph executionGraph = (ExecutionGraph) tuple2.f0;
            for (Execution execution : new ArrayList(((Map) tuple2.f1).values())) {
                execution.cancel();
                execution.completeCancelling();
            }
            Assert.assertEquals(0L, executionGraph.getRegisteredExecutions().size());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testNoResourceAvailableFailure() throws Exception {
        JobID jobID = new JobID();
        JobVertex jobVertex = new JobVertex("source");
        JobVertex jobVertex2 = new JobVertex("sink");
        jobVertex.setParallelism(1);
        jobVertex2.setParallelism(1);
        jobVertex.setInvokableClass(BatchTask.class);
        jobVertex2.setInvokableClass(BatchTask.class);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        ArrayDeque arrayDeque = new ArrayDeque();
        for (int i = 0; i < 1; i++) {
            arrayDeque.addLast(CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot()));
        }
        ExecutionGraph build = TestingExecutionGraphBuilder.newBuilder().setJobGraph(new JobGraph(jobID, "Test Job")).setFutureExecutor(new DirectScheduledExecutorService()).setSlotProvider(new TestingSlotProvider(slotRequestId -> {
            return (CompletableFuture) arrayDeque.removeFirst();
        })).setBlobWriter(this.blobWriter).build();
        build.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        checkJobOffloaded(build);
        build.attachJobGraph(Arrays.asList(jobVertex, jobVertex2));
        build.scheduleForExecution();
        ExecutionAttemptID attemptId = build.getJobVertex(jobVertex.getID()).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
        build.updateState(new TaskExecutionState(jobID, attemptId, ExecutionState.RUNNING));
        build.updateState(new TaskExecutionState(jobID, attemptId, ExecutionState.FINISHED, (Throwable) null));
        Assert.assertEquals(JobStatus.FAILED, build.getState());
    }

    @Test
    public void testSettingDefaultMaxNumberOfCheckpointsToRetain() throws Exception {
        Assert.assertEquals(((Integer) CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue()).intValue(), createExecutionGraph(new Configuration()).getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
    }

    @Test
    public void testSettingMaxNumberOfCheckpointsToRetain() throws Exception {
        new Configuration().setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 10);
        Assert.assertEquals(10L, createExecutionGraph(r0).getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
    }

    private Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> setupExecution(JobVertex jobVertex, int i, JobVertex jobVertex2, int i2) throws Exception {
        jobVertex.setParallelism(i);
        jobVertex2.setParallelism(i2);
        jobVertex.setInvokableClass(BatchTask.class);
        jobVertex2.setInvokableClass(BatchTask.class);
        ArrayDeque arrayDeque = new ArrayDeque();
        for (int i3 = 0; i3 < i + i2; i3++) {
            arrayDeque.addLast(CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot()));
        }
        ExecutionGraph build = TestingExecutionGraphBuilder.newBuilder().setFutureExecutor(new DirectScheduledExecutorService()).setSlotProvider(new TestingSlotProvider(slotRequestId -> {
            return (CompletableFuture) arrayDeque.removeFirst();
        })).setBlobWriter(this.blobWriter).build();
        checkJobOffloaded(build);
        build.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        build.attachJobGraph(Arrays.asList(jobVertex, jobVertex2));
        build.scheduleForExecution();
        Map registeredExecutions = build.getRegisteredExecutions();
        Assert.assertEquals(i + i2, registeredExecutions.size());
        return new Tuple2<>(build, registeredExecutions);
    }

    @Test
    public void testSettingIllegalMaxNumberOfCheckpointsToRetain() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, -10);
        ExecutionGraph createExecutionGraph = createExecutionGraph(configuration);
        Assert.assertNotEquals(-10L, createExecutionGraph.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
        Assert.assertEquals(((Integer) CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue()).intValue(), createExecutionGraph.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
    }

    @Test
    public void testEagerSchedulingWaitsOnAllInputPreferredLocations() throws Exception {
        SlotProvider programmedSlotProvider = new ProgrammedSlotProvider(2);
        Time hours = Time.hours(1L);
        JobVertexID jobVertexID = new JobVertexID();
        JobVertex jobVertex = new JobVertex("Test source", jobVertexID);
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(2);
        JobVertexID jobVertexID2 = new JobVertexID();
        JobVertex jobVertex2 = new JobVertex("Test sink", jobVertexID2);
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.setParallelism(2);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        HashMap hashMap = new HashMap(2);
        for (JobVertexID jobVertexID3 : Arrays.asList(jobVertexID, jobVertexID2)) {
            CompletableFuture<LogicalSlot>[] completableFutureArr = new CompletableFuture[2];
            for (int i = 0; i < 2; i++) {
                completableFutureArr[i] = new CompletableFuture<>();
            }
            hashMap.put(jobVertexID3, completableFutureArr);
            programmedSlotProvider.addSlots(jobVertexID3, completableFutureArr);
        }
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(3);
        JobGraph jobGraph = new JobGraph(new JobVertex[]{jobVertex, jobVertex2});
        jobGraph.setScheduleMode(ScheduleMode.EAGER);
        ExecutionGraph build = TestingExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).setSlotProvider(programmedSlotProvider).setIoExecutor(scheduledThreadPoolExecutor).setFutureExecutor(scheduledThreadPoolExecutor).setAllocationTimeout(hours).setRpcTimeout(hours).build();
        build.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        build.scheduleForExecution();
        Iterator it = build.getAllExecutionVertices().iterator();
        while (it.hasNext()) {
            Assert.assertEquals(ExecutionState.SCHEDULED, ((ExecutionVertex) it.next()).getCurrentExecutionAttempt().getState());
        }
        TestCase.assertTrue(programmedSlotProvider.getSlotRequestedFuture(jobVertexID, 0).get().booleanValue());
        TestCase.assertTrue(programmedSlotProvider.getSlotRequestedFuture(jobVertexID, 1).get().booleanValue());
        Assert.assertFalse(programmedSlotProvider.getSlotRequestedFuture(jobVertexID2, 0).isDone());
        Assert.assertFalse(programmedSlotProvider.getSlotRequestedFuture(jobVertexID2, 1).isDone());
        LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
        LogicalSlot createSlot = createSlot(localTaskManagerLocation, 0);
        LogicalSlot createSlot2 = createSlot(localTaskManagerLocation, 1);
        LogicalSlot createSlot3 = createSlot(localTaskManagerLocation, 0);
        LogicalSlot createSlot4 = createSlot(localTaskManagerLocation, 1);
        ((CompletableFuture[]) hashMap.get(jobVertexID))[0].complete(createSlot);
        ((CompletableFuture[]) hashMap.get(jobVertexID))[1].complete(createSlot2);
        TestCase.assertTrue(programmedSlotProvider.getSlotRequestedFuture(jobVertexID2, 0).get().booleanValue());
        TestCase.assertTrue(programmedSlotProvider.getSlotRequestedFuture(jobVertexID2, 1).get().booleanValue());
        ((CompletableFuture[]) hashMap.get(jobVertexID2))[0].complete(createSlot3);
        ((CompletableFuture[]) hashMap.get(jobVertexID2))[1].complete(createSlot4);
        Iterator it2 = build.getAllExecutionVertices().iterator();
        while (it2.hasNext()) {
            ExecutionGraphTestUtils.waitUntilExecutionState(((ExecutionVertex) it2.next()).getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 5000L);
        }
    }

    @Test
    public void testExecutionGraphIsDeployedInTopologicalOrder() throws Exception {
        JobVertex jobVertex = new JobVertex("source");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(2);
        JobVertex jobVertex2 = new JobVertex("sink");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.setParallelism(1);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        JobID jobID = new JobID();
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(3);
        TestingTaskExecutorGatewayBuilder testingTaskExecutorGatewayBuilder = new TestingTaskExecutorGatewayBuilder();
        testingTaskExecutorGatewayBuilder.setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
            arrayBlockingQueue.offer(taskDeploymentDescriptor.getExecutionAttemptId());
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        TaskManagerGateway rpcTaskManagerGateway = new RpcTaskManagerGateway(testingTaskExecutorGatewayBuilder.createTestingTaskExecutorGateway(), JobMasterId.generate());
        ArrayList arrayList = new ArrayList(3);
        for (int i = 0; i < 3; i++) {
            arrayList.add(new CompletableFuture());
        }
        IteratorTestingSlotProvider iteratorTestingSlotProvider = new IteratorTestingSlotProvider(arrayList.iterator());
        JobGraph jobGraph = new JobGraph(jobID, "Test Job", new JobVertex[]{jobVertex, jobVertex2});
        jobGraph.setScheduleMode(ScheduleMode.EAGER);
        ExecutionGraph build = TestingExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).setSlotProvider(iteratorTestingSlotProvider).setFutureExecutor(new DirectScheduledExecutorService()).build();
        build.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        build.scheduleForExecution();
        ArrayList arrayList2 = new ArrayList(arrayList);
        Collections.shuffle(arrayList2);
        Iterator it = arrayList2.iterator();
        while (it.hasNext()) {
            ((CompletableFuture) it.next()).complete(new TestingLogicalSlotBuilder().setTaskManagerGateway(rpcTaskManagerGateway).createTestingLogicalSlot());
        }
        ArrayList arrayList3 = new ArrayList(3);
        for (int i2 = 0; i2 < 3; i2++) {
            arrayList3.add(arrayBlockingQueue.take());
        }
        ArrayList arrayList4 = new ArrayList(2);
        for (ExecutionVertex executionVertex : build.getJobVertex(jobVertex.getID()).getTaskVertices()) {
            arrayList4.add(executionVertex.getCurrentExecutionAttempt().getAttemptId());
        }
        ArrayList arrayList5 = new ArrayList(1);
        for (ExecutionVertex executionVertex2 : build.getJobVertex(jobVertex2.getID()).getTaskVertices()) {
            arrayList5.add(executionVertex2.getCurrentExecutionAttempt().getAttemptId());
        }
        Assert.assertThat(arrayList3, new ExecutionStageMatcher(Arrays.asList(arrayList4, arrayList5)));
    }

    private LogicalSlot createSlot(TaskManagerLocation taskManagerLocation, int i) {
        return new TestingLogicalSlotBuilder().setTaskManagerLocation(taskManagerLocation).setSlotNumber(i).createTestingLogicalSlot();
    }

    private ExecutionGraph createExecutionGraph(Configuration configuration) throws Exception {
        ScheduledExecutorService defaultExecutor = TestingUtils.defaultExecutor();
        JobGraph jobGraph = new JobGraph(new JobID(), "test");
        jobGraph.setSnapshotSettings(new JobCheckpointingSettings(Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), new CheckpointCoordinatorConfiguration(100L, 600000L, 0L, 1, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, false, false, false, 0), (SerializedValue) null));
        Time seconds = Time.seconds(10L);
        return ExecutionGraphBuilder.buildGraph((ExecutionGraph) null, jobGraph, configuration, defaultExecutor, defaultExecutor, new ProgrammedSlotProvider(1), getClass().getClassLoader(), new StandaloneCheckpointRecoveryFactory(), seconds, new NoRestartStrategy(), new UnregisteredMetricsGroup(), this.blobWriter, seconds, LoggerFactory.getLogger(getClass()), NettyShuffleMaster.INSTANCE, NoOpJobMasterPartitionTracker.INSTANCE, System.currentTimeMillis());
    }
}
