/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory;
import org.apache.flink.runtime.dispatcher.DispatcherBootstrapFactory;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.DispatcherServices;
import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
import org.apache.flink.runtime.dispatcher.JobManagerRunnerFactory;
import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.NoOpDispatcherBootstrap;
import org.apache.flink.runtime.dispatcher.NoOpJobGraphWriter;
import org.apache.flink.runtime.dispatcher.TestingDispatcher;
import org.apache.flink.runtime.dispatcher.TestingJobManagerRunnerFactory;
import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.JobGraphWriter;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.TestingJobGraphStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.flink.util.function.ThrowingRunnable;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.slf4j.Logger;

public class DispatcherTest
extends TestLogger {
    private static RpcService rpcService;
    private static final Time TIMEOUT;
    private static final JobID TEST_JOB_ID;
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    @Rule
    public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource();
    @Rule
    public TestName name = new TestName();
    private JobGraph jobGraph;
    private TestingLeaderElectionService jobMasterLeaderElectionService;
    private CountDownLatch createdJobManagerRunnerLatch;
    private Configuration configuration;
    private BlobServer blobServer;
    private TestingDispatcher dispatcher;
    private TestingHighAvailabilityServices haServices;
    private HeartbeatServices heartbeatServices;

    @BeforeClass
    public static void setupClass() {
        rpcService = new TestingRpcService();
    }

    @AfterClass
    public static void teardownClass() throws Exception {
        if (rpcService != null) {
            RpcUtils.terminateRpcService((RpcService)rpcService, (Time)TIMEOUT);
            rpcService = null;
        }
    }

    @Before
    public void setUp() throws Exception {
        JobVertex testVertex = new JobVertex("testVertex");
        testVertex.setInvokableClass(NoOpInvokable.class);
        this.jobGraph = new JobGraph(TEST_JOB_ID, "testJob", new JobVertex[]{testVertex});
        this.heartbeatServices = new HeartbeatServices(1000L, 10000L);
        this.jobMasterLeaderElectionService = new TestingLeaderElectionService();
        this.haServices = new TestingHighAvailabilityServices();
        this.haServices.setJobMasterLeaderElectionService(TEST_JOB_ID, this.jobMasterLeaderElectionService);
        this.haServices.setCheckpointRecoveryFactory((CheckpointRecoveryFactory)new StandaloneCheckpointRecoveryFactory());
        this.haServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService());
        this.configuration = new Configuration();
        this.configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, this.temporaryFolder.newFolder().getAbsolutePath());
        this.createdJobManagerRunnerLatch = new CountDownLatch(2);
        this.blobServer = new BlobServer(this.configuration, (BlobStore)new VoidBlobStore());
    }

    @Nonnull
    private TestingDispatcher createAndStartDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices, JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
        TestingDispatcher dispatcher = new TestingDispatcherBuilder().setHaServices(haServices).setHeartbeatServices(heartbeatServices).setJobManagerRunnerFactory(jobManagerRunnerFactory).build();
        dispatcher.start();
        return dispatcher;
    }

    @After
    public void tearDown() throws Exception {
        if (this.dispatcher != null) {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)this.dispatcher, (Time)TIMEOUT);
        }
        if (this.haServices != null) {
            this.haServices.closeAndCleanupAllData();
        }
        if (this.blobServer != null) {
            this.blobServer.close();
        }
    }

    @Test
    public void testJobSubmission() throws Exception {
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        CompletableFuture acknowledgeFuture = dispatcherGateway.submitJob(this.jobGraph, TIMEOUT);
        acknowledgeFuture.get();
        Assert.assertTrue((String)"jobManagerRunner was not started", (boolean)this.jobMasterLeaderElectionService.getStartFuture().isDone());
    }

    @Test
    public void testJobSubmissionWithPartialResourceConfigured() throws Exception {
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        ResourceSpec resourceSpec = ResourceSpec.newBuilder((double)2.0, (int)0).build();
        JobVertex firstVertex = new JobVertex("firstVertex");
        firstVertex.setInvokableClass(NoOpInvokable.class);
        firstVertex.setResources(resourceSpec, resourceSpec);
        JobVertex secondVertex = new JobVertex("secondVertex");
        secondVertex.setInvokableClass(NoOpInvokable.class);
        JobGraph jobGraphWithTwoVertices = new JobGraph(TEST_JOB_ID, "twoVerticesJob", new JobVertex[]{firstVertex, secondVertex});
        CompletableFuture acknowledgeFuture = dispatcherGateway.submitJob(jobGraphWithTwoVertices, TIMEOUT);
        try {
            acknowledgeFuture.get();
            Assert.fail((String)"job submission should have failed");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, JobSubmissionException.class).isPresent());
        }
    }

    @Test
    public void testCacheJobExecutionResult() throws Exception {
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        JobID failedJobId = new JobID();
        JobStatus expectedState = JobStatus.FAILED;
        ArchivedExecutionGraph failedExecutionGraph = new ArchivedExecutionGraphBuilder().setJobID(failedJobId).setState(expectedState).setFailureCause(new ErrorInfo((Throwable)new RuntimeException("expected"), 1L)).build();
        this.dispatcher.completeJobExecution(failedExecutionGraph);
        Assert.assertThat(dispatcherGateway.requestJobStatus(failedJobId, TIMEOUT).get(), (Matcher)Matchers.equalTo((Object)expectedState));
        Assert.assertThat(dispatcherGateway.requestJob(failedJobId, TIMEOUT).get(), (Matcher)Matchers.equalTo((Object)failedExecutionGraph));
    }

    @Test
    public void testThrowExceptionIfJobExecutionResultNotFound() throws Exception {
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        try {
            dispatcherGateway.requestJob(new JobID(), TIMEOUT).get();
        }
        catch (ExecutionException e) {
            Throwable throwable = ExceptionUtils.stripExecutionException((Throwable)e);
            Assert.assertThat((Object)throwable, (Matcher)Matchers.instanceOf(FlinkJobNotFoundException.class));
        }
    }

    @Test
    public void testSavepointDisposal() throws Exception {
        URI externalPointer = this.createTestingSavepoint();
        Path savepointPath = Paths.get(externalPointer);
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        Assert.assertThat((Object)Files.exists(savepointPath, new LinkOption[0]), (Matcher)Is.is((Object)true));
        dispatcherGateway.disposeSavepoint(externalPointer.toString(), TIMEOUT).get();
        Assert.assertThat((Object)Files.exists(savepointPath, new LinkOption[0]), (Matcher)Is.is((Object)false));
    }

    @Nonnull
    private URI createTestingSavepoint() throws IOException, URISyntaxException {
        StateBackend stateBackend = Checkpoints.loadStateBackend((Configuration)this.configuration, (ClassLoader)Thread.currentThread().getContextClassLoader(), (Logger)this.log);
        CheckpointStorage checkpointStorage = stateBackend.createCheckpointStorage(this.jobGraph.getJobID());
        File savepointFile = this.temporaryFolder.newFolder();
        long checkpointId = 1L;
        CheckpointStorageLocation checkpointStorageLocation = checkpointStorage.initializeLocationForSavepoint(1L, savepointFile.getAbsolutePath());
        CheckpointMetadataOutputStream metadataOutputStream = checkpointStorageLocation.createMetadataOutputStream();
        Checkpoints.storeCheckpointMetadata((CheckpointMetadata)new CheckpointMetadata(1L, Collections.emptyList(), Collections.emptyList()), (OutputStream)metadataOutputStream);
        CompletedCheckpointStorageLocation completedCheckpointStorageLocation = metadataOutputStream.closeAndFinalizeCheckpoint();
        return new URI(completedCheckpointStorageLocation.getExternalPointer());
    }

    @Test
    public void testWaitingForJobMasterLeadership() throws Exception {
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        dispatcherGateway.submitJob(this.jobGraph, TIMEOUT).get();
        CompletableFuture jobStatusFuture = dispatcherGateway.requestJobStatus(this.jobGraph.getJobID(), TIMEOUT);
        Assert.assertThat((Object)jobStatusFuture.isDone(), (Matcher)Is.is((Object)false));
        try {
            jobStatusFuture.get(10L, TimeUnit.MILLISECONDS);
            Assert.fail((String)"Should not complete.");
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID()).get();
        Assert.assertThat(jobStatusFuture.get(), (Matcher)Matchers.notNullValue());
    }

    @Test
    public void testFatalErrorIfRecoveredJobsCannotBeStarted() throws Exception {
        FlinkException testException = new FlinkException("Test exception");
        JobGraph failingJobGraph = this.createFailingJobGraph((Exception)testException);
        this.dispatcher = new TestingDispatcherBuilder().setInitialJobGraphs(Collections.singleton(failingJobGraph)).build();
        this.dispatcher.start();
        TestingFatalErrorHandler fatalErrorHandler = this.testingFatalErrorHandlerResource.getFatalErrorHandler();
        Throwable error = fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
        Assert.assertThat((Object)ExceptionUtils.findThrowableWithMessage((Throwable)error, (String)testException.getMessage()).isPresent(), (Matcher)Is.is((Object)true));
        fatalErrorHandler.clearError();
    }

    @Test
    public void testBlockingJobManagerRunner() throws Exception {
        OneShotLatch jobManagerRunnerCreationLatch = new OneShotLatch();
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new BlockingJobManagerRunnerFactory((ThrowingRunnable<Exception>)((ThrowingRunnable)() -> ((OneShotLatch)jobManagerRunnerCreationLatch).await())));
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        CompletableFuture submissionFuture = dispatcherGateway.submitJob(this.jobGraph, TIMEOUT);
        Assert.assertThat((Object)submissionFuture.isDone(), (Matcher)Is.is((Object)false));
        CompletableFuture metricQueryServiceAddressesFuture = dispatcherGateway.requestMetricQueryServiceAddresses(Time.seconds((long)5L));
        Assert.assertThat(metricQueryServiceAddressesFuture.get(), (Matcher)Is.is((Matcher)Matchers.empty()));
        Assert.assertThat((Object)submissionFuture.isDone(), (Matcher)Is.is((Object)false));
        jobManagerRunnerCreationLatch.trigger();
        submissionFuture.get();
    }

    @Test
    public void testFailingJobManagerRunnerCleanup() throws Exception {
        FlinkException testException = new FlinkException("Test exception.");
        ArrayBlockingQueue queue = new ArrayBlockingQueue(2);
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new BlockingJobManagerRunnerFactory((ThrowingRunnable<Exception>)((ThrowingRunnable)() -> {
            Optional take = (Optional)queue.take();
            Exception exception = take.orElse(null);
            if (exception != null) {
                throw exception;
            }
        })));
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        CompletableFuture submissionFuture = dispatcherGateway.submitJob(this.jobGraph, TIMEOUT);
        Assert.assertThat((Object)submissionFuture.isDone(), (Matcher)Is.is((Object)false));
        queue.offer(Optional.of(testException));
        try {
            submissionFuture.get();
            Assert.fail((String)"Should fail because we could not instantiate the JobManagerRunner.");
        }
        catch (Exception e) {
            Assert.assertThat((Object)ExceptionUtils.findThrowable((Throwable)e, t -> t.equals(testException)).isPresent(), (Matcher)Is.is((Object)true));
        }
        submissionFuture = dispatcherGateway.submitJob(this.jobGraph, TIMEOUT);
        queue.offer(Optional.empty());
        submissionFuture.get();
    }

    @Test
    public void testPersistedJobGraphWhenDispatcherIsShutDown() throws Exception {
        TestingJobGraphStore submittedJobGraphStore = TestingJobGraphStore.newBuilder().build();
        submittedJobGraphStore.start(null);
        this.haServices.setJobGraphStore(submittedJobGraphStore);
        this.dispatcher = new TestingDispatcherBuilder().setJobGraphWriter((JobGraphWriter)submittedJobGraphStore).build();
        this.dispatcher.start();
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        CompletableFuture submissionFuture = dispatcherGateway.submitJob(this.jobGraph, TIMEOUT);
        submissionFuture.get();
        Assert.assertThat((Object)this.dispatcher.getNumberJobs(TIMEOUT).get(), (Matcher)Matchers.is((Object)1));
        this.dispatcher.close();
        Assert.assertThat((Object)submittedJobGraphStore.contains(this.jobGraph.getJobID()), (Matcher)Matchers.is((Object)true));
    }

    @Test
    public void testJobSuspensionWhenDispatcherIsTerminated() throws Exception {
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        dispatcherGateway.submitJob(this.jobGraph, TIMEOUT).get();
        CompletableFuture jobResultFuture = dispatcherGateway.requestJobResult(this.jobGraph.getJobID(), TIMEOUT);
        Assert.assertThat((Object)jobResultFuture.isDone(), (Matcher)Is.is((Object)false));
        this.dispatcher.closeAsync();
        try {
            jobResultFuture.get();
            Assert.fail((String)"Expected the job result to throw an exception.");
        }
        catch (ExecutionException ee) {
            Assert.assertThat((Object)ExceptionUtils.findThrowable((Throwable)ee, JobNotFinishedException.class).isPresent(), (Matcher)Is.is((Object)true));
        }
    }

    @Test
    public void testShutDownClusterShouldCompleteShutDownFuture() throws Exception {
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, (JobManagerRunnerFactory)DefaultJobManagerRunnerFactory.INSTANCE);
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        dispatcherGateway.shutDownCluster().get();
        this.dispatcher.getShutDownFuture().get();
    }

    @Test
    public void testOnRemovedJobGraphDoesNotCleanUpHAFiles() throws Exception {
        CompletableFuture removeJobGraphFuture = new CompletableFuture();
        CompletableFuture releaseJobGraphFuture = new CompletableFuture();
        TestingJobGraphStore testingJobGraphStore = TestingJobGraphStore.newBuilder().setRemoveJobGraphConsumer((ThrowingConsumer<JobID, ? extends Exception>)((ThrowingConsumer)removeJobGraphFuture::complete)).setReleaseJobGraphConsumer((ThrowingConsumer<JobID, ? extends Exception>)((ThrowingConsumer)releaseJobGraphFuture::complete)).build();
        testingJobGraphStore.start(null);
        this.dispatcher = new TestingDispatcherBuilder().setInitialJobGraphs(Collections.singleton(this.jobGraph)).setJobGraphWriter((JobGraphWriter)testingJobGraphStore).build();
        this.dispatcher.start();
        CompletableFuture processFuture = this.dispatcher.onRemovedJobGraph(this.jobGraph.getJobID());
        processFuture.join();
        Assert.assertThat(releaseJobGraphFuture.get(), (Matcher)Is.is((Object)this.jobGraph.getJobID()));
        try {
            removeJobGraphFuture.get(10L, TimeUnit.MILLISECONDS);
            Assert.fail((String)"onRemovedJobGraph should not remove the job from the JobGraphStore.");
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
    }

    private JobGraph createFailingJobGraph(Exception failureCause) {
        FailingJobVertex jobVertex = new FailingJobVertex("Failing JobVertex", failureCause);
        jobVertex.setInvokableClass(NoOpInvokable.class);
        return new JobGraph(this.jobGraph.getJobID(), "Failing JobGraph", new JobVertex[]{jobVertex});
    }

    static /* synthetic */ HeartbeatServices access$100(DispatcherTest x0) {
        return x0.heartbeatServices;
    }

    static /* synthetic */ TestingHighAvailabilityServices access$200(DispatcherTest x0) {
        return x0.haServices;
    }

    static {
        TIMEOUT = Time.seconds((long)10L);
        TEST_JOB_ID = new JobID();
    }

    private static final class ExpectedJobIdJobManagerRunnerFactory
    implements JobManagerRunnerFactory {
        private final JobID expectedJobId;
        private final CountDownLatch createdJobManagerRunnerLatch;

        private ExpectedJobIdJobManagerRunnerFactory(JobID expectedJobId, CountDownLatch createdJobManagerRunnerLatch) {
            this.expectedJobId = expectedJobId;
            this.createdJobManagerRunnerLatch = createdJobManagerRunnerLatch;
        }

        public JobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
            Assert.assertEquals((Object)this.expectedJobId, (Object)jobGraph.getJobID());
            this.createdJobManagerRunnerLatch.countDown();
            return DefaultJobManagerRunnerFactory.INSTANCE.createJobManagerRunner(jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
        }
    }

    private static class FailingJobVertex
    extends JobVertex {
        private static final long serialVersionUID = 3218428829168840760L;
        private final Exception failure;

        private FailingJobVertex(String name, Exception failure) {
            super(name);
            this.failure = failure;
        }

        public void initializeOnMaster(ClassLoader loader) throws Exception {
            throw this.failure;
        }
    }

    private static final class BlockingJobManagerRunnerFactory
    extends TestingJobManagerRunnerFactory {
        @Nonnull
        private final ThrowingRunnable<Exception> jobManagerRunnerCreationLatch;

        BlockingJobManagerRunnerFactory(@Nonnull ThrowingRunnable<Exception> jobManagerRunnerCreationLatch) {
            this.jobManagerRunnerCreationLatch = jobManagerRunnerCreationLatch;
        }

        @Override
        public TestingJobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
            this.jobManagerRunnerCreationLatch.run();
            return super.createJobManagerRunner(jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
        }
    }

    private class TestingDispatcherBuilder {
        private Collection<JobGraph> initialJobGraphs = Collections.emptyList();
        private DispatcherBootstrapFactory dispatcherBootstrapFactory = (dispatcher, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap();
        private HeartbeatServices heartbeatServices = DispatcherTest.access$100(DispatcherTest.this);
        private HighAvailabilityServices haServices = DispatcherTest.access$200(DispatcherTest.this);
        private JobManagerRunnerFactory jobManagerRunnerFactory = DefaultJobManagerRunnerFactory.INSTANCE;
        private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE;

        private TestingDispatcherBuilder() {
        }

        TestingDispatcherBuilder setHeartbeatServices(HeartbeatServices heartbeatServices) {
            this.heartbeatServices = heartbeatServices;
            return this;
        }

        TestingDispatcherBuilder setHaServices(HighAvailabilityServices haServices) {
            this.haServices = haServices;
            return this;
        }

        TestingDispatcherBuilder setInitialJobGraphs(Collection<JobGraph> initialJobGraphs) {
            this.initialJobGraphs = initialJobGraphs;
            return this;
        }

        TestingDispatcherBuilder setDispatcherBootstrapFactory(DispatcherBootstrapFactory dispatcherBootstrapFactory) {
            this.dispatcherBootstrapFactory = dispatcherBootstrapFactory;
            return this;
        }

        TestingDispatcherBuilder setJobManagerRunnerFactory(JobManagerRunnerFactory jobManagerRunnerFactory) {
            this.jobManagerRunnerFactory = jobManagerRunnerFactory;
            return this;
        }

        TestingDispatcherBuilder setJobGraphWriter(JobGraphWriter jobGraphWriter) {
            this.jobGraphWriter = jobGraphWriter;
            return this;
        }

        TestingDispatcher build() throws Exception {
            TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
            MemoryArchivedExecutionGraphStore archivedExecutionGraphStore = new MemoryArchivedExecutionGraphStore();
            return new TestingDispatcher(rpcService, DispatcherId.generate(), this.initialJobGraphs, this.dispatcherBootstrapFactory, new DispatcherServices(DispatcherTest.this.configuration, this.haServices, () -> CompletableFuture.completedFuture(resourceManagerGateway), DispatcherTest.this.blobServer, this.heartbeatServices, (ArchivedExecutionGraphStore)archivedExecutionGraphStore, (FatalErrorHandler)DispatcherTest.this.testingFatalErrorHandlerResource.getFatalErrorHandler(), (HistoryServerArchivist)VoidHistoryServerArchivist.INSTANCE, null, UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), this.jobGraphWriter, this.jobManagerRunnerFactory));
        }
    }
}

