/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.function.Function;
import junit.framework.TestCase;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.ProgrammedSlotProvider;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.TestingSlotProvider;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionUtils;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecutionGraphDeploymentTest
extends TestLogger {
    protected BlobWriter blobWriter = VoidBlobWriter.getInstance();
    protected PermanentBlobService blobCache = null;

    protected void checkJobOffloaded(ExecutionGraph eg) throws Exception {
        TestCase.assertTrue((boolean)eg.getJobInformationOrBlobKey().isLeft());
    }

    protected void checkTaskOffloaded(ExecutionGraph eg, JobVertexID jobVertexId) throws Exception {
        TestCase.assertTrue((boolean)eg.getJobVertex(jobVertexId).getTaskInformationOrBlobKey().isLeft());
    }

    @Test
    public void testBuildDeploymentDescriptor() {
        try {
            JobID jobId = new JobID();
            JobVertexID jid1 = new JobVertexID();
            JobVertexID jid2 = new JobVertexID();
            JobVertexID jid3 = new JobVertexID();
            JobVertexID jid4 = new JobVertexID();
            JobVertex v1 = new JobVertex("v1", jid1);
            JobVertex v2 = new JobVertex("v2", jid2);
            JobVertex v3 = new JobVertex("v3", jid3);
            JobVertex v4 = new JobVertex("v4", jid4);
            v1.setParallelism(10);
            v2.setParallelism(10);
            v3.setParallelism(10);
            v4.setParallelism(10);
            v1.setInvokableClass(BatchTask.class);
            v2.setInvokableClass(BatchTask.class);
            v3.setInvokableClass(BatchTask.class);
            v4.setInvokableClass(BatchTask.class);
            v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
            v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
            v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
            DirectScheduledExecutorService executor = new DirectScheduledExecutorService();
            ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder().setJobGraph(new JobGraph(jobId, "Test Job")).setFutureExecutor(executor).setIoExecutor(executor).setSlotProvider(new TestingSlotProvider(ignore -> new CompletableFuture())).setBlobWriter(this.blobWriter).build();
            eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
            this.checkJobOffloaded(eg);
            List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
            eg.attachJobGraph(ordered);
            ExecutionJobVertex ejv = (ExecutionJobVertex)eg.getAllVertices().get(jid2);
            ExecutionVertex vertex = ejv.getTaskVertices()[3];
            SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
            CompletableFuture tdd = new CompletableFuture();
            taskManagerGateway.setSubmitConsumer(FunctionUtils.uncheckedConsumer(taskDeploymentDescriptor -> {
                taskDeploymentDescriptor.loadBigData(this.blobCache);
                tdd.complete(taskDeploymentDescriptor);
            }));
            TestingLogicalSlot slot = new TestingLogicalSlotBuilder().setTaskManagerGateway(taskManagerGateway).createTestingLogicalSlot();
            Assert.assertEquals((Object)ExecutionState.CREATED, (Object)vertex.getExecutionState());
            vertex.getCurrentExecutionAttempt().registerProducedPartitions(slot.getTaskManagerLocation()).get();
            vertex.deployToSlot((LogicalSlot)slot);
            Assert.assertEquals((Object)ExecutionState.DEPLOYING, (Object)vertex.getExecutionState());
            this.checkTaskOffloaded(eg, vertex.getJobvertexId());
            TaskDeploymentDescriptor descr = (TaskDeploymentDescriptor)tdd.get();
            Assert.assertNotNull((Object)descr);
            JobInformation jobInformation = (JobInformation)descr.getSerializedJobInformation().deserializeValue(((Object)((Object)this)).getClass().getClassLoader());
            TaskInformation taskInformation = (TaskInformation)descr.getSerializedTaskInformation().deserializeValue(((Object)((Object)this)).getClass().getClassLoader());
            Assert.assertEquals((Object)jobId, (Object)descr.getJobId());
            Assert.assertEquals((Object)jobId, (Object)jobInformation.getJobId());
            Assert.assertEquals((Object)jid2, (Object)taskInformation.getJobVertexId());
            Assert.assertEquals((long)3L, (long)descr.getSubtaskIndex());
            Assert.assertEquals((long)10L, (long)taskInformation.getNumberOfSubtasks());
            Assert.assertEquals((Object)BatchTask.class.getName(), (Object)taskInformation.getInvokableClassName());
            Assert.assertEquals((Object)"v2", (Object)taskInformation.getTaskName());
            List producedPartitions = descr.getProducedPartitions();
            List consumedPartitions = descr.getInputGates();
            Assert.assertEquals((long)2L, (long)producedPartitions.size());
            Assert.assertEquals((long)1L, (long)consumedPartitions.size());
            Iterator iteratorProducedPartitions = producedPartitions.iterator();
            Iterator iteratorConsumedPartitions = consumedPartitions.iterator();
            Assert.assertEquals((long)10L, (long)((ResultPartitionDeploymentDescriptor)iteratorProducedPartitions.next()).getNumberOfSubpartitions());
            Assert.assertEquals((long)10L, (long)((ResultPartitionDeploymentDescriptor)iteratorProducedPartitions.next()).getNumberOfSubpartitions());
            Assert.assertEquals((long)10L, (long)((InputGateDeploymentDescriptor)iteratorConsumedPartitions.next()).getShuffleDescriptors().length);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testRegistrationOfExecutionsFinishing() {
        try {
            JobVertexID jid1 = new JobVertexID();
            JobVertexID jid2 = new JobVertexID();
            JobVertex v1 = new JobVertex("v1", jid1);
            JobVertex v2 = new JobVertex("v2", jid2);
            Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> graphExecutionsTuple = this.setupExecution(v1, 7650, v2, 2350);
            ExecutionGraph testExecutionGraph = (ExecutionGraph)graphExecutionsTuple.f0;
            ArrayList executions = new ArrayList(((Map)graphExecutionsTuple.f1).values());
            for (Execution e : executions) {
                e.markFinished();
            }
            Assert.assertEquals((long)0L, (long)testExecutionGraph.getRegisteredExecutions().size());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testRegistrationOfExecutionsFailing() {
        try {
            JobVertexID jid1 = new JobVertexID();
            JobVertexID jid2 = new JobVertexID();
            JobVertex v1 = new JobVertex("v1", jid1);
            JobVertex v2 = new JobVertex("v2", jid2);
            Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> graphExecutionsTuple = this.setupExecution(v1, 7, v2, 6);
            ExecutionGraph testExecutionGraph = (ExecutionGraph)graphExecutionsTuple.f0;
            ArrayList executions = new ArrayList(((Map)graphExecutionsTuple.f1).values());
            for (Execution e : executions) {
                e.markFailed(null);
            }
            Assert.assertEquals((long)0L, (long)testExecutionGraph.getRegisteredExecutions().size());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testRegistrationOfExecutionsFailedExternally() {
        try {
            JobVertexID jid1 = new JobVertexID();
            JobVertexID jid2 = new JobVertexID();
            JobVertex v1 = new JobVertex("v1", jid1);
            JobVertex v2 = new JobVertex("v2", jid2);
            Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> graphExecutionsTuple = this.setupExecution(v1, 7, v2, 6);
            ExecutionGraph testExecutionGraph = (ExecutionGraph)graphExecutionsTuple.f0;
            ArrayList executions = new ArrayList(((Map)graphExecutionsTuple.f1).values());
            for (Execution e : executions) {
                e.fail(null);
            }
            Assert.assertEquals((long)0L, (long)testExecutionGraph.getRegisteredExecutions().size());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testAccumulatorsAndMetricsForwarding() throws Exception {
        JobVertexID jid1 = new JobVertexID();
        JobVertexID jid2 = new JobVertexID();
        JobVertex v1 = new JobVertex("v1", jid1);
        JobVertex v2 = new JobVertex("v2", jid2);
        Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> graphAndExecutions = this.setupExecution(v1, 1, v2, 1);
        ExecutionGraph graph = (ExecutionGraph)graphAndExecutions.f0;
        Execution execution1 = (Execution)((Map)graphAndExecutions.f1).values().iterator().next();
        IOMetrics ioMetrics = new IOMetrics(0L, 0L, 0L, 0L);
        HashMap<String, IntCounter> accumulators = new HashMap<String, IntCounter>();
        accumulators.put("acc", new IntCounter(4));
        AccumulatorSnapshot accumulatorSnapshot = new AccumulatorSnapshot(graph.getJobID(), execution1.getAttemptId(), accumulators);
        TaskExecutionState state = new TaskExecutionState(graph.getJobID(), execution1.getAttemptId(), ExecutionState.CANCELED, null, accumulatorSnapshot, ioMetrics);
        graph.updateState(state);
        Assert.assertEquals((Object)ioMetrics, (Object)execution1.getIOMetrics());
        Assert.assertNotNull((Object)execution1.getUserAccumulators());
        Assert.assertEquals((Object)4, (Object)((Accumulator)execution1.getUserAccumulators().get("acc")).getLocalValue());
        Execution execution2 = (Execution)((Map)graphAndExecutions.f1).values().iterator().next();
        IOMetrics ioMetrics2 = new IOMetrics(0L, 0L, 0L, 0L);
        HashMap<String, IntCounter> accumulators2 = new HashMap<String, IntCounter>();
        accumulators2.put("acc", new IntCounter(8));
        AccumulatorSnapshot accumulatorSnapshot2 = new AccumulatorSnapshot(graph.getJobID(), execution2.getAttemptId(), accumulators2);
        TaskExecutionState state2 = new TaskExecutionState(graph.getJobID(), execution2.getAttemptId(), ExecutionState.FAILED, null, accumulatorSnapshot2, ioMetrics2);
        graph.updateState(state2);
        Assert.assertEquals((Object)ioMetrics2, (Object)execution2.getIOMetrics());
        Assert.assertNotNull((Object)execution2.getUserAccumulators());
        Assert.assertEquals((Object)8, (Object)((Accumulator)execution2.getUserAccumulators().get("acc")).getLocalValue());
    }

    @Test
    public void testAccumulatorsAndMetricsStorage() throws Exception {
        JobVertexID jid1 = new JobVertexID();
        JobVertexID jid2 = new JobVertexID();
        JobVertex v1 = new JobVertex("v1", jid1);
        JobVertex v2 = new JobVertex("v2", jid2);
        Map executions = (Map)this.setupExecution((JobVertex)v1, (int)1, (JobVertex)v2, (int)1).f1;
        IOMetrics ioMetrics = new IOMetrics(0L, 0L, 0L, 0L);
        Map accumulators = Collections.emptyMap();
        Execution execution1 = (Execution)executions.values().iterator().next();
        execution1.cancel();
        execution1.completeCancelling(accumulators, ioMetrics, false);
        Assert.assertEquals((Object)ioMetrics, (Object)execution1.getIOMetrics());
        Assert.assertEquals(accumulators, (Object)execution1.getUserAccumulators());
        Execution execution2 = (Execution)executions.values().iterator().next();
        execution2.markFailed(new Throwable(), accumulators, ioMetrics);
        Assert.assertEquals((Object)ioMetrics, (Object)execution2.getIOMetrics());
        Assert.assertEquals(accumulators, (Object)execution2.getUserAccumulators());
    }

    @Test
    public void testRegistrationOfExecutionsCanceled() {
        try {
            JobVertexID jid1 = new JobVertexID();
            JobVertexID jid2 = new JobVertexID();
            JobVertex v1 = new JobVertex("v1", jid1);
            JobVertex v2 = new JobVertex("v2", jid2);
            Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> graphExecutionsTuple = this.setupExecution(v1, 19, v2, 37);
            ExecutionGraph testExecutionGraph = (ExecutionGraph)graphExecutionsTuple.f0;
            ArrayList executions = new ArrayList(((Map)graphExecutionsTuple.f1).values());
            for (Execution e : executions) {
                e.cancel();
                e.completeCancelling();
            }
            Assert.assertEquals((long)0L, (long)testExecutionGraph.getRegisteredExecutions().size());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testNoResourceAvailableFailure() throws Exception {
        JobID jobId = new JobID();
        JobVertex v1 = new JobVertex("source");
        JobVertex v2 = new JobVertex("sink");
        int dop1 = 1;
        int dop2 = 1;
        v1.setParallelism(dop1);
        v2.setParallelism(dop2);
        v1.setInvokableClass(BatchTask.class);
        v2.setInvokableClass(BatchTask.class);
        v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        ArrayDeque<CompletableFuture<TestingLogicalSlot>> slotFutures = new ArrayDeque<CompletableFuture<TestingLogicalSlot>>();
        for (int i = 0; i < dop1; ++i) {
            slotFutures.addLast(CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot()));
        }
        TestingSlotProvider slotProvider = new TestingSlotProvider(ignore -> (CompletableFuture)slotFutures.removeFirst());
        DirectScheduledExecutorService directExecutor = new DirectScheduledExecutorService();
        ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder().setJobGraph(new JobGraph(jobId, "Test Job")).setFutureExecutor(directExecutor).setSlotProvider(slotProvider).setBlobWriter(this.blobWriter).build();
        eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        this.checkJobOffloaded(eg);
        List<JobVertex> ordered = Arrays.asList(v1, v2);
        eg.attachJobGraph(ordered);
        eg.scheduleForExecution();
        ExecutionAttemptID attemptID = eg.getJobVertex(v1.getID()).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
        eg.updateState(new TaskExecutionState(jobId, attemptID, ExecutionState.RUNNING));
        eg.updateState(new TaskExecutionState(jobId, attemptID, ExecutionState.FINISHED, null));
        Assert.assertEquals((Object)JobStatus.FAILED, (Object)eg.getState());
    }

    @Test
    public void testSettingDefaultMaxNumberOfCheckpointsToRetain() throws Exception {
        Configuration jobManagerConfig = new Configuration();
        ExecutionGraph eg = this.createExecutionGraph(jobManagerConfig);
        Assert.assertEquals((long)((Integer)CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue()).intValue(), (long)eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
    }

    @Test
    public void testSettingMaxNumberOfCheckpointsToRetain() throws Exception {
        int maxNumberOfCheckpointsToRetain = 10;
        Configuration jobManagerConfig = new Configuration();
        jobManagerConfig.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 10);
        ExecutionGraph eg = this.createExecutionGraph(jobManagerConfig);
        Assert.assertEquals((long)10L, (long)eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
    }

    private Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> setupExecution(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception {
        v1.setParallelism(dop1);
        v2.setParallelism(dop2);
        v1.setInvokableClass(BatchTask.class);
        v2.setInvokableClass(BatchTask.class);
        ArrayDeque<CompletableFuture<TestingLogicalSlot>> slotFutures = new ArrayDeque<CompletableFuture<TestingLogicalSlot>>();
        for (int i = 0; i < dop1 + dop2; ++i) {
            slotFutures.addLast(CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot()));
        }
        TestingSlotProvider slotProvider = new TestingSlotProvider(ignore -> (CompletableFuture)slotFutures.removeFirst());
        DirectScheduledExecutorService executorService = new DirectScheduledExecutorService();
        ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder().setFutureExecutor(executorService).setSlotProvider(slotProvider).setBlobWriter(this.blobWriter).build();
        this.checkJobOffloaded(eg);
        eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        List<JobVertex> ordered = Arrays.asList(v1, v2);
        eg.attachJobGraph(ordered);
        eg.scheduleForExecution();
        Map executions = eg.getRegisteredExecutions();
        Assert.assertEquals((long)(dop1 + dop2), (long)executions.size());
        return new Tuple2((Object)eg, (Object)executions);
    }

    @Test
    public void testSettingIllegalMaxNumberOfCheckpointsToRetain() throws Exception {
        int negativeMaxNumberOfCheckpointsToRetain = -10;
        Configuration jobManagerConfig = new Configuration();
        jobManagerConfig.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, -10);
        ExecutionGraph eg = this.createExecutionGraph(jobManagerConfig);
        Assert.assertNotEquals((long)-10L, (long)eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
        Assert.assertEquals((long)((Integer)CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue()).intValue(), (long)eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
    }

    @Test
    public void testEagerSchedulingWaitsOnAllInputPreferredLocations() throws Exception {
        int parallelism = 2;
        ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(2);
        Time timeout = Time.hours((long)1L);
        JobVertexID sourceVertexId = new JobVertexID();
        JobVertex sourceVertex = new JobVertex("Test source", sourceVertexId);
        sourceVertex.setInvokableClass(NoOpInvokable.class);
        sourceVertex.setParallelism(2);
        JobVertexID sinkVertexId = new JobVertexID();
        JobVertex sinkVertex = new JobVertex("Test sink", sinkVertexId);
        sinkVertex.setInvokableClass(NoOpInvokable.class);
        sinkVertex.setParallelism(2);
        sinkVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        HashMap<JobVertexID, CompletableFuture[]> slotFutures = new HashMap<JobVertexID, CompletableFuture[]>(2);
        for (JobVertexID jobVertexID : Arrays.asList(sourceVertexId, sinkVertexId)) {
            CompletableFuture[] slotFutureArray = new CompletableFuture[2];
            for (int i = 0; i < 2; ++i) {
                slotFutureArray[i] = new CompletableFuture();
            }
            slotFutures.put(jobVertexID, slotFutureArray);
            slotProvider.addSlots(jobVertexID, slotFutureArray);
        }
        ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(3);
        JobGraph jobGraph = new JobGraph(new JobVertex[]{sourceVertex, sinkVertex});
        jobGraph.setScheduleMode(ScheduleMode.EAGER);
        ExecutionGraph executionGraph = TestingExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).setSlotProvider((SlotProvider)slotProvider).setIoExecutor(scheduledExecutorService).setFutureExecutor(scheduledExecutorService).setAllocationTimeout(timeout).setRpcTimeout(timeout).build();
        executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        executionGraph.scheduleForExecution();
        for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) {
            Assert.assertEquals((Object)ExecutionState.SCHEDULED, (Object)executionVertex.getCurrentExecutionAttempt().getState());
        }
        TestCase.assertTrue((boolean)slotProvider.getSlotRequestedFuture(sourceVertexId, 0).get());
        TestCase.assertTrue((boolean)slotProvider.getSlotRequestedFuture(sourceVertexId, 1).get());
        Assert.assertFalse((boolean)slotProvider.getSlotRequestedFuture(sinkVertexId, 0).isDone());
        Assert.assertFalse((boolean)slotProvider.getSlotRequestedFuture(sinkVertexId, 1).isDone());
        LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
        LogicalSlot sourceSlot1 = this.createSlot(localTaskManagerLocation, 0);
        LogicalSlot sourceSlot2 = this.createSlot(localTaskManagerLocation, 1);
        LogicalSlot sinkSlot1 = this.createSlot(localTaskManagerLocation, 0);
        LogicalSlot sinkSlot2 = this.createSlot(localTaskManagerLocation, 1);
        ((CompletableFuture[])slotFutures.get(sourceVertexId))[0].complete(sourceSlot1);
        ((CompletableFuture[])slotFutures.get(sourceVertexId))[1].complete(sourceSlot2);
        TestCase.assertTrue((boolean)slotProvider.getSlotRequestedFuture(sinkVertexId, 0).get());
        TestCase.assertTrue((boolean)slotProvider.getSlotRequestedFuture(sinkVertexId, 1).get());
        ((CompletableFuture[])slotFutures.get(sinkVertexId))[0].complete(sinkSlot1);
        ((CompletableFuture[])slotFutures.get(sinkVertexId))[1].complete(sinkSlot2);
        for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) {
            ExecutionGraphTestUtils.waitUntilExecutionState(executionVertex.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 5000L);
        }
    }

    @Test
    public void testExecutionGraphIsDeployedInTopologicalOrder() throws Exception {
        int sourceParallelism = 2;
        boolean sinkParallelism = true;
        JobVertex sourceVertex = new JobVertex("source");
        sourceVertex.setInvokableClass(NoOpInvokable.class);
        sourceVertex.setParallelism(2);
        JobVertex sinkVertex = new JobVertex("sink");
        sinkVertex.setInvokableClass(NoOpInvokable.class);
        sinkVertex.setParallelism(1);
        sinkVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        JobID jobId = new JobID();
        int numberTasks = 3;
        ArrayBlockingQueue submittedTasksQueue = new ArrayBlockingQueue(3);
        TestingTaskExecutorGatewayBuilder testingTaskExecutorGatewayBuilder = new TestingTaskExecutorGatewayBuilder();
        testingTaskExecutorGatewayBuilder.setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
            submittedTasksQueue.offer(taskDeploymentDescriptor.getExecutionAttemptId());
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        TestingTaskExecutorGateway taskExecutorGateway = testingTaskExecutorGatewayBuilder.createTestingTaskExecutorGateway();
        RpcTaskManagerGateway taskManagerGateway = new RpcTaskManagerGateway((TaskExecutorGateway)taskExecutorGateway, JobMasterId.generate());
        ArrayList slotFutures = new ArrayList(3);
        for (int i = 0; i < 3; ++i) {
            slotFutures.add(new CompletableFuture());
        }
        IteratorTestingSlotProvider slotProvider = new IteratorTestingSlotProvider(slotFutures.iterator());
        JobGraph jobGraph = new JobGraph(jobId, "Test Job", new JobVertex[]{sourceVertex, sinkVertex});
        jobGraph.setScheduleMode(ScheduleMode.EAGER);
        ExecutionGraph executionGraph = TestingExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).setSlotProvider(slotProvider).setFutureExecutor(new DirectScheduledExecutorService()).build();
        executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        executionGraph.scheduleForExecution();
        ArrayList shuffledFutures = new ArrayList(slotFutures);
        Collections.shuffle(shuffledFutures);
        for (CompletableFuture slotFuture : shuffledFutures) {
            slotFuture.complete(new TestingLogicalSlotBuilder().setTaskManagerGateway((TaskManagerGateway)taskManagerGateway).createTestingLogicalSlot());
        }
        ArrayList submittedTasks = new ArrayList(3);
        for (int i = 0; i < 3; ++i) {
            submittedTasks.add(submittedTasksQueue.take());
        }
        ArrayList<ExecutionAttemptID> firstStage = new ArrayList<ExecutionAttemptID>(2);
        for (ExecutionVertex taskVertex : executionGraph.getJobVertex(sourceVertex.getID()).getTaskVertices()) {
            firstStage.add(taskVertex.getCurrentExecutionAttempt().getAttemptId());
        }
        ArrayList<ExecutionAttemptID> secondStage = new ArrayList<ExecutionAttemptID>(1);
        for (ExecutionVertex taskVertex : executionGraph.getJobVertex(sinkVertex.getID()).getTaskVertices()) {
            secondStage.add(taskVertex.getCurrentExecutionAttempt().getAttemptId());
        }
        Assert.assertThat(submittedTasks, (Matcher)new ExecutionStageMatcher(Arrays.asList(firstStage, secondStage)));
    }

    private LogicalSlot createSlot(TaskManagerLocation taskManagerLocation, int index) {
        return new TestingLogicalSlotBuilder().setTaskManagerLocation(taskManagerLocation).setSlotNumber(index).createTestingLogicalSlot();
    }

    private ExecutionGraph createExecutionGraph(Configuration configuration) throws Exception {
        ScheduledExecutorService executor = TestingUtils.defaultExecutor();
        JobID jobId = new JobID();
        JobGraph jobGraph = new JobGraph(jobId, "test");
        jobGraph.setSnapshotSettings(new JobCheckpointingSettings(Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), new CheckpointCoordinatorConfiguration(100L, 600000L, 0L, 1, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, false, false, false, 0), null));
        Time timeout = Time.seconds((long)10L);
        return ExecutionGraphBuilder.buildGraph(null, (JobGraph)jobGraph, (Configuration)configuration, (ScheduledExecutorService)executor, (Executor)executor, (SlotProvider)new ProgrammedSlotProvider(1), (ClassLoader)((Object)((Object)this)).getClass().getClassLoader(), (CheckpointRecoveryFactory)new StandaloneCheckpointRecoveryFactory(), (Time)timeout, (RestartStrategy)new NoRestartStrategy(), (MetricGroup)new UnregisteredMetricsGroup(), (BlobWriter)this.blobWriter, (Time)timeout, (Logger)LoggerFactory.getLogger(((Object)((Object)this)).getClass()), (ShuffleMaster)NettyShuffleMaster.INSTANCE, (JobMasterPartitionTracker)NoOpJobMasterPartitionTracker.INSTANCE);
    }

    private static final class ExecutionStageMatcher
    extends TypeSafeMatcher<List<ExecutionAttemptID>> {
        private final List<Collection<ExecutionAttemptID>> executionStages;

        private ExecutionStageMatcher(List<Collection<ExecutionAttemptID>> executionStages) {
            this.executionStages = executionStages;
        }

        protected boolean matchesSafely(List<ExecutionAttemptID> submissionOrder) {
            Iterator<ExecutionAttemptID> submissionIterator = submissionOrder.iterator();
            for (Collection<ExecutionAttemptID> stage : this.executionStages) {
                ArrayList<ExecutionAttemptID> currentStage = new ArrayList<ExecutionAttemptID>(stage);
                while (!currentStage.isEmpty() && submissionIterator.hasNext()) {
                    if (currentStage.remove(submissionIterator.next())) continue;
                    return false;
                }
                if (currentStage.isEmpty()) continue;
                return false;
            }
            return !submissionIterator.hasNext();
        }

        public void describeTo(Description description) {
            description.appendValueList("<[", ", ", "]>", this.executionStages);
        }
    }

    private static final class IteratorTestingSlotProvider
    extends TestingSlotProvider {
        private IteratorTestingSlotProvider(Iterator<CompletableFuture<LogicalSlot>> slotIterator) {
            super(new IteratorSlotFutureFunction(slotIterator));
        }

        private static class IteratorSlotFutureFunction
        implements Function<SlotRequestId, CompletableFuture<LogicalSlot>> {
            final Iterator<CompletableFuture<LogicalSlot>> slotIterator;

            IteratorSlotFutureFunction(Iterator<CompletableFuture<LogicalSlot>> slotIterator) {
                this.slotIterator = slotIterator;
            }

            @Override
            public CompletableFuture<LogicalSlot> apply(SlotRequestId slotRequestId) {
                if (this.slotIterator.hasNext()) {
                    return this.slotIterator.next();
                }
                return FutureUtils.completedExceptionally((Throwable)new FlinkException("No more slots available."));
            }
        }
    }
}

