/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.ArchivedExecution;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraphTest;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.executiongraph.failover.flip1.FixedDelayRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.runtime.operators.coordination.TestOperatorEvent;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.scheduler.DefaultSchedulerTest;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler;
import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerBuilder;
import org.apache.flink.runtime.scheduler.adaptive.Created;
import org.apache.flink.runtime.scheduler.adaptive.CreatingExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.CreatingExecutionGraphTest;
import org.apache.flink.runtime.scheduler.adaptive.FailureResult;
import org.apache.flink.runtime.scheduler.adaptive.Finished;
import org.apache.flink.runtime.scheduler.adaptive.State;
import org.apache.flink.runtime.scheduler.adaptive.StateFactory;
import org.apache.flink.runtime.scheduler.adaptive.StateTrackingMockExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.WaitingForResources;
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.AbstractComparableAssert;
import org.assertj.core.api.Assertions;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;

public class AdaptiveSchedulerTest
extends TestLogger {
    /** Parallelism given to {@link #JOB_VERTEX}. */
    private static final int PARALLELISM = 4;

    /** No-op vertex named "v1"; its parallelism is kept in sync with {@link #PARALLELISM}. */
    private static final JobVertex JOB_VERTEX = ExecutionGraphTestUtils.createNoOpVertex("v1", PARALLELISM);

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    /** Class-level rule supplying a single-threaded scheduled executor. */
    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> TEST_EXECUTOR_RESOURCE =
            new TestExecutorResource<>(Executors::newSingleThreadScheduledExecutor);

    /** Manually triggered main-thread executor created for the thread that instantiates this test. */
    private final ManuallyTriggeredComponentMainThreadExecutor mainThreadExecutor =
            new ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());

    /** Main-thread executor backed by the executor of {@link #TEST_EXECUTOR_RESOURCE}. */
    private final ComponentMainThreadExecutor singleThreadMainThreadExecutor =
            ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(TEST_EXECUTOR_RESOURCE.getExecutor());

    private final ClassLoader classLoader = ClassLoader.getSystemClassLoader();

    /** Checks that a freshly built scheduler reports a state of type {@link Created}. */
    @Test
    public void testInitialState() throws Exception {
        final JobGraph graph = createJobGraph();
        final AdaptiveScheduler adaptiveScheduler =
                new AdaptiveSchedulerBuilder(graph, mainThreadExecutor).build();

        final State initialState = adaptiveScheduler.getState();

        Assertions.assertThat(initialState).isInstanceOf(Created.class);
    }

    /**
     * Builds a scheduler for a job graph that carries snapshot settings and checks that the
     * archived execution graph requested for {@link JobStatus#INITIALIZING} contains checkpoint
     * settings.
     */
    @Test
    public void testArchivedCheckpointingSettingsNotNullIfCheckpointingIsEnabled() throws Exception {
        final JobGraph checkpointedJobGraph = createJobGraph();
        final JobCheckpointingSettings snapshotSettings =
                new JobCheckpointingSettings(CheckpointCoordinatorConfiguration.builder().build(), null);
        checkpointedJobGraph.setSnapshotSettings(snapshotSettings);

        final AdaptiveScheduler adaptiveScheduler =
                new AdaptiveSchedulerBuilder(checkpointedJobGraph, mainThreadExecutor).build();
        final ArchivedExecutionGraph archived =
                adaptiveScheduler.getArchivedExecutionGraph(JobStatus.INITIALIZING, null);

        ArchivedExecutionGraphTest.assertContainsCheckpointSettings(archived);
    }

    /**
     * Checks that {@code isState} returns true for the scheduler's own current state and false
     * for an unrelated {@code DummyState} instance.
     */
    @Test
    public void testIsState() throws Exception {
        final AdaptiveScheduler adaptiveScheduler =
                new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
        final State currentState = adaptiveScheduler.getState();

        final boolean matchesCurrent = adaptiveScheduler.isState(currentState);
        Assertions.assertThat(matchesCurrent).isTrue();

        final boolean matchesForeign = adaptiveScheduler.isState(new DummyState());
        Assertions.assertThat(matchesForeign).isFalse();
    }

    /** Checks that {@code runIfState} runs the action when given the scheduler's current state. */
    @Test
    public void testRunIfState() throws Exception {
        final AdaptiveScheduler adaptiveScheduler =
                new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
        final AtomicBoolean actionExecuted = new AtomicBoolean(false);

        final State expectedState = adaptiveScheduler.getState();
        adaptiveScheduler.runIfState(expectedState, () -> actionExecuted.set(true));

        Assertions.assertThat(actionExecuted.get()).isTrue();
    }

    /**
     * Checks that {@code runIfState} does not run the action when the given state is a new
     * {@code DummyState} rather than the scheduler's current state.
     */
    @Test
    public void testRunIfStateWithStateMismatch() throws Exception {
        final AdaptiveScheduler adaptiveScheduler =
                new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
        final AtomicBoolean actionExecuted = new AtomicBoolean(false);

        final State foreignState = new DummyState();
        adaptiveScheduler.runIfState(foreignState, () -> actionExecuted.set(true));

        Assertions.assertThat(actionExecuted.get()).isFalse();
    }

    /**
     * Starts scheduling without offering any slots and checks that one slot of
     * {@link ResourceProfile#UNKNOWN} is not reported as available.
     */
    @Test
    public void testHasEnoughResourcesReturnsFalseIfUnsatisfied() throws Exception {
        final AdaptiveScheduler adaptiveScheduler =
                new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
        adaptiveScheduler.startScheduling();

        final ResourceCounter oneUnknownSlot = ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
        final boolean satisfied = adaptiveScheduler.hasDesiredResources(oneUnknownSlot);

        Assertions.assertThat(satisfied).isFalse();
    }

    /**
     * Offers the slot pool exactly the slots that are later asked for and checks that the
     * scheduler reports the desired resources as available.
     */
    @Test
    public void testHasEnoughResourcesReturnsTrueIfSatisfied() throws Exception {
        final JobGraph graph = createJobGraph();
        final DefaultDeclarativeSlotPool slotPool = createDeclarativeSlotPool(graph.getJobID());
        final AdaptiveScheduler adaptiveScheduler =
                new AdaptiveSchedulerBuilder(graph, mainThreadExecutor)
                        .setDeclarativeSlotPool(slotPool)
                        .build();
        adaptiveScheduler.startScheduling();

        final ResourceCounter oneUnknownSlot = ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
        SlotPoolTestUtils.offerSlots(
                slotPool,
                DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(oneUnknownSlot));

        final boolean satisfied = adaptiveScheduler.hasDesiredResources(oneUnknownSlot);
        Assertions.assertThat(satisfied).isTrue();
    }

    /**
     * Offers slots with a concrete profile (1.0 CPU cores) and checks that they satisfy a
     * requirement expressed with {@link ResourceProfile#UNKNOWN}.
     */
    @Test
    public void testHasEnoughResourcesUsesUnmatchedSlotsAsUnknown() throws Exception {
        final JobGraph jobGraph = createJobGraph();
        final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph.getJobID());
        final AdaptiveScheduler scheduler =
                new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor)
                        .setDeclarativeSlotPool(declarativeSlotPool)
                        .build();
        scheduler.startScheduling();

        // The same count is required and provided; only the resource profiles differ.
        final int numRequiredSlots = 1;
        final ResourceCounter requiredResources =
                ResourceCounter.withResource(ResourceProfile.UNKNOWN, numRequiredSlots);
        final ResourceCounter providedResources =
                ResourceCounter.withResource(
                        ResourceProfile.newBuilder().setCpuCores(1.0).build(), numRequiredSlots);

        SlotPoolTestUtils.offerSlots(
                declarativeSlotPool,
                DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(providedResources));

        Assertions.assertThat(scheduler.hasDesiredResources(requiredResources)).isTrue();
    }

    /**
     * Offers a single slot and checks that the archived execution graph reports the job vertex
     * with a parallelism equal to the number of offered slots.
     */
    @Test
    public void testExecutionGraphGenerationWithAvailableResources() throws Exception {
        final JobGraph jobGraph = createJobGraph();
        final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph.getJobID());

        final Configuration configuration = new Configuration();
        // Pass the Duration directly: an Object-typed value does not match ConfigOption<Duration>.
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));

        final AdaptiveScheduler scheduler =
                new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
                        .setDeclarativeSlotPool(declarativeSlotPool)
                        .setJobMasterConfiguration(configuration)
                        .build();

        final int numAvailableSlots = 1;
        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
                new SubmissionBufferingTaskManagerGateway(numAvailableSlots);

        // Scheduler interaction is executed on the scheduler's main-thread executor.
        singleThreadMainThreadExecutor.execute(() -> {
            scheduler.startScheduling();
            SlotPoolTestUtils.offerSlots(
                    declarativeSlotPool,
                    DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(
                            ResourceCounter.withResource(ResourceProfile.UNKNOWN, numAvailableSlots)),
                    taskManagerGateway);
        });

        taskManagerGateway.waitForSubmissions(numAvailableSlots, Duration.ofSeconds(5L));

        final ArchivedExecutionGraph executionGraph =
                CompletableFuture.supplyAsync(
                                () -> scheduler.requestJob().getArchivedExecutionGraph(),
                                singleThreadMainThreadExecutor)
                        .join();

        Assertions.assertThat(executionGraph.getJobVertex(JOB_VERTEX.getID()).getParallelism())
                .isEqualTo(numAvailableSlots);
    }

    /**
     * Checks that the initialization timestamp handed to the builder is reported as the
     * {@link JobStatus#INITIALIZING} timestamp of the archived execution graph once tasks have
     * been submitted.
     */
    @Test
    public void testExecutionGraphGenerationSetsInitializationTimestamp() throws Exception {
        final long initializationTimestamp = 42L;
        final JobGraph jobGraph = createJobGraph();
        final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph.getJobID());

        final Configuration configuration = new Configuration();
        // Pass the Duration directly: an Object-typed value does not match ConfigOption<Duration>.
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));

        final AdaptiveScheduler adaptiveScheduler =
                new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
                        .setInitializationTimestamp(initializationTimestamp)
                        .setDeclarativeSlotPool(declarativeSlotPool)
                        .setJobMasterConfiguration(configuration)
                        .build();

        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
                new SubmissionBufferingTaskManagerGateway(PARALLELISM);

        singleThreadMainThreadExecutor.execute(() -> {
            adaptiveScheduler.startScheduling();
            SlotPoolTestUtils.offerSlots(
                    declarativeSlotPool,
                    DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(
                            ResourceCounter.withResource(ResourceProfile.UNKNOWN, PARALLELISM)),
                    taskManagerGateway);
        });

        // Only the first submission is awaited before the archived graph is requested.
        taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5L));

        final ArchivedExecutionGraph executionGraph =
                CompletableFuture.supplyAsync(
                                () -> adaptiveScheduler.requestJob().getArchivedExecutionGraph(),
                                singleThreadMainThreadExecutor)
                        .join();

        Assertions.assertThat(executionGraph.getStatusTimestamp(JobStatus.INITIALIZING))
                .isEqualTo(initializationTimestamp);
    }

    /**
     * Checks that the initialization timestamp handed to the builder is reported as the
     * {@link JobStatus#INITIALIZING} timestamp before scheduling has been started.
     */
    @Test
    public void testInitializationTimestampForwarding() throws Exception {
        final long expectedInitializationTimestamp = 42L;

        final AdaptiveScheduler adaptiveScheduler =
                new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor)
                        .setInitializationTimestamp(expectedInitializationTimestamp)
                        .build();

        final long initializationTimestamp =
                adaptiveScheduler
                        .requestJob()
                        .getArchivedExecutionGraph()
                        .getStatusTimestamp(JobStatus.INITIALIZING);

        Assertions.assertThat(initializationTimestamp).isEqualTo(expectedInitializationTimestamp);
    }

    /**
     * Checks that an exception thrown by an action passed to {@code runIfState} is handed to the
     * configured fatal error handler.
     */
    @Test
    public void testFatalErrorsForwardedToFatalErrorHandler() throws Exception {
        final TestingFatalErrorHandler errorHandler = new TestingFatalErrorHandler();
        final AdaptiveScheduler adaptiveScheduler =
                new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor)
                        .setFatalErrorHandler(errorHandler)
                        .build();

        final RuntimeException failure = new RuntimeException();
        final State currentState = adaptiveScheduler.getState();
        adaptiveScheduler.runIfState(currentState, () -> {
            throw failure;
        });

        final Throwable reported = errorHandler.getException();
        Assertions.assertThat(reported).isEqualTo(failure);
    }

    /**
     * Checks that starting the scheduler schedules a non-periodic task whose delay, measured in
     * minutes, equals the configured resource wait timeout.
     */
    @Test
    public void testResourceTimeout() throws Exception {
        // Dedicated executor for this test; intentionally shadows the field of the same name.
        final ManuallyTriggeredComponentMainThreadExecutor mainThreadExecutor =
                new ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());

        final Duration resourceTimeout = Duration.ofMinutes(1234L);
        final Configuration configuration = new Configuration();
        // Pass the Duration directly: an Object-typed value does not match ConfigOption<Duration>.
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, resourceTimeout);

        final AdaptiveScheduler scheduler =
                new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor)
                        .setJobMasterConfiguration(configuration)
                        .build();
        scheduler.startScheduling();

        final boolean timeoutTaskScheduled =
                mainThreadExecutor.getActiveNonPeriodicScheduledTask().stream()
                        .anyMatch(
                                scheduledTask ->
                                        scheduledTask.getDelay(TimeUnit.MINUTES)
                                                == resourceTimeout.toMinutes());

        Assertions.assertThat(timeoutTaskScheduled).isTrue();
    }

    /**
     * Checks the "numRestarts" gauge: it reads 0 after the first submission with one slot, and 1
     * after {@link #PARALLELISM} further slots have been offered and that many tasks submitted.
     */
    @Test
    @SuppressWarnings("unchecked") // the metric registered as "numRestarts" is read as Gauge<Long>
    public void testNumRestartsMetric() throws Exception {
        final CompletableFuture<Gauge<Long>> numRestartsMetricFuture = new CompletableFuture<>();
        final TestingMetricRegistry metricRegistry =
                TestingMetricRegistry.builder()
                        .setRegisterConsumer((metric, name, group) -> {
                            if ("numRestarts".equals(name)) {
                                numRestartsMetricFuture.complete((Gauge<Long>) metric);
                            }
                        })
                        .build();

        final JobGraph jobGraph = createJobGraph();
        final DefaultDeclarativeSlotPool declarativeSlotPool =
                new DefaultDeclarativeSlotPool(
                        jobGraph.getJobID(),
                        new DefaultAllocatedSlotPool(),
                        ignored -> {},
                        Time.minutes(10L),
                        Time.minutes(10L));

        final Configuration configuration = new Configuration();
        // Values are passed with their real types so they match the options' generic types.
        configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 1);
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));

        final AdaptiveScheduler scheduler =
                new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
                        .setJobMasterConfiguration(configuration)
                        .setJobManagerJobMetricGroup(
                                JobManagerMetricGroup.createJobManagerMetricGroup(metricRegistry, "localhost")
                                        .addJob(new JobID(), "jobName"))
                        .setDeclarativeSlotPool(declarativeSlotPool)
                        .build();

        final Gauge<Long> numRestartsMetric = numRestartsMetricFuture.get();

        // Capacity for the initial single submission plus the PARALLELISM later ones.
        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
                new SubmissionBufferingTaskManagerGateway(1 + PARALLELISM);
        taskManagerGateway.setCancelConsumer(createCancelConsumer(scheduler));

        singleThreadMainThreadExecutor.execute(() -> {
            scheduler.startScheduling();
            declarativeSlotPool.offerSlots(
                    DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(
                            ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
                    new LocalTaskManagerLocation(),
                    taskManagerGateway,
                    System.currentTimeMillis());
        });

        // First deployment with a single slot.
        taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5L));
        Assertions.assertThat(numRestartsMetric.getValue()).isEqualTo(0L);

        // Offer more slots and wait for the follow-up submissions.
        singleThreadMainThreadExecutor.execute(() ->
                SlotPoolTestUtils.offerSlots(
                        declarativeSlotPool,
                        DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(
                                ResourceCounter.withResource(ResourceProfile.UNKNOWN, PARALLELISM)),
                        taskManagerGateway));

        taskManagerGateway.waitForSubmissions(PARALLELISM, Duration.ofSeconds(5L));
        Assertions.assertThat(numRestartsMetric.getValue()).isEqualTo(1L);
    }

    /**
     * Checks the "uptime", "downtime" and "restartingTimeTotal" gauges after the first task
     * submission and again after a second slot has been offered and a second submission seen.
     */
    @Test
    @SuppressWarnings("unchecked") // the metric registered as "restartingTimeTotal" is read as Gauge<Long>
    public void testStatusMetrics() throws Exception {
        final CompletableFuture<UpTimeGauge> upTimeMetricFuture = new CompletableFuture<>();
        final CompletableFuture<DownTimeGauge> downTimeMetricFuture = new CompletableFuture<>();
        final CompletableFuture<Gauge<Long>> restartTimeMetricFuture = new CompletableFuture<>();
        final TestingMetricRegistry metricRegistry =
                TestingMetricRegistry.builder()
                        .setRegisterConsumer((metric, name, group) -> {
                            switch (name) {
                                case "uptime":
                                    upTimeMetricFuture.complete((UpTimeGauge) metric);
                                    break;
                                case "downtime":
                                    downTimeMetricFuture.complete((DownTimeGauge) metric);
                                    break;
                                case "restartingTimeTotal":
                                    restartTimeMetricFuture.complete((Gauge<Long>) metric);
                                    break;
                                default:
                                    // other metrics are not relevant for this test
                                    break;
                            }
                        })
                        .build();

        final JobGraph jobGraph = createJobGraph();
        final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph.getJobID());

        final Configuration configuration = new Configuration();
        // Values are passed with their real types so they match the options' generic types.
        configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 1);
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(10L));
        configuration.set(
                MetricOptions.JOB_STATUS_METRICS,
                Arrays.asList(MetricOptions.JobStatusMetrics.TOTAL_TIME));

        final AdaptiveScheduler scheduler =
                new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
                        .setJobMasterConfiguration(configuration)
                        .setJobManagerJobMetricGroup(
                                JobManagerMetricGroup.createJobManagerMetricGroup(metricRegistry, "localhost")
                                        .addJob(new JobID(), "jobName"))
                        .setDeclarativeSlotPool(declarativeSlotPool)
                        .build();

        final UpTimeGauge upTimeGauge = upTimeMetricFuture.get();
        final DownTimeGauge downTimeGauge = downTimeMetricFuture.get();
        final Gauge<Long> restartTimeGauge = restartTimeMetricFuture.get();

        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
                new SubmissionBufferingTaskManagerGateway(1 + PARALLELISM);
        taskManagerGateway.setCancelConsumer(createCancelConsumer(scheduler));

        singleThreadMainThreadExecutor.execute(() -> {
            scheduler.startScheduling();
            SlotPoolTestUtils.offerSlots(
                    declarativeSlotPool,
                    DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(
                            ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
                    taskManagerGateway);
        });

        // Wait for the first task submission, then let some wall-clock time pass so that the
        // uptime assertion below can observe a value greater than zero.
        taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5L));
        Thread.sleep(10L);

        Assertions.assertThat(upTimeGauge.getValue()).isGreaterThan(0L);
        Assertions.assertThat(downTimeGauge.getValue()).isEqualTo(0L);
        Assertions.assertThat(restartTimeGauge.getValue()).isEqualTo(0L);

        singleThreadMainThreadExecutor.execute(() ->
                SlotPoolTestUtils.offerSlots(
                        declarativeSlotPool,
                        DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(
                                ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
                        taskManagerGateway));

        // Wait for the second task submission, then again let some time pass.
        taskManagerGateway.waitForSubmissions(2, Duration.ofSeconds(5L));
        Thread.sleep(10L);

        Assertions.assertThat(upTimeGauge.getValue()).isGreaterThan(0L);
        Assertions.assertThat(downTimeGauge.getValue()).isEqualTo(0L);
        // Only a lower bound is asserted here; zero is accepted.
        Assertions.assertThat(restartTimeGauge.getValue()).isGreaterThanOrEqualTo(0L);
    }

    /**
     * Checks that the scheduler's state is a {@link WaitingForResources} instance right after
     * {@code startScheduling()} has been called.
     */
    @Test
    public void testStartSchedulingTransitionsToWaitingForResources() throws Exception {
        final JobGraph graph = createJobGraph();
        final AdaptiveScheduler adaptiveScheduler =
                new AdaptiveSchedulerBuilder(graph, mainThreadExecutor).build();

        adaptiveScheduler.startScheduling();

        final State stateAfterStart = adaptiveScheduler.getState();
        Assertions.assertThat(stateAfterStart).isInstanceOf(WaitingForResources.class);
    }

    /**
     * Checks that, with the default configuration, starting the scheduler declares
     * {@link #PARALLELISM} slots of {@link ResourceProfile#UNKNOWN} on the slot pool.
     */
    @Test
    public void testStartSchedulingSetsResourceRequirementsForDefaultMode() throws Exception {
        final JobGraph jobGraph = createJobGraph();
        final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph.getJobID());
        final AdaptiveScheduler scheduler =
                new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor)
                        .setDeclarativeSlotPool(declarativeSlotPool)
                        .build();

        scheduler.startScheduling();

        Assertions.assertThat(declarativeSlotPool.getResourceRequirements())
                .contains(ResourceRequirement.create(ResourceProfile.UNKNOWN, PARALLELISM));
    }

    /**
     * Checks that, with {@link SchedulerExecutionMode#REACTIVE} configured, starting the
     * scheduler declares as many {@link ResourceProfile#UNKNOWN} slots as the default max
     * parallelism computed for {@link #PARALLELISM}.
     */
    @Test
    public void testStartSchedulingSetsResourceRequirementsForReactiveMode() throws Exception {
        final JobGraph jobGraph = createJobGraph();
        final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph.getJobID());

        final Configuration configuration = new Configuration();
        // Pass the enum directly: an Object-typed value does not match the option's generic type.
        configuration.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE);

        final AdaptiveScheduler scheduler =
                new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor)
                        .setDeclarativeSlotPool(declarativeSlotPool)
                        .setJobMasterConfiguration(configuration)
                        .build();

        scheduler.startScheduling();

        final int expectedParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(PARALLELISM);
        Assertions.assertThat(declarativeSlotPool.getResourceRequirements())
                .contains(ResourceRequirement.create(ResourceProfile.UNKNOWN, expectedParallelism));
    }

    /**
     * Checks that the scheduler is in {@link WaitingForResources} right after start, and that
     * offering {@link #PARALLELISM} slots leads to that many task submissions and an execution
     * graph whose job vertex runs with that parallelism.
     */
    @Test
    public void testResourceAcquisitionTriggersJobExecution() throws Exception {
        final JobGraph jobGraph = createJobGraph();
        final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph.getJobID());

        final Configuration configuration = new Configuration();
        // Pass the Duration directly: an Object-typed value does not match ConfigOption<Duration>.
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));

        final AdaptiveScheduler scheduler =
                new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
                        .setDeclarativeSlotPool(declarativeSlotPool)
                        .setJobMasterConfiguration(configuration)
                        .build();

        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
                new SubmissionBufferingTaskManagerGateway(PARALLELISM);

        final CompletableFuture<State> startingStateFuture = new CompletableFuture<>();
        singleThreadMainThreadExecutor.execute(() -> {
            scheduler.startScheduling();
            // Capture the state before any slots are offered.
            startingStateFuture.complete(scheduler.getState());
            SlotPoolTestUtils.offerSlots(
                    declarativeSlotPool,
                    DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(
                            ResourceCounter.withResource(ResourceProfile.UNKNOWN, PARALLELISM)),
                    taskManagerGateway);
        });

        Assertions.assertThat(startingStateFuture.get()).isInstanceOf(WaitingForResources.class);

        taskManagerGateway.waitForSubmissions(PARALLELISM, Duration.ofSeconds(5L));

        final ArchivedExecutionGraph executionGraph =
                CompletableFuture.supplyAsync(
                                () -> scheduler.requestJob().getArchivedExecutionGraph(),
                                singleThreadMainThreadExecutor)
                        .get();

        Assertions.assertThat(executionGraph.getJobVertex(JOB_VERTEX.getID()).getParallelism())
                .isEqualTo(PARALLELISM);
    }

    /**
     * Checks that {@code goToFinished} with an archived graph in {@link JobStatus#FAILED} leaves
     * the scheduler in a {@link Finished} state.
     */
    @Test
    public void testGoToFinished() throws Exception {
        final AdaptiveScheduler adaptiveScheduler =
                new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();

        final ArchivedExecutionGraphBuilder graphBuilder = new ArchivedExecutionGraphBuilder();
        final ArchivedExecutionGraph failedGraph = graphBuilder.setState(JobStatus.FAILED).build();

        adaptiveScheduler.goToFinished(failedGraph);

        final State finalState = adaptiveScheduler.getState();
        Assertions.assertThat(finalState).isInstanceOf(Finished.class);
    }

    /**
     * Checks that the job status listener is not invoked when the scheduler transitions to a
     * state created with the same job status ({@link JobStatus#INITIALIZING}) it already reports.
     */
    @Test
    public void testJobStatusListenerOnlyCalledIfJobStatusChanges() throws Exception {
        final AtomicInteger numStatusUpdates = new AtomicInteger();
        final AdaptiveScheduler scheduler =
                new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor)
                        .setJobStatusListener(
                                (jobId, newJobStatus, timestamp) -> numStatusUpdates.incrementAndGet())
                        .build();

        // Precondition: the scheduler starts out reporting INITIALIZING.
        Assertions.assertThat(scheduler.requestJobStatus())
                .withFailMessage("Assumption about job status for Scheduler@Created is incorrect.")
                .isEqualTo(JobStatus.INITIALIZING);

        // Transition to a state built with the same job status.
        scheduler.transitionToState(new DummyState.Factory(JobStatus.INITIALIZING));

        Assertions.assertThat(numStatusUpdates.get()).isEqualTo(0);
    }

    /**
     * Runs a job until its single submitted task is reported {@link ExecutionState#FINISHED} and
     * checks that the job status listener saw CREATED, RUNNING and FINISHED and no other status.
     */
    @Test
    public void testJobStatusListenerNotifiedOfJobStatusChanges() throws Exception {
        final JobGraph jobGraph = createJobGraph();
        final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph.getJobID());

        final Configuration configuration = new Configuration();
        // Pass the Duration directly: an Object-typed value does not match ConfigOption<Duration>.
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));

        final CompletableFuture<Void> jobCreatedNotification = new CompletableFuture<>();
        final CompletableFuture<Void> jobRunningNotification = new CompletableFuture<>();
        final CompletableFuture<Void> jobFinishedNotification = new CompletableFuture<>();
        // Completed with the offending status if anything other than the three above arrives.
        final CompletableFuture<JobStatus> unexpectedJobStatusNotification = new CompletableFuture<>();

        final AdaptiveScheduler scheduler =
                new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
                        .setJobMasterConfiguration(configuration)
                        .setJobStatusListener((jobId, newJobStatus, timestamp) -> {
                            switch (newJobStatus) {
                                case CREATED:
                                    jobCreatedNotification.complete(null);
                                    break;
                                case RUNNING:
                                    jobRunningNotification.complete(null);
                                    break;
                                case FINISHED:
                                    jobFinishedNotification.complete(null);
                                    break;
                                default:
                                    unexpectedJobStatusNotification.complete(newJobStatus);
                                    break;
                            }
                        })
                        .setDeclarativeSlotPool(declarativeSlotPool)
                        .build();

        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
                new SubmissionBufferingTaskManagerGateway(1 + PARALLELISM);

        singleThreadMainThreadExecutor.execute(() -> {
            scheduler.startScheduling();
            SlotPoolTestUtils.offerSlots(
                    declarativeSlotPool,
                    DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(
                            ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
                    taskManagerGateway);
        });

        // Take the submitted task from the gateway and report it as finished.
        final TaskDeploymentDescriptor submittedTask = taskManagerGateway.submittedTasks.take();
        singleThreadMainThreadExecutor.execute(() ->
                scheduler.updateTaskExecutionState(
                        new TaskExecutionState(
                                submittedTask.getExecutionAttemptId(), ExecutionState.FINISHED)));

        jobCreatedNotification.get();
        jobRunningNotification.get();
        jobFinishedNotification.get();
        Assertions.assertThat(unexpectedJobStatusNotification.isDone()).isFalse();
    }

    /**
     * Starts a scheduler with checkpointing enabled, fails it globally, closes it, and checks
     * that both the completed checkpoint store and the checkpoint ID counter were shut down
     * with job status FAILED.
     */
    @Test
    public void testCloseShutsDownCheckpointingComponents() throws Exception {
        // Each testing component completes its future with the job status passed to shutdown.
        CompletableFuture<JobStatus> storeShutdownStatus = new CompletableFuture<>();
        TestingCompletedCheckpointStore checkpointStore = TestingCompletedCheckpointStore.createStoreWithShutdownCheckAndNoCompletedCheckpoints(storeShutdownStatus);
        CompletableFuture<JobStatus> counterShutdownStatus = new CompletableFuture<>();
        TestingCheckpointIDCounter idCounter = TestingCheckpointIDCounter.createStoreWithShutdownCheckAndNoStartAction(counterShutdownStatus);

        JobGraph jobGraph = createJobGraph();
        jobGraph.setSnapshotSettings(new JobCheckpointingSettings(CheckpointCoordinatorConfiguration.builder().build(), null));

        TestingCheckpointRecoveryFactory recoveryFactory = new TestingCheckpointRecoveryFactory(checkpointStore, idCounter);
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, this.singleThreadMainThreadExecutor)
                .setCheckpointRecoveryFactory(recoveryFactory)
                .build();

        this.singleThreadMainThreadExecutor.execute(() -> {
            scheduler.startScheduling();
            scheduler.handleGlobalFailure(new FlinkException("Test exception"));
            scheduler.closeAsync();
        });

        Assertions.assertThat(storeShutdownStatus.get()).isEqualTo(JobStatus.FAILED);
        Assertions.assertThat(counterShutdownStatus.get()).isEqualTo(JobStatus.FAILED);
    }

    /**
     * Checks that leaving a state through {@code transitionToState} invokes the state's
     * {@code onLeave} callback with the class of the state being entered.
     */
    @Test
    public void testTransitionToStateCallsOnLeave() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build();

        LifecycleMethodCapturingState capturingState = new LifecycleMethodCapturingState();
        scheduler.transitionToState(new StateInstanceFactory(capturingState));
        // Clear the captured onLeave data before triggering the transition under test.
        capturingState.reset();

        scheduler.transitionToState(new DummyState.Factory());

        Assertions.assertThat(capturingState.onLeaveCalled).isTrue();
        Assertions.assertThat(capturingState.onLeaveNewStateArgument.equals(DummyState.class)).isTrue();
    }

    /**
     * Checks that a vertex keeps the same max parallelism when the job first runs with one slot
     * and is then resubmitted after enough slots for the full parallelism were offered.
     */
    @Test
    public void testConsistentMaxParallelism() throws Exception {
        final int parallelism = 240;
        final int expectedMaxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);

        JobVertex vertex = ExecutionGraphTestUtils.createNoOpVertex(parallelism);
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex);
        DefaultDeclarativeSlotPool slotPool = createDeclarativeSlotPool(jobGraph.getJobID());

        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));

        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, this.singleThreadMainThreadExecutor)
                .setDeclarativeSlotPool(slotPool)
                .setJobMasterConfiguration(configuration)
                .build();

        // The buffer has room for the single initial submission plus the full resubmission.
        SubmissionBufferingTaskManagerGateway gateway = new SubmissionBufferingTaskManagerGateway(1 + parallelism);
        gateway.setCancelConsumer(this.createCancelConsumer(scheduler));

        // First round: only one slot is offered.
        this.singleThreadMainThreadExecutor.execute(() -> {
            scheduler.startScheduling();
            SlotPoolTestUtils.offerSlots(slotPool, DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)), gateway);
        });
        gateway.waitForSubmissions(1, Duration.ofSeconds(5L));

        ArchivedExecutionGraph initialGraph = this.getArchivedExecutionGraphForRunningJob(scheduler).get();
        ArchivedExecutionJobVertex initialVertex = initialGraph.getJobVertex(vertex.getID());
        Assertions.assertThat(initialVertex.getParallelism()).isEqualTo(1);
        Assertions.assertThat(initialVertex.getMaxParallelism()).isEqualTo(expectedMaxParallelism);

        // Second round: offer as many slots as the vertex's configured parallelism.
        this.singleThreadMainThreadExecutor.execute(() -> SlotPoolTestUtils.offerSlots(slotPool, DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.UNKNOWN, parallelism)), gateway));
        gateway.waitForSubmissions(parallelism, Duration.ofSeconds(5L));

        ArchivedExecutionGraph rescaledGraph = this.getArchivedExecutionGraphForRunningJob(scheduler).get();
        ArchivedExecutionJobVertex rescaledVertex = rescaledGraph.getJobVertex(vertex.getID());
        Assertions.assertThat(rescaledVertex.getParallelism()).isEqualTo(parallelism);
        Assertions.assertThat(rescaledVertex.getMaxParallelism()).isEqualTo(expectedMaxParallelism);
    }

    /**
     * Checks that {@code howToHandleFailure} reports a non-restartable result when the scheduler
     * is built with {@link NoRestartBackoffTimeStrategy}.
     */
    @Test
    public void testHowToHandleFailureRejectedByStrategy() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor)
                .setRestartBackoffTimeStrategy(NoRestartBackoffTimeStrategy.INSTANCE)
                .build();

        boolean restartAllowed = scheduler.howToHandleFailure(new Exception("test")).canRestart();

        Assertions.assertThat(restartAllowed).isFalse();
    }

    /**
     * Checks that {@code howToHandleFailure} reports a restartable result carrying the strategy's
     * backoff time when the restart strategy permits restarts.
     */
    @Test
    public void testHowToHandleFailureAllowedByStrategy() throws Exception {
        TestRestartBackoffTimeStrategy strategy = new TestRestartBackoffTimeStrategy(true, 1234L);
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor)
                .setRestartBackoffTimeStrategy(strategy)
                .build();

        FailureResult result = scheduler.howToHandleFailure(new Exception("test"));

        Assertions.assertThat(result.canRestart()).isTrue();
        long reportedBackoffMillis = result.getBackoffTime().toMillis();
        Assertions.assertThat(reportedBackoffMillis).isEqualTo(strategy.getBackoffTime());
    }

    /**
     * Checks that a failure wrapped in {@link SuppressRestartsException} is reported as
     * non-restartable by {@code howToHandleFailure}.
     */
    @Test
    public void testHowToHandleFailureUnrecoverableFailure() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build();
        Throwable unrecoverable = new SuppressRestartsException(new Exception("test"));

        boolean restartAllowed = scheduler.howToHandleFailure(unrecoverable).canRestart();

        Assertions.assertThat(restartAllowed).isFalse();
    }

    /**
     * Convenience overload that runs the exception history scenario without customizing the
     * scheduler builder or the job graph.
     *
     * @param testLogic action given the scheduler and the attempt IDs of the running job
     * @return the exception history produced by the three-argument overload
     */
    private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> testLogic) throws Exception {
        Consumer<AdaptiveSchedulerBuilder> noSchedulerSetup = unusedBuilder -> {};
        Consumer<JobGraph> noJobGraphSetup = unusedJobGraph -> {};
        return this.runExceptionHistoryTests(testLogic, noSchedulerSetup, noJobGraphSetup);
    }

    /**
     * Convenience overload that runs the exception history scenario with a customized scheduler
     * builder but an unmodified job graph.
     *
     * @param testLogic action given the scheduler and the attempt IDs of the running job
     * @param setupScheduler hook applied to the scheduler builder before the scheduler is built
     * @return the exception history produced by the three-argument overload
     */
    private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> testLogic, Consumer<AdaptiveSchedulerBuilder> setupScheduler) throws Exception {
        Consumer<JobGraph> noJobGraphSetup = unusedJobGraph -> {};
        return this.runExceptionHistoryTests(testLogic, setupScheduler, noJobGraphSetup);
    }

    /**
     * Shared driver for the exception history tests.
     *
     * <p>It builds a scheduler for a fresh job graph, offers it four slots, waits until the job
     * status listener has seen RUNNING, runs {@code testLogic} on the main thread executor,
     * reports every task the gateway was asked to cancel as CANCELED, waits for the listener's
     * terminal signal (FAILED, or CREATED after RUNNING) and returns the exception history.
     *
     * @param testLogic action given the scheduler and the attempt IDs of the running job
     * @param setupScheduler hook applied to the scheduler builder before the scheduler is built
     * @param setupJobGraph hook applied to the job graph before the scheduler builder is created
     * @return the exception history reported by {@code scheduler.requestJob()}
     */
    private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> testLogic, Consumer<AdaptiveSchedulerBuilder> setupScheduler, Consumer<JobGraph> setupJobGraph) throws Exception {
        final int numAvailableSlots = 4;
        JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
        setupJobGraph.accept(jobGraph);
        RunFailedJobListener listener = new RunFailedJobListener();
        // Attempt IDs for which the scheduler requested cancellation through the gateway.
        List<ExecutionAttemptID> cancelledTasks = new ArrayList<>();
        StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        StandaloneCheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter();
        CheckpointsCleaner checkpointCleaner = new CheckpointsCleaner();
        TestingCheckpointRecoveryFactory checkpointRecoveryFactory = new TestingCheckpointRecoveryFactory(completedCheckpointStore, checkpointIDCounter);
        DefaultDeclarativeSlotPool declarativeSlotPool = AdaptiveSchedulerTest.createDeclarativeSlotPool(jobGraph.getJobID());
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
        AdaptiveSchedulerBuilder builder = new AdaptiveSchedulerBuilder(jobGraph, this.singleThreadMainThreadExecutor)
                .setJobMasterConfiguration(configuration)
                .setDeclarativeSlotPool(declarativeSlotPool)
                .setCheckpointRecoveryFactory(checkpointRecoveryFactory)
                .setCheckpointCleaner(checkpointCleaner)
                .setJobStatusListener(listener);
        // The caller's hook runs last, so it can override any of the defaults set above.
        setupScheduler.accept(builder);
        AdaptiveScheduler scheduler = builder.build();

        SubmissionBufferingTaskManagerGateway taskManagerGateway = new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
        taskManagerGateway.setCancelConsumer(cancelledTasks::add);
        this.singleThreadMainThreadExecutor.execute(() -> {
            scheduler.startScheduling();
            SlotPoolTestUtils.offerSlots(declarativeSlotPool, DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.UNKNOWN, numAvailableSlots)), taskManagerGateway);
        });
        listener.waitForRunning();

        // Read the execution vertices on the main thread executor and hand them back here.
        CompletableFuture<Iterable<ArchivedExecutionVertex>> vertexFuture = new CompletableFuture<>();
        this.singleThreadMainThreadExecutor.execute(() -> vertexFuture.complete(scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices()));
        Iterable<ArchivedExecutionVertex> executionVertices = vertexFuture.get();
        // Collectors.toList() is kept because some callers remove elements from this list.
        List<ExecutionAttemptID> attemptIds = IterableUtils.toStream(executionVertices)
                .map(ArchivedExecutionVertex::getCurrentExecutionAttempt)
                .map(ArchivedExecution::getAttemptId)
                .collect(Collectors.toList());

        CompletableFuture<Void> runTestLogicFuture = CompletableFuture.runAsync(() -> testLogic.accept(scheduler, attemptIds), this.singleThreadMainThreadExecutor);
        runTestLogicFuture.get();

        // Acknowledge every cancellation the scheduler requested while handling the failure.
        Consumer<ExecutionAttemptID> canceller = attemptId -> scheduler.updateTaskExecutionState(new TaskExecutionStateTransition(new TaskExecutionState(attemptId, ExecutionState.CANCELED, null)));
        CompletableFuture<Void> cancelFuture = CompletableFuture.runAsync(() -> cancelledTasks.forEach(canceller), this.singleThreadMainThreadExecutor);
        cancelFuture.get();

        listener.waitForTerminal();
        return scheduler.requestJob().getExceptionHistory();
    }

    /**
     * Checks that a single global failure yields exactly one exception history entry that has
     * neither a task manager location nor a failing task name and carries the original exception.
     */
    @Test
    public void testExceptionHistoryWithGlobalFailure() throws Exception {
        Exception expectedException = new Exception("Expected Global Exception");

        Iterable<RootExceptionHistoryEntry> history = this.runExceptionHistoryTests(
                (scheduler, attemptIds) -> scheduler.handleGlobalFailure(expectedException));

        Assertions.assertThat(history).hasSize(1);
        RootExceptionHistoryEntry entry = history.iterator().next();
        Assertions.assertThat((Object)entry.getTaskManagerLocation()).isNull();
        Assertions.assertThat(entry.getFailingTaskName()).isNull();
        Throwable recorded = entry.getException().deserializeError(this.classLoader);
        Assertions.assertThat(recorded).isEqualTo(expectedException);
    }

    /**
     * Checks that failing the task at index 1 yields exactly one exception history entry
     * carrying the task's exception.
     */
    @Test
    public void testExceptionHistoryWithTaskFailure() throws Exception {
        Exception expectedException = new Exception("Expected Local Exception");

        Iterable<RootExceptionHistoryEntry> history = this.runExceptionHistoryTests((scheduler, attemptIds) -> {
            ExecutionAttemptID failingAttempt = attemptIds.get(1);
            TaskExecutionState failedState = new TaskExecutionState(failingAttempt, ExecutionState.FAILED, expectedException);
            scheduler.updateTaskExecutionState(new TaskExecutionStateTransition(failedState));
        });

        Assertions.assertThat(history).hasSize(1);
        RootExceptionHistoryEntry entry = history.iterator().next();
        Throwable recorded = entry.getException().deserializeError(this.classLoader);
        Assertions.assertThat(recorded).isEqualTo(expectedException);
    }

    /**
     * Same scenario as the plain task failure test, but with a fixed-delay restart strategy
     * (created with arguments 1 and 100) installed on the scheduler builder.
     */
    @Test
    public void testExceptionHistoryWithTaskFailureWithRestart() throws Exception {
        Exception expectedException = new Exception("Expected Local Exception");

        Consumer<AdaptiveSchedulerBuilder> installRestartStrategy = builder -> builder.setRestartBackoffTimeStrategy(new FixedDelayRestartBackoffTimeStrategy.FixedDelayRestartBackoffTimeStrategyFactory(1, 100L).create());
        BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> failSecondTask = (scheduler, attemptIds) -> {
            ExecutionAttemptID failingAttempt = attemptIds.get(1);
            TaskExecutionState failedState = new TaskExecutionState(failingAttempt, ExecutionState.FAILED, expectedException);
            scheduler.updateTaskExecutionState(new TaskExecutionStateTransition(failedState));
        };

        Iterable<RootExceptionHistoryEntry> history = this.runExceptionHistoryTests(failSecondTask, installRestartStrategy);

        Assertions.assertThat(history).hasSize(1);
        RootExceptionHistoryEntry entry = history.iterator().next();
        Throwable recorded = entry.getException().deserializeError(this.classLoader);
        Assertions.assertThat(recorded).isEqualTo(expectedException);
    }

    /**
     * Checks that a task failure reported right after {@code stopWithSavepoint} was requested
     * yields exactly one exception history entry carrying the task's exception.
     */
    @Test
    public void testExceptionHistoryWithTaskFailureFromStopWithSavepoint() throws Exception {
        Exception expectedException = new Exception("Expected Local Exception");

        // Give the job graph snapshot settings before the scheduler is built.
        Consumer<JobGraph> enableSnapshots = jobGraph -> jobGraph.setSnapshotSettings(new JobCheckpointingSettings(CheckpointCoordinatorConfiguration.builder().build(), null));

        StandaloneCompletedCheckpointStore checkpointStore = new StandaloneCompletedCheckpointStore(1);
        StandaloneCheckpointIDCounter idCounter = new StandaloneCheckpointIDCounter();
        CheckpointsCleaner cleaner = new CheckpointsCleaner();
        TestingCheckpointRecoveryFactory recoveryFactory = new TestingCheckpointRecoveryFactory(checkpointStore, idCounter);
        Consumer<AdaptiveSchedulerBuilder> installCheckpointServices = builder -> builder.setCheckpointRecoveryFactory(recoveryFactory).setCheckpointCleaner(cleaner);

        BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> stopThenFail = (scheduler, attemptIds) -> {
            ExecutionAttemptID failingAttempt = attemptIds.get(1);
            scheduler.stopWithSavepoint("file:///tmp/target", true, SavepointFormatType.CANONICAL);
            TaskExecutionState failedState = new TaskExecutionState(failingAttempt, ExecutionState.FAILED, expectedException);
            scheduler.updateTaskExecutionState(new TaskExecutionStateTransition(failedState));
        };

        Iterable<RootExceptionHistoryEntry> history = this.runExceptionHistoryTests(stopThenFail, installCheckpointServices, enableSnapshots);

        Assertions.assertThat(history).hasSize(1);
        RootExceptionHistoryEntry entry = history.iterator().next();
        Throwable recorded = entry.getException().deserializeError(this.classLoader);
        Assertions.assertThat(recorded).isEqualTo(expectedException);
    }

    /**
     * Checks that two global failures reported back to back produce a single exception history
     * entry: the first exception is the root cause and the second is listed as its only
     * concurrent exception.
     */
    @Test
    public void testExceptionHistoryWithTaskConcurrentGlobalFailure() throws Exception {
        Exception expectedException1 = new Exception("Expected Global Exception 1");
        Exception expectedException2 = new Exception("Expected Global Exception 2");
        BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> testLogic = (scheduler, attemptIds) -> {
            scheduler.handleGlobalFailure(expectedException1);
            scheduler.handleGlobalFailure(expectedException2);
        };
        Iterable<RootExceptionHistoryEntry> entries = this.runExceptionHistoryTests(testLogic);
        Assertions.assertThat(entries).hasSize(1);
        RootExceptionHistoryEntry failure = entries.iterator().next();
        Assertions.assertThat(failure.getException().deserializeError(this.classLoader)).isEqualTo(expectedException1);
        // Stream the concurrent exceptions directly so the element type is inferred instead of
        // being erased through raw Iterable/List locals.
        List<Throwable> foundExceptions = IterableUtils.toStream(failure.getConcurrentExceptions())
                .map(ErrorInfo::getException)
                .map(exception -> exception.deserializeError(this.classLoader))
                .collect(Collectors.toList());
        Assertions.assertThat(foundExceptions).containsExactly(expectedException2);
    }

    /**
     * Checks that two task failures reported back to back produce a single exception history
     * entry whose root cause is the first exception and whose list of concurrent exceptions is
     * empty.
     */
    @Test
    public void testExceptionHistoryWithTaskConcurrentFailure() throws Exception {
        Exception expectedException1 = new Exception("Expected Local Exception 1");
        Exception expectedException2 = new Exception("Expected Local Exception 2");
        BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> testLogic = (scheduler, attemptIds) -> {
            // Take the first two attempts off the list; remove(0) twice gives indices 0 and 1.
            ExecutionAttemptID attemptId = attemptIds.remove(0);
            ExecutionAttemptID attemptId2 = attemptIds.remove(0);
            scheduler.updateTaskExecutionState(new TaskExecutionStateTransition(new TaskExecutionState(attemptId, ExecutionState.FAILED, expectedException1)));
            scheduler.updateTaskExecutionState(new TaskExecutionStateTransition(new TaskExecutionState(attemptId2, ExecutionState.FAILED, expectedException2)));
        };
        Iterable<RootExceptionHistoryEntry> entries = this.runExceptionHistoryTests(testLogic);
        Assertions.assertThat(entries).hasSize(1);
        RootExceptionHistoryEntry failure = entries.iterator().next();
        Assertions.assertThat(failure.getException().deserializeError(this.classLoader)).isEqualTo(expectedException1);
        // Stream the concurrent exceptions directly so the element type is inferred instead of
        // being erased through raw Iterable/List locals.
        List<Throwable> foundExceptions = IterableUtils.toStream(failure.getConcurrentExceptions())
                .map(ErrorInfo::getException)
                .map(exception -> exception.deserializeError(this.classLoader))
                .collect(Collectors.toList());
        Assertions.assertThat(foundExceptions).isEmpty();
    }

    /**
     * Expects an {@link IllegalStateException} when a scheduler that is already in the
     * {@code Created} state is asked to transition into {@code Created} again.
     */
    @Test(expected=IllegalStateException.class)
    public void testRepeatedTransitionIntoCurrentStateFails() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build();

        // Sanity check: a freshly built scheduler starts out in Created.
        State initialState = scheduler.getState();
        Assertions.assertThat((Object)initialState).isInstanceOf(Created.class);

        Created.Factory sameStateFactory = new Created.Factory(scheduler, this.log);
        scheduler.transitionToState(sameStateFactory);
    }

    /**
     * Checks that {@code triggerSavepoint} on a scheduler that was built but never started
     * returns a future that fails with a {@link CheckpointException} cause.
     */
    @Test
    public void testTriggerSavepointFailsInIllegalState() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build();

        CompletableFuture<?> savepointFuture = scheduler.triggerSavepoint("some directory", false, SavepointFormatType.CANONICAL);

        Assertions.assertThat(savepointFuture)
                .failsWithin(1L, TimeUnit.MILLISECONDS)
                .withThrowableOfType(ExecutionException.class)
                .withCauseInstanceOf(CheckpointException.class);
    }

    /**
     * Checks that {@code stopWithSavepoint} on a scheduler that was built but never started
     * returns a future that fails with a {@link CheckpointException} cause.
     */
    @Test
    public void testStopWithSavepointFailsInIllegalState() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(AdaptiveSchedulerTest.createJobGraph(), this.mainThreadExecutor).build();
        // Exercise stopWithSavepoint itself. Calling triggerSavepoint here would only duplicate
        // testTriggerSavepointFailsInIllegalState and leave stopWithSavepoint uncovered.
        Assertions.assertThat(scheduler.stopWithSavepoint("some directory", false, SavepointFormatType.CANONICAL))
                .failsWithin(1L, TimeUnit.MILLISECONDS)
                .withThrowableOfType(ExecutionException.class)
                .withCauseInstanceOf(CheckpointException.class);
    }

    /**
     * Expects a {@link TaskNotRunningException} when an operator event is delivered to a
     * scheduler that was built but never started.
     */
    @Test(expected=TaskNotRunningException.class)
    public void testDeliverOperatorEventToCoordinatorFailsInIllegalState() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build();

        ExecutionAttemptID unknownAttempt = new ExecutionAttemptID();
        OperatorID unknownOperator = new OperatorID();
        scheduler.deliverOperatorEventToCoordinator(unknownAttempt, unknownOperator, new TestOperatorEvent());
    }

    /**
     * Checks that delivering a coordination request to a scheduler that was built but never
     * started returns a future that fails with a {@link FlinkException} cause.
     */
    @Test
    public void testDeliverCoordinationRequestToCoordinatorFailsInIllegalState() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build();

        CompletableFuture<?> responseFuture = scheduler.deliverCoordinationRequestToCoordinator(new OperatorID(), new CoordinationRequest(){});

        Assertions.assertThat(responseFuture)
                .failsWithin(1L, TimeUnit.MILLISECONDS)
                .withThrowableOfType(ExecutionException.class)
                .withCauseInstanceOf(FlinkException.class);
    }

    /**
     * Checks that {@code updateTaskExecutionState} returns {@code false} on a scheduler that was
     * built but never started.
     */
    @Test
    public void testUpdateTaskExecutionStateReturnsFalseInIllegalState() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build();

        TaskExecutionState failedState = new TaskExecutionState(new ExecutionAttemptID(), ExecutionState.FAILED);
        boolean accepted = scheduler.updateTaskExecutionState(new TaskExecutionStateTransition(failedState));

        Assertions.assertThat(accepted).isFalse();
    }

    /**
     * Expects an {@link IOException} when the next input split is requested from a scheduler
     * that was built but never started.
     */
    @Test(expected=IOException.class)
    public void testRequestNextInputSplitFailsInIllegalState() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build();

        ExecutionAttemptID unknownAttempt = new ExecutionAttemptID();
        scheduler.requestNextInputSplit(JOB_VERTEX.getID(), unknownAttempt);
    }

    /**
     * Expects a {@link PartitionProducerDisposedException} when a partition state is requested
     * from a scheduler that was built but never started.
     */
    @Test(expected=PartitionProducerDisposedException.class)
    public void testRequestPartitionStateFailsInIllegalState() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build();

        IntermediateDataSetID unknownDataSet = new IntermediateDataSetID();
        ResultPartitionID unknownPartition = new ResultPartitionID();
        scheduler.requestPartitionState(unknownDataSet, unknownPartition);
    }

    /**
     * Checks that {@code tryToAssignSlots} reports an unsuccessful assignment when the slot
     * allocator's reserve function returns an empty {@link Optional}.
     */
    @Test
    public void testTryToAssignSlotsReturnsNotPossibleIfExpectedResourcesAreNotAvailable() throws Exception {
        // Slot allocator whose reservation attempt always comes back empty.
        TestingSlotAllocator neverReservingAllocator = TestingSlotAllocator.newBuilder()
                .setTryReserveResourcesFunction(ignored -> Optional.empty())
                .build();
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor)
                .setSlotAllocator(neverReservingAllocator)
                .build();

        CreatingExecutionGraph.ExecutionGraphWithVertexParallelism graphWithParallelism = CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(new StateTrackingMockExecutionGraph(), new CreatingExecutionGraphTest.TestingVertexParallelism());
        CreatingExecutionGraph.AssignmentResult result = scheduler.tryToAssignSlots(graphWithParallelism);

        Assertions.assertThat(result.isSuccess()).isFalse();
    }

    /**
     * Checks that, with {@link SchedulerExecutionMode#REACTIVE}, the computed parallelism store
     * reports for each vertex the same parallelism and max parallelism as the vertex itself.
     */
    @Test
    public void testComputeVertexParallelismStoreForExecutionInReactiveMode() {
        JobVertex first = ExecutionGraphTestUtils.createNoOpVertex("v1", 1, 50);
        JobVertex second = ExecutionGraphTestUtils.createNoOpVertex("v2", 50, 50);
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(first, second);

        VertexParallelismStore store = AdaptiveScheduler.computeVertexParallelismStoreForExecution(jobGraph, SchedulerExecutionMode.REACTIVE, SchedulerBase::getDefaultMaxParallelism);

        for (JobVertex jobVertex : jobGraph.getVertices()) {
            VertexParallelismInformation stored = store.getParallelismInfo(jobVertex.getID());
            Assertions.assertThat(stored.getParallelism()).isEqualTo(jobVertex.getParallelism());
            Assertions.assertThat(stored.getMaxParallelism()).isEqualTo(jobVertex.getMaxParallelism());
        }
    }

    /**
     * Checks that, with a {@code null} execution mode, the computed parallelism store reports
     * for each vertex the same parallelism and max parallelism as the vertex itself.
     */
    @Test
    public void testComputeVertexParallelismStoreForExecutionInDefaultMode() {
        JobVertex first = ExecutionGraphTestUtils.createNoOpVertex("v1", 1, 50);
        JobVertex second = ExecutionGraphTestUtils.createNoOpVertex("v2", 50, 50);
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(first, second);

        VertexParallelismStore store = AdaptiveScheduler.computeVertexParallelismStoreForExecution(jobGraph, null, SchedulerBase::getDefaultMaxParallelism);

        for (JobVertex jobVertex : jobGraph.getVertices()) {
            VertexParallelismInformation stored = store.getParallelismInfo(jobVertex.getID());
            Assertions.assertThat(stored.getParallelism()).isEqualTo(jobVertex.getParallelism());
            Assertions.assertThat(stored.getMaxParallelism()).isEqualTo(jobVertex.getMaxParallelism());
        }
    }

    /**
     * Delegates to the shared {@code DefaultSchedulerTest} check, supplying a factory that
     * builds an {@link AdaptiveScheduler} with checkpointing enabled. The dedicated executor is
     * always shut down afterwards.
     */
    @Test
    public void testCheckpointCleanerIsClosedAfterCheckpointServices() throws Exception {
        ScheduledExecutorService dedicatedExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            DefaultSchedulerTest.doTestCheckpointCleanerIsClosedAfterCheckpointServices((checkpointRecoveryFactory, checkpointCleaner) -> {
                JobGraph checkpointedJobGraph = AdaptiveSchedulerTest.createJobGraph();
                SchedulerTestingUtils.enableCheckpointing(checkpointedJobGraph);
                try {
                    AdaptiveSchedulerBuilder schedulerBuilder = new AdaptiveSchedulerBuilder(checkpointedJobGraph, ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(dedicatedExecutor));
                    return schedulerBuilder
                            .setCheckpointRecoveryFactory((CheckpointRecoveryFactory)checkpointRecoveryFactory)
                            .setCheckpointCleaner((CheckpointsCleaner)checkpointCleaner)
                            .build();
                }
                catch (Exception buildFailure) {
                    // The factory lambda cannot throw checked exceptions, so wrap and rethrow.
                    throw new RuntimeException(buildFailure);
                }
            }, dedicatedExecutor, this.log);
        }
        finally {
            dedicatedExecutor.shutdownNow();
        }
    }

    /**
     * Repeatedly requests the job from the scheduler on the single-thread main thread executor
     * until the archived execution graph is in state RUNNING. The loop spins without pausing.
     *
     * @param scheduler scheduler to query
     * @return future completed with the first archived execution graph seen in state RUNNING
     */
    private CompletableFuture<ArchivedExecutionGraph> getArchivedExecutionGraphForRunningJob(SchedulerNG scheduler) {
        return CompletableFuture.supplyAsync(() -> {
            ArchivedExecutionGraph snapshot;
            do {
                snapshot = scheduler.requestJob().getArchivedExecutionGraph();
            } while (snapshot == null || snapshot.getState() != JobStatus.RUNNING);
            return snapshot;
        }, this.singleThreadMainThreadExecutor);
    }

    /**
     * Creates a consumer that, for each attempt ID it receives, reports that attempt as CANCELED
     * to the scheduler on the single-thread main thread executor.
     *
     * @param scheduler scheduler that receives the CANCELED state updates
     * @return consumer to be used as the task manager gateway's cancel callback
     */
    private Consumer<ExecutionAttemptID> createCancelConsumer(SchedulerNG scheduler) {
        return cancelledAttemptId -> {
            Runnable reportCanceled = () -> scheduler.updateTaskExecutionState(new TaskExecutionState(cancelledAttemptId, ExecutionState.CANCELED));
            this.singleThreadMainThreadExecutor.execute(reportCanceled);
        };
    }

    /**
     * Creates a declarative slot pool for the given job, backed by a fresh
     * {@link DefaultAllocatedSlotPool}, with a no-op third argument and two ten-minute
     * {@link Time} arguments.
     *
     * @param jobId ID of the job the pool is created for
     * @return the newly created slot pool
     */
    @Nonnull
    private static DefaultDeclarativeSlotPool createDeclarativeSlotPool(JobID jobId) {
        AllocatedSlotPool allocatedSlotPool = new DefaultAllocatedSlotPool();
        return new DefaultDeclarativeSlotPool(jobId, allocatedSlotPool, ignored -> {}, Time.minutes(10L), Time.minutes(10L));
    }

    /**
     * Creates a streaming job graph that consists of the shared {@code JOB_VERTEX} only.
     *
     * @return the new job graph
     */
    private static JobGraph createJobGraph() {
        JobGraph singleVertexGraph = JobGraphTestUtils.streamingJobGraph(JOB_VERTEX);
        return singleVertexGraph;
    }

    /**
     * Minimal {@link State} for tests. It reports a fixed job status, returns {@code null} for
     * the job and the logger, and ignores cancel, suspend and global-failure calls.
     */
    static class DummyState implements State {
        /** Job status reported by {@link #getJobStatus()}. */
        private final JobStatus jobStatus;

        /** Creates a state that reports {@link JobStatus#RUNNING}. */
        public DummyState() {
            this(JobStatus.RUNNING);
        }

        /** Creates a state that reports the given job status. */
        public DummyState(JobStatus jobStatus) {
            this.jobStatus = jobStatus;
        }

        public JobStatus getJobStatus() {
            return this.jobStatus;
        }

        public ArchivedExecutionGraph getJob() {
            // No job is associated with this dummy.
            return null;
        }

        public Logger getLogger() {
            // No logger is provided by this dummy.
            return null;
        }

        public void cancel() {
            // intentionally a no-op
        }

        public void suspend(Throwable cause) {
            // intentionally a no-op
        }

        public void handleGlobalFailure(Throwable cause) {
            // intentionally a no-op
        }

        /** Factory that creates a new {@link DummyState} with a fixed job status on each call. */
        private static class Factory implements StateFactory<DummyState> {
            /** Job status handed to every state created by this factory. */
            private final JobStatus jobStatus;

            /** Creates a factory whose states report {@link JobStatus#RUNNING}. */
            public Factory() {
                this(JobStatus.RUNNING);
            }

            /** Creates a factory whose states report the given job status. */
            public Factory(JobStatus jobStatus) {
                this.jobStatus = jobStatus;
            }

            public Class<DummyState> getStateClass() {
                return DummyState.class;
            }

            public DummyState getState() {
                DummyState created = new DummyState(this.jobStatus);
                return created;
            }
        }
    }

    /**
     * {@link StateFactory} that always hands out the one pre-built
     * {@link LifecycleMethodCapturingState} it was constructed with.
     */
    private static class StateInstanceFactory implements StateFactory<LifecycleMethodCapturingState> {
        /** The state instance returned by every {@link #getState()} call. */
        private final LifecycleMethodCapturingState instance;

        public StateInstanceFactory(LifecycleMethodCapturingState instance) {
            this.instance = instance;
        }

        public LifecycleMethodCapturingState getState() {
            return this.instance;
        }

        public Class<LifecycleMethodCapturingState> getStateClass() {
            return LifecycleMethodCapturingState.class;
        }
    }

    /**
     * Task manager gateway that records every submitted {@link TaskDeploymentDescriptor} in a
     * bounded queue so tests can wait for submissions.
     *
     * <p>Descriptors are added with {@code offer}, so a submission that arrives while the queue
     * is full is not recorded.
     */
    public static class SubmissionBufferingTaskManagerGateway extends SimpleAckingTaskManagerGateway {
        /** Recorded submissions, in arrival order. */
        final BlockingQueue<TaskDeploymentDescriptor> submittedTasks;

        /**
         * @param capacity maximum number of submissions the queue can hold at once
         */
        public SubmissionBufferingTaskManagerGateway(int capacity) {
            this.submittedTasks = new ArrayBlockingQueue<>(capacity);
            Consumer<TaskDeploymentDescriptor> recordSubmission = this.submittedTasks::offer;
            super.setSubmitConsumer(recordSubmission);
        }

        /**
         * Installs a submit consumer that first records the descriptor and then passes it on to
         * {@code submitConsumer}.
         */
        @Override
        public void setSubmitConsumer(Consumer<TaskDeploymentDescriptor> submitConsumer) {
            Consumer<TaskDeploymentDescriptor> recordSubmission = this.submittedTasks::offer;
            super.setSubmitConsumer(recordSubmission.andThen(submitConsumer));
        }

        /**
         * Takes {@code numSubmissions} entries from the queue, waiting up to
         * {@code perTaskTimeout} for each one.
         *
         * @return list of exactly {@code numSubmissions} entries; an entry is {@code null} when
         *     the corresponding poll timed out
         * @throws InterruptedException if interrupted while waiting
         */
        public List<TaskDeploymentDescriptor> waitForSubmissions(int numSubmissions, Duration perTaskTimeout) throws InterruptedException {
            List<TaskDeploymentDescriptor> received = new ArrayList<>();
            // Each pass adds exactly one element (possibly null), so the size counts the polls.
            while (received.size() < numSubmissions) {
                TaskDeploymentDescriptor next = this.submittedTasks.poll(perTaskTimeout.toMillis(), TimeUnit.MILLISECONDS);
                received.add(next);
            }
            return received;
        }
    }

    /**
     * {@link DummyState} that records whether {@code onLeave} was called and which state class
     * was passed to it.
     */
    private static class LifecycleMethodCapturingState extends DummyState {
        /** Set to {@code true} by {@link #onLeave(Class)}; cleared by {@link #reset()}. */
        boolean onLeaveCalled = false;
        /** Argument of the most recent {@link #onLeave(Class)} call, or {@code null} if none. */
        @Nullable
        Class<? extends State> onLeaveNewStateArgument = null;

        private LifecycleMethodCapturingState() {
        }

        /** Forgets any previously captured {@code onLeave} call. */
        void reset() {
            this.onLeaveNewStateArgument = null;
            this.onLeaveCalled = false;
        }

        /** Records the call and the class of the state that is entered next. */
        public void onLeave(Class<? extends State> newState) {
            this.onLeaveNewStateArgument = newState;
            this.onLeaveCalled = true;
        }
    }

    /**
     * Job status listener that exposes two latches: one triggered when the job becomes RUNNING,
     * and one triggered when the job becomes FAILED or returns to CREATED after it was RUNNING.
     */
    static class RunFailedJobListener implements JobStatusListener {
        /** Triggered on the first RUNNING notification. */
        OneShotLatch jobRunning = new OneShotLatch();
        /** Triggered on FAILED, or on CREATED once {@link #jobRunning} has been triggered. */
        OneShotLatch jobTerminal = new OneShotLatch();

        public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) {
            if (newJobStatus == JobStatus.RUNNING) {
                this.jobRunning.trigger();
                return;
            }
            // CREATED after RUNNING is treated as a restart.
            boolean restartedAfterRunning = this.jobRunning.isTriggered() && newJobStatus == JobStatus.CREATED;
            boolean failed = newJobStatus == JobStatus.FAILED;
            if (failed || restartedAfterRunning) {
                this.jobTerminal.trigger();
            }
        }

        /** Blocks until a RUNNING notification has been received. */
        public void waitForRunning() throws InterruptedException {
            this.jobRunning.await();
        }

        /** Blocks until the terminal latch has been triggered. */
        public void waitForTerminal() throws InterruptedException {
            this.jobTerminal.await();
        }
    }
}

