/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import akka.actor.ActorSystem;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URLClassLoader;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.StateHandleDummyUtil;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.dispatcher.SchedulerNGFactoryFactory;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
import org.apache.flink.runtime.instance.SimpleSlotContext;
import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobgraph.utils.JobGraphTestUtils;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationRejection;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.jobmaster.JobMasterConfiguration;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.TestingJobManagerSharedServicesBuilder;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.jobmaster.factories.UnregisteredJobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultSchedulerFactory;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultSlotPoolFactory;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SchedulerFactory;
import org.apache.flink.runtime.jobmaster.slotpool.SlotInfoWithUtilization;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.ClassLoaderUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.QuadFunction;
import org.apache.flink.util.function.SupplierWithException;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class JobMasterTest
extends TestLogger {
    private static final TestingInputSplit[] EMPTY_TESTING_INPUT_SPLITS = new TestingInputSplit[0];
    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    private static final Time testingTimeout = Time.seconds((long)10L);
    private static final long fastHeartbeatInterval = 1L;
    private static final long fastHeartbeatTimeout = 10L;
    private static final long heartbeatInterval = 1000L;
    private static final long heartbeatTimeout = 5000000L;
    private static final JobGraph jobGraph = new JobGraph(new JobVertex[0]);
    private static TestingRpcService rpcService;
    private static HeartbeatServices fastHeartbeatServices;
    private static HeartbeatServices heartbeatServices;
    private Configuration configuration;
    private ResourceID jmResourceId;
    private JobMasterId jobMasterId;
    private TestingHighAvailabilityServices haServices;
    private SettableLeaderRetrievalService rmLeaderRetrievalService;
    private TestingFatalErrorHandler testingFatalErrorHandler;

    @BeforeClass
    public static void setupClass() {
        rpcService = new TestingRpcService();
        fastHeartbeatServices = new HeartbeatServices(1L, 10L);
        heartbeatServices = new HeartbeatServices(1000L, 5000000L);
    }

    @Before
    public void setup() throws IOException {
        this.configuration = new Configuration();
        this.haServices = new TestingHighAvailabilityServices();
        this.jobMasterId = JobMasterId.generate();
        this.jmResourceId = ResourceID.generate();
        this.testingFatalErrorHandler = new TestingFatalErrorHandler();
        this.haServices.setCheckpointRecoveryFactory((CheckpointRecoveryFactory)new StandaloneCheckpointRecoveryFactory());
        this.rmLeaderRetrievalService = new SettableLeaderRetrievalService(null, null);
        this.haServices.setResourceManagerLeaderRetriever(this.rmLeaderRetrievalService);
        this.configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
    }

    @After
    public void teardown() throws Exception {
        if (this.testingFatalErrorHandler != null) {
            this.testingFatalErrorHandler.rethrowError();
        }
        rpcService.clearGateways();
    }

    @AfterClass
    public static void teardownClass() {
        if (rpcService != null) {
            rpcService.stopService();
            rpcService = null;
        }
    }

    @Test
    public void testAcceptSlotOfferAfterLeaderChange() throws Exception {
        JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
        JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration((Configuration)this.configuration);
        SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory((Configuration)this.configuration);
        JobGraph jobGraph = JobGraphTestUtils.createSingleVertexJobGraph();
        JobMaster testingJobMaster = new JobMaster((RpcService)rpcService, jobMasterConfiguration, this.jmResourceId, jobGraph, (HighAvailabilityServices)this.haServices, (SlotPoolFactory)DefaultSlotPoolFactory.fromConfiguration((Configuration)this.configuration), (SchedulerFactory)DefaultSchedulerFactory.fromConfiguration((Configuration)this.configuration), jobManagerSharedServices, heartbeatServices, (JobManagerJobMetricGroupFactory)UnregisteredJobManagerJobMetricGroupFactory.INSTANCE, (OnCompletionActions)new JobMasterBuilder.TestingOnCompletionActions(), (FatalErrorHandler)this.testingFatalErrorHandler, JobMasterTest.class.getClassLoader(), schedulerNGFactory, (ShuffleMaster)NettyShuffleMaster.INSTANCE, NoOpJobMasterPartitionTracker.FACTORY);
        testingJobMaster.start(this.jobMasterId).get();
        JobMasterGateway jobMasterGateway = (JobMasterGateway)testingJobMaster.getSelfGateway(JobMasterGateway.class);
        this.log.info("Register TaskManager");
        String testingTaskManagerAddress = "fake";
        LocalUnresolvedTaskManagerLocation unresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
        TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
        rpcService.registerGateway(testingTaskManagerAddress, (RpcGateway)testingTaskExecutorGateway);
        Assert.assertThat(jobMasterGateway.registerTaskManager(testingTaskManagerAddress, (UnresolvedTaskManagerLocation)unresolvedTaskManagerLocation, jobGraph.getJobID(), testingTimeout).get(), (Matcher)Matchers.instanceOf(RegistrationResponse.Success.class));
        this.log.info("Revoke leadership & re-grant leadership");
        testingJobMaster.suspend((Exception)new FlinkException("Lost leadership")).get();
        testingJobMaster.start(JobMasterId.generate()).get();
        this.log.info("re-register same TaskManager");
        Assert.assertThat(jobMasterGateway.registerTaskManager(testingTaskManagerAddress, (UnresolvedTaskManagerLocation)unresolvedTaskManagerLocation, jobGraph.getJobID(), testingTimeout).get(), (Matcher)Matchers.instanceOf(RegistrationResponse.Success.class));
        this.log.info("Ensure JobMaster accepts slot offer");
        SlotOffer slotOffer = new SlotOffer(new AllocationID(), 0, ResourceProfile.ANY);
        Collection acceptedSlots = (Collection)jobMasterGateway.offerSlots(unresolvedTaskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout).get();
        Assert.assertThat((Object)acceptedSlots.size(), (Matcher)Matchers.is((Object)1));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDeclineCheckpointInvocationWithUserException() throws Exception {
        AkkaRpcService rpcService1 = null;
        AkkaRpcService rpcService2 = null;
        try {
            ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
            ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
            AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration((Configuration)this.configuration);
            rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
            rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
            final CompletableFuture declineCheckpointMessageFuture = new CompletableFuture();
            JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
            JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration((Configuration)this.configuration);
            SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory((Configuration)this.configuration);
            JobMaster jobMaster = new JobMaster((RpcService)rpcService1, jobMasterConfiguration, this.jmResourceId, jobGraph, this.haServices, (SlotPoolFactory)DefaultSlotPoolFactory.fromConfiguration((Configuration)this.configuration), (SchedulerFactory)DefaultSchedulerFactory.fromConfiguration((Configuration)this.configuration), jobManagerSharedServices, heartbeatServices, (JobManagerJobMetricGroupFactory)UnregisteredJobManagerJobMetricGroupFactory.INSTANCE, new JobMasterBuilder.TestingOnCompletionActions(), this.testingFatalErrorHandler, JobMasterTest.class.getClassLoader(), schedulerNGFactory, (ShuffleMaster)NettyShuffleMaster.INSTANCE, NoOpJobMasterPartitionTracker.FACTORY){

                public void declineCheckpoint(DeclineCheckpoint declineCheckpoint) {
                    declineCheckpointMessageFuture.complete(declineCheckpoint.getSerializedCheckpointException().unwrap());
                }
            };
            jobMaster.start(this.jobMasterId).get();
            String className = "UserException";
            URLClassLoader userClassLoader = ClassLoaderUtils.compileAndLoadJava((File)temporaryFolder.newFolder(), (String)"UserException.java", (String)String.format("public class %s extends RuntimeException { public %s() {super(\"UserMessage\");} }", "UserException", "UserException"));
            Throwable userException = (Throwable)Class.forName("UserException", false, userClassLoader).newInstance();
            CheckpointException checkpointException = new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED, userException);
            JobMasterGateway jobMasterGateway = (JobMasterGateway)rpcService2.connect(jobMaster.getAddress(), jobMaster.getFencingToken(), JobMasterGateway.class).get();
            RpcCheckpointResponder rpcCheckpointResponder = new RpcCheckpointResponder((CheckpointCoordinatorGateway)jobMasterGateway);
            rpcCheckpointResponder.declineCheckpoint(jobGraph.getJobID(), new ExecutionAttemptID(), 1L, (Throwable)checkpointException);
            Throwable throwable = (Throwable)declineCheckpointMessageFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            MatcherAssert.assertThat((Object)throwable, (Matcher)Matchers.instanceOf(CheckpointException.class));
            Optional throwableWithMessage = ExceptionUtils.findThrowableWithMessage((Throwable)throwable, (String)userException.getMessage());
            Assert.assertTrue((boolean)throwableWithMessage.isPresent());
            MatcherAssert.assertThat((Object)((Throwable)throwableWithMessage.get()).getMessage(), (Matcher)Matchers.equalTo((Object)userException.getMessage()));
        }
        catch (Throwable throwable) {
            RpcUtils.terminateRpcServices((Time)testingTimeout, (RpcService[])new RpcService[]{rpcService1, rpcService2});
            throw throwable;
        }
        RpcUtils.terminateRpcServices((Time)testingTimeout, (RpcService[])new RpcService[]{rpcService1, rpcService2});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testHeartbeatTimeoutWithTaskManager() throws Exception {
        CompletableFuture<Object> heartbeatResourceIdFuture = new CompletableFuture<Object>();
        CompletableFuture disconnectedJobManagerFuture = new CompletableFuture();
        LocalUnresolvedTaskManagerLocation unresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setHeartbeatJobManagerConsumer((taskManagerId, ignored) -> heartbeatResourceIdFuture.complete(taskManagerId)).setDisconnectJobManagerConsumer((jobId, throwable) -> disconnectedJobManagerFuture.complete(jobId)).createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), (RpcGateway)taskExecutorGateway);
        JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
        JobMaster jobMaster = this.createJobMaster(this.configuration, jobGraph, this.haServices, jobManagerSharedServices);
        CompletableFuture startFuture = jobMaster.start(this.jobMasterId);
        try {
            startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            JobMasterGateway jobMasterGateway = (JobMasterGateway)jobMaster.getSelfGateway(JobMasterGateway.class);
            CompletableFuture registrationResponse = jobMasterGateway.registerTaskManager(taskExecutorGateway.getAddress(), (UnresolvedTaskManagerLocation)unresolvedTaskManagerLocation, jobGraph.getJobID(), testingTimeout);
            registrationResponse.get();
            JobID disconnectedJobManager = (JobID)disconnectedJobManagerFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            MatcherAssert.assertThat((Object)disconnectedJobManager, (Matcher)Matchers.equalTo((Object)jobGraph.getJobID()));
            ResourceID heartbeatResourceId = heartbeatResourceIdFuture.getNow(null);
            MatcherAssert.assertThat((Object)heartbeatResourceId, (Matcher)Matchers.anyOf((Matcher)Matchers.nullValue(), (Matcher)Matchers.equalTo((Object)this.jmResourceId)));
        }
        finally {
            jobManagerSharedServices.shutdown();
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)jobMaster, (Time)testingTimeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAllocatedSlotReportDoesNotContainStaleInformation() throws Exception {
        CompletableFuture assertionFuture = new CompletableFuture();
        LocalUnresolvedTaskManagerLocation unresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
        AtomicBoolean terminateHeartbeatVerification = new AtomicBoolean(false);
        OneShotLatch hasReceivedSlotOffers = new OneShotLatch();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setHeartbeatJobManagerConsumer((taskManagerId, allocatedSlotReport) -> {
            try {
                if (hasReceivedSlotOffers.isTriggered()) {
                    MatcherAssert.assertThat((Object)allocatedSlotReport.getAllocatedSlotInfos(), (Matcher)Matchers.hasSize((int)1));
                } else {
                    MatcherAssert.assertThat((Object)allocatedSlotReport.getAllocatedSlotInfos(), (Matcher)Matchers.empty());
                }
            }
            catch (AssertionError e) {
                assertionFuture.completeExceptionally((Throwable)((Object)e));
            }
            if (terminateHeartbeatVerification.get()) {
                assertionFuture.complete(null);
            }
        }).createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), (RpcGateway)taskExecutorGateway);
        JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
        JobGraph jobGraph = JobGraphTestUtils.createSingleVertexJobGraph();
        JobMaster jobMaster = new JobMasterBuilder(jobGraph, (RpcService)rpcService).withHeartbeatServices(new HeartbeatServices(5L, 1000L)).withSlotPoolFactory(new TestingSlotPoolFactory(hasReceivedSlotOffers)).createJobMaster();
        CompletableFuture startFuture = jobMaster.start(this.jobMasterId);
        try {
            startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            JobMasterGateway jobMasterGateway = (JobMasterGateway)jobMaster.getSelfGateway(JobMasterGateway.class);
            CompletableFuture registrationResponse = jobMasterGateway.registerTaskManager(taskExecutorGateway.getAddress(), (UnresolvedTaskManagerLocation)unresolvedTaskManagerLocation, jobGraph.getJobID(), testingTimeout);
            registrationResponse.get();
            SlotOffer slotOffer = new SlotOffer(new AllocationID(), 0, ResourceProfile.ANY);
            CompletableFuture slotOfferFuture = jobMasterGateway.offerSlots(unresolvedTaskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout);
            MatcherAssert.assertThat(slotOfferFuture.get(), (Matcher)Matchers.containsInAnyOrder((Object[])new SlotOffer[]{slotOffer}));
            terminateHeartbeatVerification.set(true);
            assertionFuture.get();
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)jobMaster, (Time)testingTimeout);
            jobManagerSharedServices.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testHeartbeatTimeoutWithResourceManager() throws Exception {
        String resourceManagerAddress = "rm";
        ResourceManagerId resourceManagerId = ResourceManagerId.generate();
        ResourceID rmResourceId = new ResourceID("rm");
        TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(resourceManagerId, rmResourceId, "rm", "localhost");
        CompletableFuture jobManagerRegistrationFuture = new CompletableFuture();
        CompletableFuture disconnectedJobManagerFuture = new CompletableFuture();
        CountDownLatch registrationAttempts = new CountDownLatch(2);
        resourceManagerGateway.setRegisterJobManagerFunction((QuadFunction<JobMasterId, ResourceID, String, JobID, CompletableFuture<RegistrationResponse>>)((QuadFunction)(jobMasterId, resourceID, s, jobID) -> {
            jobManagerRegistrationFuture.complete(Tuple3.of((Object)jobMasterId, (Object)resourceID, (Object)jobID));
            registrationAttempts.countDown();
            return CompletableFuture.completedFuture(resourceManagerGateway.getJobMasterRegistrationSuccess());
        }));
        resourceManagerGateway.setDisconnectJobManagerConsumer(tuple -> disconnectedJobManagerFuture.complete(tuple.f0));
        rpcService.registerGateway("rm", (RpcGateway)resourceManagerGateway);
        JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
        JobMaster jobMaster = this.createJobMaster(this.configuration, jobGraph, this.haServices, jobManagerSharedServices);
        CompletableFuture startFuture = jobMaster.start(this.jobMasterId);
        try {
            startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            this.rmLeaderRetrievalService.notifyListener("rm", resourceManagerId.toUUID());
            Tuple3 registrationInformation = (Tuple3)jobManagerRegistrationFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            MatcherAssert.assertThat((Object)registrationInformation.f0, (Matcher)Matchers.equalTo((Object)this.jobMasterId));
            MatcherAssert.assertThat((Object)registrationInformation.f1, (Matcher)Matchers.equalTo((Object)this.jmResourceId));
            MatcherAssert.assertThat((Object)registrationInformation.f2, (Matcher)Matchers.equalTo((Object)jobGraph.getJobID()));
            JobID disconnectedJobManager = (JobID)disconnectedJobManagerFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            MatcherAssert.assertThat((Object)disconnectedJobManager, (Matcher)Matchers.equalTo((Object)jobGraph.getJobID()));
            registrationAttempts.await();
        }
        finally {
            jobManagerSharedServices.shutdown();
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)jobMaster, (Time)testingTimeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRestoringFromSavepoint() throws Exception {
        long savepointId = 42L;
        File savepointFile = this.createSavepoint(42L);
        SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath((String)savepointFile.getAbsolutePath(), (boolean)true);
        JobGraph jobGraph = this.createJobGraphWithCheckpointing(savepointRestoreSettings);
        StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        TestingCheckpointRecoveryFactory testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory((CompletedCheckpointStore)completedCheckpointStore, (CheckpointIDCounter)new StandaloneCheckpointIDCounter());
        this.haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
        JobMaster jobMaster = this.createJobMaster(this.configuration, jobGraph, this.haServices, new TestingJobManagerSharedServicesBuilder().build());
        try {
            CompletedCheckpoint savepointCheckpoint = completedCheckpointStore.getLatestCheckpoint(false);
            MatcherAssert.assertThat((Object)savepointCheckpoint, (Matcher)Matchers.notNullValue());
            MatcherAssert.assertThat((Object)savepointCheckpoint.getCheckpointID(), (Matcher)Matchers.is((Object)42L));
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)jobMaster, (Time)testingTimeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRestoringModifiedJobFromSavepoint() throws Exception {
        long savepointId = 42L;
        OperatorID operatorID = new OperatorID();
        File savepointFile = this.createSavepointWithOperatorState(42L, operatorID);
        SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath((String)savepointFile.getAbsolutePath(), (boolean)false);
        JobVertex jobVertex = new JobVertex("New operator");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        JobGraph jobGraphWithNewOperator = this.createJobGraphFromJobVerticesWithCheckpointing(savepointRestoreSettings, jobVertex);
        StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        TestingCheckpointRecoveryFactory testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory((CompletedCheckpointStore)completedCheckpointStore, (CheckpointIDCounter)new StandaloneCheckpointIDCounter());
        this.haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
        try {
            this.createJobMaster(this.configuration, jobGraphWithNewOperator, this.haServices, new TestingJobManagerSharedServicesBuilder().build());
            Assert.fail((String)"Should fail because we cannot resume the changed JobGraph from the savepoint.");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
        jobGraphWithNewOperator.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)savepointFile.getAbsolutePath(), (boolean)true));
        JobMaster jobMaster = this.createJobMaster(this.configuration, jobGraphWithNewOperator, this.haServices, new TestingJobManagerSharedServicesBuilder().build());
        try {
            CompletedCheckpoint savepointCheckpoint = completedCheckpointStore.getLatestCheckpoint(false);
            MatcherAssert.assertThat((Object)savepointCheckpoint, (Matcher)Matchers.notNullValue());
            MatcherAssert.assertThat((Object)savepointCheckpoint.getCheckpointID(), (Matcher)Matchers.is((Object)42L));
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)jobMaster, (Time)testingTimeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCheckpointPrecedesSavepointRecovery() throws Exception {
        long savepointId = 42L;
        File savepointFile = this.createSavepoint(42L);
        SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath((String)("" + savepointFile.getAbsolutePath()), (boolean)true);
        JobGraph jobGraph = this.createJobGraphWithCheckpointing(savepointRestoreSettings);
        long checkpointId = 1L;
        CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint(jobGraph.getJobID(), 1L, 1L, 1L, Collections.emptyMap(), null, CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), (CompletedCheckpointStorageLocation)new DummyCheckpointStorageLocation());
        StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        completedCheckpointStore.addCheckpoint(completedCheckpoint);
        TestingCheckpointRecoveryFactory testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory((CompletedCheckpointStore)completedCheckpointStore, (CheckpointIDCounter)new StandaloneCheckpointIDCounter());
        this.haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
        JobMaster jobMaster = this.createJobMaster(this.configuration, jobGraph, this.haServices, new TestingJobManagerSharedServicesBuilder().build());
        try {
            CompletedCheckpoint savepointCheckpoint = completedCheckpointStore.getLatestCheckpoint(false);
            MatcherAssert.assertThat((Object)savepointCheckpoint, (Matcher)Matchers.notNullValue());
            MatcherAssert.assertThat((Object)savepointCheckpoint.getCheckpointID(), (Matcher)Matchers.is((Object)1L));
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)jobMaster, (Time)testingTimeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSlotRequestTimeoutWhenNoSlotOffering() throws Exception {
        JobGraph restartingJobGraph = this.createSingleVertexJobWithRestartStrategy();
        long slotRequestTimeout = 10L;
        this.configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 10L);
        JobMaster jobMaster = this.createJobMaster(this.configuration, restartingJobGraph, this.haServices, new TestingJobManagerSharedServicesBuilder().build(), heartbeatServices);
        JobMasterGateway jobMasterGateway = (JobMasterGateway)jobMaster.getSelfGateway(JobMasterGateway.class);
        try {
            long start = System.nanoTime();
            jobMaster.start(JobMasterId.generate()).get();
            TestingResourceManagerGateway resourceManagerGateway = this.createAndRegisterTestingResourceManagerGateway();
            ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(2);
            resourceManagerGateway.setRequestSlotConsumer(blockingQueue::offer);
            this.notifyResourceManagerLeaderListeners(resourceManagerGateway);
            blockingQueue.take();
            CompletableFuture submittedTaskFuture = new CompletableFuture();
            LocalUnresolvedTaskManagerLocation taskManagerUnresolvedLocation = new LocalUnresolvedTaskManagerLocation();
            TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setSubmitTaskConsumer((tdd, ignored) -> {
                submittedTaskFuture.complete(tdd);
                return CompletableFuture.completedFuture(Acknowledge.get());
            }).createTestingTaskExecutorGateway();
            rpcService.registerGateway(taskExecutorGateway.getAddress(), (RpcGateway)taskExecutorGateway);
            jobMasterGateway.registerTaskManager(taskExecutorGateway.getAddress(), (UnresolvedTaskManagerLocation)taskManagerUnresolvedLocation, restartingJobGraph.getJobID(), testingTimeout).get();
            SlotRequest slotRequest = (SlotRequest)blockingQueue.take();
            long end = System.nanoTime();
            MatcherAssert.assertThat((Object)((end - start) / 1000000L), (Matcher)Matchers.greaterThanOrEqualTo((Comparable)Long.valueOf(10L)));
            MatcherAssert.assertThat((Object)submittedTaskFuture.isDone(), (Matcher)Matchers.is((Object)false));
            SlotOffer slotOffer = new SlotOffer(slotRequest.getAllocationId(), 0, ResourceProfile.ANY);
            CompletableFuture acceptedSlotsFuture = jobMasterGateway.offerSlots(taskManagerUnresolvedLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout);
            Collection acceptedSlots = (Collection)acceptedSlotsFuture.get();
            MatcherAssert.assertThat((Object)acceptedSlots, (Matcher)Matchers.hasSize((int)1));
            SlotOffer acceptedSlot = (SlotOffer)acceptedSlots.iterator().next();
            MatcherAssert.assertThat((Object)acceptedSlot.getAllocationId(), (Matcher)Matchers.equalTo((Object)slotRequest.getAllocationId()));
            TaskDeploymentDescriptor taskDeploymentDescriptor = (TaskDeploymentDescriptor)submittedTaskFuture.get();
            MatcherAssert.assertThat((Object)taskDeploymentDescriptor.getAllocationId(), (Matcher)Matchers.equalTo((Object)slotRequest.getAllocationId()));
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)jobMaster, (Time)testingTimeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCloseUnestablishedResourceManagerConnection() throws Exception {
        JobMaster jobMaster = this.createJobMaster(this.configuration, jobGraph, this.haServices, new TestingJobManagerSharedServicesBuilder().build());
        try {
            jobMaster.start(JobMasterId.generate()).get();
            TestingResourceManagerGateway firstResourceManagerGateway = this.createAndRegisterTestingResourceManagerGateway();
            TestingResourceManagerGateway secondResourceManagerGateway = this.createAndRegisterTestingResourceManagerGateway();
            OneShotLatch firstJobManagerRegistration = new OneShotLatch();
            OneShotLatch secondJobManagerRegistration = new OneShotLatch();
            firstResourceManagerGateway.setRegisterJobManagerFunction((QuadFunction<JobMasterId, ResourceID, String, JobID, CompletableFuture<RegistrationResponse>>)((QuadFunction)(jobMasterId, resourceID, s, jobID) -> {
                firstJobManagerRegistration.trigger();
                return CompletableFuture.completedFuture(firstResourceManagerGateway.getJobMasterRegistrationSuccess());
            }));
            secondResourceManagerGateway.setRegisterJobManagerFunction((QuadFunction<JobMasterId, ResourceID, String, JobID, CompletableFuture<RegistrationResponse>>)((QuadFunction)(jobMasterId, resourceID, s, jobID) -> {
                secondJobManagerRegistration.trigger();
                return CompletableFuture.completedFuture(secondResourceManagerGateway.getJobMasterRegistrationSuccess());
            }));
            this.notifyResourceManagerLeaderListeners(firstResourceManagerGateway);
            firstJobManagerRegistration.await();
            this.notifyResourceManagerLeaderListeners(secondResourceManagerGateway);
            secondJobManagerRegistration.await();
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)jobMaster, (Time)testingTimeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReconnectionAfterDisconnect() throws Exception {
        JobMaster jobMaster = this.createJobMaster(this.configuration, jobGraph, this.haServices, new TestingJobManagerSharedServicesBuilder().build(), heartbeatServices);
        JobMasterGateway jobMasterGateway = (JobMasterGateway)jobMaster.getSelfGateway(JobMasterGateway.class);
        CompletableFuture startFuture = jobMaster.start(this.jobMasterId);
        try {
            startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            TestingResourceManagerGateway testingResourceManagerGateway = this.createAndRegisterTestingResourceManagerGateway();
            ArrayBlockingQueue registrationsQueue = new ArrayBlockingQueue(1);
            testingResourceManagerGateway.setRegisterJobManagerFunction((QuadFunction<JobMasterId, ResourceID, String, JobID, CompletableFuture<RegistrationResponse>>)((QuadFunction)(jobMasterId, resourceID, s, jobID) -> {
                registrationsQueue.offer(jobMasterId);
                return CompletableFuture.completedFuture(testingResourceManagerGateway.getJobMasterRegistrationSuccess());
            }));
            ResourceManagerId resourceManagerId = testingResourceManagerGateway.getFencingToken();
            this.notifyResourceManagerLeaderListeners(testingResourceManagerGateway);
            JobMasterId firstRegistrationAttempt = (JobMasterId)registrationsQueue.take();
            MatcherAssert.assertThat((Object)firstRegistrationAttempt, (Matcher)Matchers.equalTo((Object)this.jobMasterId));
            MatcherAssert.assertThat((Object)registrationsQueue.isEmpty(), (Matcher)Matchers.is((Object)true));
            jobMasterGateway.disconnectResourceManager(resourceManagerId, (Exception)new FlinkException("Test exception"));
            MatcherAssert.assertThat(registrationsQueue.take(), (Matcher)Matchers.equalTo((Object)this.jobMasterId));
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)jobMaster, (Time)testingTimeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testResourceManagerConnectionAfterRegainingLeadership() throws Exception {
        JobMaster jobMaster = this.createJobMaster(this.configuration, jobGraph, this.haServices, new TestingJobManagerSharedServicesBuilder().build(), heartbeatServices);
        CompletableFuture startFuture = jobMaster.start(this.jobMasterId);
        try {
            startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            TestingResourceManagerGateway testingResourceManagerGateway = this.createAndRegisterTestingResourceManagerGateway();
            ArrayBlockingQueue registrationQueue = new ArrayBlockingQueue(1);
            testingResourceManagerGateway.setRegisterJobManagerFunction((QuadFunction<JobMasterId, ResourceID, String, JobID, CompletableFuture<RegistrationResponse>>)((QuadFunction)(jobMasterId, resourceID, s, jobID) -> {
                registrationQueue.offer(jobMasterId);
                return CompletableFuture.completedFuture(testingResourceManagerGateway.getJobMasterRegistrationSuccess());
            }));
            this.notifyResourceManagerLeaderListeners(testingResourceManagerGateway);
            JobMasterId firstRegistrationAttempt = (JobMasterId)registrationQueue.take();
            MatcherAssert.assertThat((Object)firstRegistrationAttempt, (Matcher)Matchers.equalTo((Object)this.jobMasterId));
            jobMaster.suspend((Exception)new FlinkException("Test exception.")).get();
            JobMasterId jobMasterId2 = JobMasterId.generate();
            jobMaster.start(jobMasterId2).get();
            JobMasterId secondRegistrationAttempt = (JobMasterId)registrationQueue.take();
            MatcherAssert.assertThat((Object)secondRegistrationAttempt, (Matcher)Matchers.equalTo((Object)jobMasterId2));
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)jobMaster, (Time)testingTimeout);
        }
    }

    @Test
    public void testRequestNextInputSplitWithLocalFailover() throws Exception {
        this.configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region");
        Function<List<List<InputSplit>>, Collection<InputSplit>> expectFailedExecutionInputSplits = inputSplitsPerTask -> (List)inputSplitsPerTask.get(0);
        this.runRequestNextInputSplitTest(expectFailedExecutionInputSplits);
    }

    @Test
    public void testRequestNextInputSplitWithGlobalFailover() throws Exception {
        this.configuration.setInteger(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
        this.configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, (Object)Duration.ofSeconds(0L));
        this.configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "full");
        Function<List<List<InputSplit>>, Collection<InputSplit>> expectAllRemainingInputSplits = this::flattenCollection;
        this.runRequestNextInputSplitTest(expectAllRemainingInputSplits);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void runRequestNextInputSplitTest(Function<List<List<InputSplit>>, Collection<InputSplit>> expectedRemainingInputSplits) throws Exception {
        int parallelism = 2;
        int splitsPerTask = 2;
        int totalSplits = 4;
        ArrayList<TestingInputSplit> allInputSplits = new ArrayList<TestingInputSplit>(4);
        for (int i = 0; i < 4; ++i) {
            allInputSplits.add(new TestingInputSplit(i));
        }
        TestingInputSplitSource inputSplitSource = new TestingInputSplitSource(allInputSplits);
        JobVertex source = new JobVertex("source");
        source.setParallelism(2);
        source.setInputSplitSource((InputSplitSource)inputSplitSource);
        source.setInvokableClass(AbstractInvokable.class);
        JobGraph inputSplitJobGraph = new JobGraph(new JobVertex[]{source});
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)100, (long)0L));
        inputSplitJobGraph.setExecutionConfig(executionConfig);
        JobMaster jobMaster = this.createJobMaster(this.configuration, inputSplitJobGraph, this.haServices, new TestingJobManagerSharedServicesBuilder().build(), heartbeatServices);
        CompletableFuture startFuture = jobMaster.start(this.jobMasterId);
        try {
            startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            JobMasterGateway jobMasterGateway = (JobMasterGateway)jobMaster.getSelfGateway(JobMasterGateway.class);
            JobVertexID sourceId = source.getID();
            List<AccessExecution> executions = JobMasterTest.getExecutions(jobMasterGateway, sourceId);
            ExecutionAttemptID initialAttemptId = executions.get(0).getAttemptId();
            ArrayList<List<InputSplit>> inputSplitsPerTask = new ArrayList<List<InputSplit>>(2);
            for (AccessExecution execution : executions) {
                inputSplitsPerTask.add(JobMasterTest.getInputSplits(2, this.getInputSplitSupplier(sourceId, jobMasterGateway, execution.getAttemptId())));
            }
            List<InputSplit> allRequestedInputSplits = this.flattenCollection(inputSplitsPerTask);
            MatcherAssert.assertThat(allRequestedInputSplits, (Matcher)Matchers.containsInAnyOrder((Object[])allInputSplits.toArray(EMPTY_TESTING_INPUT_SPLITS)));
            this.waitUntilAllExecutionsAreScheduled(jobMasterGateway);
            jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(inputSplitJobGraph.getJobID(), initialAttemptId, ExecutionState.FAILED)).get();
            this.waitUntilAllExecutionsAreScheduled(jobMasterGateway);
            ExecutionAttemptID restartedAttemptId = JobMasterTest.getFirstExecution(jobMasterGateway, sourceId).getAttemptId();
            List<InputSplit> inputSplits = this.getRemainingInputSplits(this.getInputSplitSupplier(sourceId, jobMasterGateway, restartedAttemptId));
            MatcherAssert.assertThat(inputSplits, (Matcher)Matchers.containsInAnyOrder((Object[])expectedRemainingInputSplits.apply(inputSplitsPerTask).toArray(EMPTY_TESTING_INPUT_SPLITS)));
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)jobMaster, (Time)testingTimeout);
        }
    }

    @Nonnull
    private List<InputSplit> flattenCollection(List<List<InputSplit>> inputSplitsPerTask) {
        return inputSplitsPerTask.stream().flatMap(Collection::stream).collect(Collectors.toList());
    }

    @Nonnull
    private Supplier<SerializedInputSplit> getInputSplitSupplier(JobVertexID jobVertexID, JobMasterGateway jobMasterGateway, ExecutionAttemptID initialAttemptId) {
        return () -> JobMasterTest.getInputSplit(jobMasterGateway, jobVertexID, initialAttemptId);
    }

    private void waitUntilAllExecutionsAreScheduled(JobMasterGateway jobMasterGateway) throws Exception {
        Duration duration = Duration.ofMillis(testingTimeout.toMilliseconds());
        Deadline deadline = Deadline.fromNow((Duration)duration);
        CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> JobMasterTest.getExecutions(jobMasterGateway).stream().allMatch(execution -> execution.getState() == ExecutionState.SCHEDULED)), deadline);
    }

    private static AccessExecution getFirstExecution(JobMasterGateway jobMasterGateway, JobVertexID jobVertexId) {
        List<AccessExecution> executions = JobMasterTest.getExecutions(jobMasterGateway, jobVertexId);
        MatcherAssert.assertThat(executions, (Matcher)Matchers.hasSize((Matcher)Matchers.greaterThanOrEqualTo((Comparable)Integer.valueOf(1))));
        return executions.get(0);
    }

    private static Collection<AccessExecution> getExecutions(JobMasterGateway jobMasterGateway) {
        ArchivedExecutionGraph archivedExecutionGraph = JobMasterTest.requestExecutionGraph(jobMasterGateway);
        return archivedExecutionGraph.getAllVertices().values().stream().flatMap(vertex -> Arrays.stream(vertex.getTaskVertices())).map(AccessExecutionVertex::getCurrentExecutionAttempt).collect(Collectors.toList());
    }

    private static List<AccessExecution> getExecutions(JobMasterGateway jobMasterGateway, JobVertexID jobVertexId) {
        ArchivedExecutionGraph archivedExecutionGraph = JobMasterTest.requestExecutionGraph(jobMasterGateway);
        return Optional.ofNullable(archivedExecutionGraph.getAllVertices().get(jobVertexId)).map(accessExecutionJobVertex -> Arrays.asList(accessExecutionJobVertex.getTaskVertices())).orElse(Collections.emptyList()).stream().map(AccessExecutionVertex::getCurrentExecutionAttempt).collect(Collectors.toList());
    }

    private static ArchivedExecutionGraph requestExecutionGraph(JobMasterGateway jobMasterGateway) {
        try {
            return (ArchivedExecutionGraph)jobMasterGateway.requestJob(testingTimeout).get();
        }
        catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    @Nonnull
    private static List<InputSplit> getInputSplits(int numberInputSplits, Supplier<SerializedInputSplit> nextInputSplit) throws Exception {
        ArrayList<InputSplit> actualInputSplits = new ArrayList<InputSplit>(numberInputSplits);
        for (int i = 0; i < numberInputSplits; ++i) {
            SerializedInputSplit serializedInputSplit = nextInputSplit.get();
            MatcherAssert.assertThat((Object)serializedInputSplit.isEmpty(), (Matcher)Matchers.is((Object)false));
            actualInputSplits.add((InputSplit)InstantiationUtil.deserializeObject((byte[])serializedInputSplit.getInputSplitData(), (ClassLoader)ClassLoader.getSystemClassLoader()));
        }
        return actualInputSplits;
    }

    private List<InputSplit> getRemainingInputSplits(Supplier<SerializedInputSplit> nextInputSplit) throws Exception {
        ArrayList<InputSplit> actualInputSplits = new ArrayList<InputSplit>(16);
        boolean hasMoreInputSplits = true;
        while (hasMoreInputSplits) {
            SerializedInputSplit serializedInputSplit = nextInputSplit.get();
            if (serializedInputSplit.isEmpty()) {
                hasMoreInputSplits = false;
                continue;
            }
            InputSplit inputSplit = (InputSplit)InstantiationUtil.deserializeObject((byte[])serializedInputSplit.getInputSplitData(), (ClassLoader)ClassLoader.getSystemClassLoader());
            if (inputSplit == null) {
                hasMoreInputSplits = false;
                continue;
            }
            actualInputSplits.add(inputSplit);
        }
        return actualInputSplits;
    }

    private static SerializedInputSplit getInputSplit(JobMasterGateway jobMasterGateway, JobVertexID jobVertexId, ExecutionAttemptID attemptId) {
        try {
            return (SerializedInputSplit)jobMasterGateway.requestNextInputSplit(jobVertexId, attemptId).get();
        }
        catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRequestKvStateWithoutRegistration() throws Exception {
        JobGraph graph = this.createKvJobGraph();
        JobMaster jobMaster = this.createJobMaster(this.configuration, graph, this.haServices, new TestingJobManagerSharedServicesBuilder().build(), heartbeatServices);
        CompletableFuture startFuture = jobMaster.start(this.jobMasterId);
        JobMasterGateway jobMasterGateway = (JobMasterGateway)jobMaster.getSelfGateway(JobMasterGateway.class);
        try {
            startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            try {
                jobMasterGateway.requestKvStateLocation(graph.getJobID(), "unknown").get();
                Assert.fail((String)"Expected to fail with UnknownKvStateLocation");
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, UnknownKvStateLocation.class).isPresent());
            }
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)jobMaster, (Time)testingTimeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRequestKvStateOfWrongJob() throws Exception {
        JobGraph graph = this.createKvJobGraph();
        JobMaster jobMaster = this.createJobMaster(this.configuration, graph, this.haServices, new TestingJobManagerSharedServicesBuilder().build(), heartbeatServices);
        CompletableFuture startFuture = jobMaster.start(this.jobMasterId);
        JobMasterGateway jobMasterGateway = (JobMasterGateway)jobMaster.getSelfGateway(JobMasterGateway.class);
        try {
            startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            try {
                jobMasterGateway.requestKvStateLocation(new JobID(), "unknown").get();
                Assert.fail((String)"Expected to fail with FlinkJobNotFoundException");
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, FlinkJobNotFoundException.class).isPresent());
            }
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)jobMaster, (Time)testingTimeout);
        }
    }

    @Nonnull
    public JobGraph createKvJobGraph() {
        JobVertex vertex1 = new JobVertex("v1");
        vertex1.setParallelism(4);
        vertex1.setMaxParallelism(16);
        vertex1.setInvokableClass(BlockingNoOpInvokable.class);
        JobVertex vertex2 = new JobVertex("v2");
        vertex2.setParallelism(4);
        vertex2.setMaxParallelism(16);
        vertex2.setInvokableClass(BlockingNoOpInvokable.class);
        return new JobGraph(new JobVertex[]{vertex1, vertex2});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRequestKvStateWithIrrelevantRegistration() throws Exception {
        JobGraph graph = this.createKvJobGraph();
        JobMaster jobMaster = this.createJobMaster(this.configuration, graph, this.haServices, new TestingJobManagerSharedServicesBuilder().build(), heartbeatServices);
        CompletableFuture startFuture = jobMaster.start(this.jobMasterId);
        JobMasterGateway jobMasterGateway = (JobMasterGateway)jobMaster.getSelfGateway(JobMasterGateway.class);
        try {
            startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            try {
                jobMasterGateway.notifyKvStateRegistered(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any-name", new KvStateID(), new InetSocketAddress(InetAddress.getLocalHost(), 1233)).get();
                Assert.fail((String)"Expected to fail with FlinkJobNotFoundException.");
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, FlinkJobNotFoundException.class).isPresent());
            }
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)jobMaster, (Time)testingTimeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRegisterAndUnregisterKvState() throws Exception {
        JobGraph graph = this.createKvJobGraph();
        List jobVertices = graph.getVerticesSortedTopologicallyFromSources();
        JobVertex vertex1 = (JobVertex)jobVertices.get(0);
        JobMaster jobMaster = this.createJobMaster(this.configuration, graph, this.haServices, new TestingJobManagerSharedServicesBuilder().build(), heartbeatServices);
        CompletableFuture startFuture = jobMaster.start(this.jobMasterId);
        JobMasterGateway jobMasterGateway = (JobMasterGateway)jobMaster.getSelfGateway(JobMasterGateway.class);
        try {
            startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            String registrationName = "register-me";
            KvStateID kvStateID = new KvStateID();
            KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
            InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), 1029);
            jobMasterGateway.notifyKvStateRegistered(graph.getJobID(), vertex1.getID(), keyGroupRange, "register-me", kvStateID, address).get();
            KvStateLocation location = (KvStateLocation)jobMasterGateway.requestKvStateLocation(graph.getJobID(), "register-me").get();
            Assert.assertEquals((Object)graph.getJobID(), (Object)location.getJobId());
            Assert.assertEquals((Object)vertex1.getID(), (Object)location.getJobVertexId());
            Assert.assertEquals((long)vertex1.getMaxParallelism(), (long)location.getNumKeyGroups());
            Assert.assertEquals((long)1L, (long)location.getNumRegisteredKeyGroups());
            Assert.assertEquals((long)1L, (long)keyGroupRange.getNumberOfKeyGroups());
            Assert.assertEquals((Object)kvStateID, (Object)location.getKvStateID(keyGroupRange.getStartKeyGroup()));
            Assert.assertEquals((Object)address, (Object)location.getKvStateServerAddress(keyGroupRange.getStartKeyGroup()));
            jobMasterGateway.notifyKvStateUnregistered(graph.getJobID(), vertex1.getID(), keyGroupRange, "register-me").get();
            try {
                jobMasterGateway.requestKvStateLocation(graph.getJobID(), "register-me").get();
                Assert.fail((String)"Expected to fail with an UnknownKvStateLocation.");
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, UnknownKvStateLocation.class).isPresent());
            }
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)jobMaster, (Time)testingTimeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDuplicatedKvStateRegistrationsFailTask() throws Exception {
        JobGraph graph = this.createKvJobGraph();
        List jobVertices = graph.getVerticesSortedTopologicallyFromSources();
        JobVertex vertex1 = (JobVertex)jobVertices.get(0);
        JobVertex vertex2 = (JobVertex)jobVertices.get(1);
        JobMaster jobMaster = this.createJobMaster(this.configuration, graph, this.haServices, new TestingJobManagerSharedServicesBuilder().build(), heartbeatServices);
        CompletableFuture startFuture = jobMaster.start(this.jobMasterId);
        JobMasterGateway jobMasterGateway = (JobMasterGateway)jobMaster.getSelfGateway(JobMasterGateway.class);
        try {
            startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            String registrationName = "duplicate-me";
            KvStateID kvStateID = new KvStateID();
            KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
            InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), 4396);
            jobMasterGateway.notifyKvStateRegistered(graph.getJobID(), vertex1.getID(), keyGroupRange, "duplicate-me", kvStateID, address).get();
            try {
                jobMasterGateway.notifyKvStateRegistered(graph.getJobID(), vertex2.getID(), keyGroupRange, "duplicate-me", kvStateID, address).get();
                Assert.fail((String)"Expected to fail because of clashing registration message.");
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)ExceptionUtils.findThrowableWithMessage((Throwable)e, (String)"Registration name clash").isPresent());
                Assert.assertEquals((Object)JobStatus.FAILED, jobMasterGateway.requestJobStatus(testingTimeout).get());
            }
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)jobMaster, (Time)testingTimeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRequestPartitionState() throws Exception {
        JobGraph producerConsumerJobGraph = this.producerConsumerJobGraph();
        JobMaster jobMaster = this.createJobMaster(this.configuration, producerConsumerJobGraph, this.haServices, new TestingJobManagerSharedServicesBuilder().build(), heartbeatServices);
        CompletableFuture startFuture = jobMaster.start(this.jobMasterId);
        try {
            startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            CompletableFuture tddFuture = new CompletableFuture();
            TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
                tddFuture.complete(taskDeploymentDescriptor);
                return CompletableFuture.completedFuture(Acknowledge.get());
            }).createTestingTaskExecutorGateway();
            JobMasterGateway jobMasterGateway = (JobMasterGateway)jobMaster.getSelfGateway(JobMasterGateway.class);
            Collection<SlotOffer> slotOffers = this.registerSlotsAtJobMaster(1, jobMasterGateway, producerConsumerJobGraph.getJobID(), testingTaskExecutorGateway);
            MatcherAssert.assertThat(slotOffers, (Matcher)Matchers.hasSize((int)1));
            TaskDeploymentDescriptor tdd = (TaskDeploymentDescriptor)tddFuture.get();
            MatcherAssert.assertThat((Object)tdd.getProducedPartitions(), (Matcher)Matchers.hasSize((int)1));
            ResultPartitionDeploymentDescriptor partition = (ResultPartitionDeploymentDescriptor)tdd.getProducedPartitions().iterator().next();
            ExecutionAttemptID executionAttemptId = tdd.getExecutionAttemptId();
            ExecutionAttemptID copiedExecutionAttemptId = new ExecutionAttemptID(executionAttemptId.getLowerPart(), executionAttemptId.getUpperPart());
            jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(producerConsumerJobGraph.getJobID(), executionAttemptId, ExecutionState.FINISHED)).get();
            ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), copiedExecutionAttemptId);
            CompletableFuture partitionStateFuture = jobMasterGateway.requestPartitionState(partition.getResultId(), partitionId);
            MatcherAssert.assertThat(partitionStateFuture.get(), (Matcher)Matchers.equalTo((Object)ExecutionState.FINISHED));
            partitionStateFuture = jobMasterGateway.requestPartitionState(partition.getResultId(), new ResultPartitionID());
            try {
                partitionStateFuture.get();
                Assert.fail((String)"Expected failure.");
            }
            catch (ExecutionException e) {
                MatcherAssert.assertThat((Object)ExceptionUtils.findThrowable((Throwable)e, IllegalArgumentException.class).isPresent(), (Matcher)Matchers.is((Object)true));
            }
            partitionStateFuture = jobMasterGateway.requestPartitionState(new IntermediateDataSetID(), partitionId);
            try {
                partitionStateFuture.get();
                Assert.fail((String)"Expected failure.");
            }
            catch (ExecutionException e) {
                MatcherAssert.assertThat((Object)ExceptionUtils.findThrowable((Throwable)e, IllegalArgumentException.class).isPresent(), (Matcher)Matchers.is((Object)true));
            }
            partitionStateFuture = jobMasterGateway.requestPartitionState(partition.getResultId(), new ResultPartitionID(partition.getPartitionId(), new ExecutionAttemptID()));
            try {
                partitionStateFuture.get();
                Assert.fail((String)"Expected failure.");
            }
            catch (ExecutionException e) {
                MatcherAssert.assertThat((Object)ExceptionUtils.findThrowable((Throwable)e, PartitionProducerDisposedException.class).isPresent(), (Matcher)Matchers.is((Object)true));
            }
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)jobMaster, (Time)testingTimeout);
        }
    }

    private void notifyResourceManagerLeaderListeners(TestingResourceManagerGateway testingResourceManagerGateway) {
        this.rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTriggerSavepointTimeout() throws Exception {
        JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
        JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration((Configuration)this.configuration);
        SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory((Configuration)this.configuration);
        JobMaster jobMaster = new JobMaster((RpcService)rpcService, jobMasterConfiguration, this.jmResourceId, jobGraph, this.haServices, (SlotPoolFactory)DefaultSlotPoolFactory.fromConfiguration((Configuration)this.configuration), (SchedulerFactory)DefaultSchedulerFactory.fromConfiguration((Configuration)this.configuration), jobManagerSharedServices, heartbeatServices, (JobManagerJobMetricGroupFactory)UnregisteredJobManagerJobMetricGroupFactory.INSTANCE, new JobMasterBuilder.TestingOnCompletionActions(), this.testingFatalErrorHandler, JobMasterTest.class.getClassLoader(), schedulerNGFactory, (ShuffleMaster)NettyShuffleMaster.INSTANCE, NoOpJobMasterPartitionTracker.FACTORY){

            public CompletableFuture<String> triggerSavepoint(@Nullable String targetDirectory, boolean cancelJob, Time timeout) {
                return new CompletableFuture<String>();
            }
        };
        try {
            CompletableFuture startFuture = jobMaster.start(this.jobMasterId);
            startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            JobMasterGateway jobMasterGateway = (JobMasterGateway)jobMaster.getSelfGateway(JobMasterGateway.class);
            CompletableFuture savepointFutureLowTimeout = jobMasterGateway.triggerSavepoint("/tmp", false, Time.milliseconds((long)1L));
            CompletableFuture savepointFutureHighTimeout = jobMasterGateway.triggerSavepoint("/tmp", false, RpcUtils.INF_TIMEOUT);
            try {
                savepointFutureLowTimeout.get(testingTimeout.getSize(), testingTimeout.getUnit());
                Assert.fail();
            }
            catch (ExecutionException e) {
                Throwable cause = ExceptionUtils.stripExecutionException((Throwable)e);
                MatcherAssert.assertThat((Object)cause, (Matcher)Matchers.instanceOf(TimeoutException.class));
            }
            MatcherAssert.assertThat((Object)savepointFutureHighTimeout.isDone(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)false)));
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)jobMaster, (Time)testingTimeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception {
        JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
        JobGraph jobGraph = this.createSingleVertexJobWithRestartStrategy();
        JobMaster jobMaster = this.createJobMaster(this.configuration, jobGraph, this.haServices, jobManagerSharedServices, heartbeatServices);
        CompletableFuture disconnectTaskExecutorFuture = new CompletableFuture();
        CompletableFuture freedSlotFuture = new CompletableFuture();
        TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setFreeSlotFunction((allocationID, throwable) -> {
            freedSlotFuture.complete(allocationID);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).setDisconnectJobManagerConsumer((jobID, throwable) -> disconnectTaskExecutorFuture.complete(jobID)).createTestingTaskExecutorGateway();
        try {
            jobMaster.start(this.jobMasterId).get();
            JobMasterGateway jobMasterGateway = (JobMasterGateway)jobMaster.getSelfGateway(JobMasterGateway.class);
            Collection<SlotOffer> slotOffers = this.registerSlotsAtJobMaster(1, jobMasterGateway, jobGraph.getJobID(), testingTaskExecutorGateway);
            MatcherAssert.assertThat(slotOffers, (Matcher)Matchers.hasSize((int)1));
            AllocationID allocationId = slotOffers.iterator().next().getAllocationId();
            jobMasterGateway.notifyAllocationFailure(allocationId, (Exception)new FlinkException("Fail alloction test exception"));
            MatcherAssert.assertThat(freedSlotFuture.get(), (Matcher)Matchers.equalTo((Object)allocationId));
            MatcherAssert.assertThat(disconnectTaskExecutorFuture.get(), (Matcher)Matchers.equalTo((Object)jobGraph.getJobID()));
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)jobMaster, (Time)testingTimeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTaskExecutorNotReleasedOnFailedAllocationIfPartitionIsAllocated() throws Exception {
        JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
        JobGraph jobGraph = JobGraphTestUtils.createSingleVertexJobGraph();
        LocalUnresolvedTaskManagerLocation taskManagerUnresolvedLocation = new LocalUnresolvedTaskManagerLocation();
        AtomicBoolean isTrackingPartitions = new AtomicBoolean(true);
        TestingJobMasterPartitionTracker partitionTracker = new TestingJobMasterPartitionTracker();
        partitionTracker.setIsTrackingPartitionsForFunction(ignored -> isTrackingPartitions.get());
        JobMaster jobMaster = new JobMasterBuilder(jobGraph, (RpcService)rpcService).withConfiguration(this.configuration).withHighAvailabilityServices(this.haServices).withJobManagerSharedServices(jobManagerSharedServices).withHeartbeatServices(heartbeatServices).withPartitionTrackerFactory(ignored -> partitionTracker).createJobMaster();
        CompletableFuture disconnectTaskExecutorFuture = new CompletableFuture();
        CompletableFuture freedSlotFuture = new CompletableFuture();
        TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setFreeSlotFunction((allocationID, throwable) -> {
            freedSlotFuture.complete(allocationID);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).setDisconnectJobManagerConsumer((jobID, throwable) -> disconnectTaskExecutorFuture.complete(jobID)).createTestingTaskExecutorGateway();
        try {
            jobMaster.start(this.jobMasterId).get();
            JobMasterGateway jobMasterGateway = (JobMasterGateway)jobMaster.getSelfGateway(JobMasterGateway.class);
            Collection<SlotOffer> slotOffers = this.registerSlotsAtJobMaster(1, jobMasterGateway, jobGraph.getJobID(), testingTaskExecutorGateway, taskManagerUnresolvedLocation);
            MatcherAssert.assertThat(slotOffers, (Matcher)Matchers.hasSize((int)1));
            AllocationID allocationId = slotOffers.iterator().next().getAllocationId();
            jobMasterGateway.notifyAllocationFailure(allocationId, (Exception)new FlinkException("Fail allocation test exception"));
            MatcherAssert.assertThat(freedSlotFuture.get(), (Matcher)Matchers.equalTo((Object)allocationId));
            jobMasterGateway.requestJobStatus(Time.seconds((long)5L)).get();
            MatcherAssert.assertThat((Object)disconnectTaskExecutorFuture.isDone(), (Matcher)Matchers.is((Object)false));
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)jobMaster, (Time)testingTimeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testJobMasterAggregatesValuesCorrectly() throws Exception {
        JobMaster jobMaster = this.createJobMaster(this.configuration, jobGraph, this.haServices, new TestingJobManagerSharedServicesBuilder().build(), heartbeatServices);
        CompletableFuture startFuture = jobMaster.start(this.jobMasterId);
        JobMasterGateway jobMasterGateway = (JobMasterGateway)jobMaster.getSelfGateway(JobMasterGateway.class);
        try {
            startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            AggregateFunction<Integer, Integer, Integer> aggregateFunction = this.createAggregateFunction();
            ClosureCleaner.clean(aggregateFunction, (ExecutionConfig.ClosureCleanerLevel)ExecutionConfig.ClosureCleanerLevel.RECURSIVE, (boolean)true);
            byte[] serializedAggregateFunction = InstantiationUtil.serializeObject(aggregateFunction);
            CompletableFuture updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg1", (Object)1, serializedAggregateFunction);
            MatcherAssert.assertThat(updateAggregateFuture.get(), (Matcher)Matchers.equalTo((Object)1));
            updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg1", (Object)2, serializedAggregateFunction);
            MatcherAssert.assertThat(updateAggregateFuture.get(), (Matcher)Matchers.equalTo((Object)3));
            updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg1", (Object)3, serializedAggregateFunction);
            MatcherAssert.assertThat(updateAggregateFuture.get(), (Matcher)Matchers.equalTo((Object)6));
            updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg1", (Object)4, serializedAggregateFunction);
            MatcherAssert.assertThat(updateAggregateFuture.get(), (Matcher)Matchers.equalTo((Object)10));
            updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg2", (Object)10, serializedAggregateFunction);
            MatcherAssert.assertThat(updateAggregateFuture.get(), (Matcher)Matchers.equalTo((Object)10));
            updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg2", (Object)23, serializedAggregateFunction);
            MatcherAssert.assertThat(updateAggregateFuture.get(), (Matcher)Matchers.equalTo((Object)33));
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)jobMaster, (Time)testingTimeout);
        }
    }

    private AggregateFunction<Integer, Integer, Integer> createAggregateFunction() {
        return new AggregateFunction<Integer, Integer, Integer>(){

            public Integer createAccumulator() {
                return 0;
            }

            public Integer add(Integer value, Integer accumulator) {
                return accumulator + value;
            }

            public Integer getResult(Integer accumulator) {
                return accumulator;
            }

            public Integer merge(Integer a, Integer b) {
                return this.add(a, b);
            }
        };
    }

    @Nonnull
    private TestingResourceManagerGateway createAndRegisterTestingResourceManagerGateway() {
        TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        rpcService.registerGateway(testingResourceManagerGateway.getAddress(), (RpcGateway)testingResourceManagerGateway);
        return testingResourceManagerGateway;
    }

    @Test
    public void testJobFailureWhenGracefulTaskExecutorTermination() throws Exception {
        this.runJobFailureWhenTaskExecutorTerminatesTest(heartbeatServices, (localTaskManagerLocation, jobMasterGateway) -> jobMasterGateway.disconnectTaskManager(localTaskManagerLocation.getResourceID(), (Exception)new FlinkException("Test disconnectTaskManager exception.")), (jobMasterGateway, resourceID) -> (ignoredA, ignoredB) -> {});
    }

    @Test
    public void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws Exception {
        TestingHeartbeatServices testingHeartbeatService = new TestingHeartbeatServices(1000L, 5000000L);
        this.runJobFailureWhenTaskExecutorTerminatesTest(testingHeartbeatService, (localTaskManagerLocation, jobMasterGateway) -> testingHeartbeatService.triggerHeartbeatTimeout(this.jmResourceId, localTaskManagerLocation.getResourceID()), (jobMasterGateway, taskManagerResourceId) -> (resourceId, ignored) -> jobMasterGateway.heartbeatFromTaskManager(taskManagerResourceId, new AccumulatorReport(Collections.emptyList())));
    }

    @Test
    public void testJobMasterRejectsTaskExecutorRegistrationIfJobIdsAreNotEqual() throws Exception {
        JobMaster jobMaster = new JobMasterBuilder(jobGraph, (RpcService)rpcService).createJobMaster();
        try {
            jobMaster.start();
            CompletableFuture registrationResponse = jobMaster.registerTaskManager("foobar", (UnresolvedTaskManagerLocation)new LocalUnresolvedTaskManagerLocation(), new JobID(), testingTimeout);
            MatcherAssert.assertThat(registrationResponse.get(), (Matcher)Matchers.instanceOf(JMTMRegistrationRejection.class));
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)jobMaster, (Time)testingTimeout);
        }
    }

    @Test
    public void testMultipleStartsWork() throws Exception {
        JobMaster jobMaster = new JobMasterBuilder(jobGraph, (RpcService)rpcService).withHighAvailabilityServices((HighAvailabilityServices)new StandaloneHaServices("localhost", "localhost", "localhost")).createJobMaster();
        try {
            jobMaster.start(JobMasterId.generate()).join();
            jobMaster.suspend((Exception)new FlinkException("Test exception.")).join();
            jobMaster.start(JobMasterId.generate()).join();
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)jobMaster, (Time)testingTimeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void runJobFailureWhenTaskExecutorTerminatesTest(HeartbeatServices heartbeatServices, BiConsumer<LocalUnresolvedTaskManagerLocation, JobMasterGateway> jobReachedRunningState, BiFunction<JobMasterGateway, ResourceID, BiConsumer<ResourceID, AllocatedSlotReport>> heartbeatConsumerFunction) throws Exception {
        JobGraph jobGraph = JobGraphTestUtils.createSingleVertexJobGraph();
        JobMasterBuilder.TestingOnCompletionActions onCompletionActions = new JobMasterBuilder.TestingOnCompletionActions();
        JobMaster jobMaster = this.createJobMaster(new Configuration(), jobGraph, this.haServices, new TestingJobManagerSharedServicesBuilder().build(), heartbeatServices, onCompletionActions);
        try {
            jobMaster.start(this.jobMasterId).get();
            JobMasterGateway jobMasterGateway = (JobMasterGateway)jobMaster.getSelfGateway(JobMasterGateway.class);
            LocalUnresolvedTaskManagerLocation taskManagerUnresolvedLocation = new LocalUnresolvedTaskManagerLocation();
            CompletableFuture taskDeploymentFuture = new CompletableFuture();
            TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
                taskDeploymentFuture.complete(taskDeploymentDescriptor.getExecutionAttemptId());
                return CompletableFuture.completedFuture(Acknowledge.get());
            }).setHeartbeatJobManagerConsumer(heartbeatConsumerFunction.apply(jobMasterGateway, taskManagerUnresolvedLocation.getResourceID())).createTestingTaskExecutorGateway();
            Collection<SlotOffer> slotOffers = this.registerSlotsAtJobMaster(1, jobMasterGateway, jobGraph.getJobID(), taskExecutorGateway, taskManagerUnresolvedLocation);
            MatcherAssert.assertThat(slotOffers, (Matcher)Matchers.hasSize((int)1));
            ExecutionAttemptID executionAttemptId = (ExecutionAttemptID)taskDeploymentFuture.get();
            jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), executionAttemptId, ExecutionState.RUNNING)).get();
            jobReachedRunningState.accept(taskManagerUnresolvedLocation, jobMasterGateway);
            ArchivedExecutionGraph archivedExecutionGraph = onCompletionActions.getJobReachedGloballyTerminalStateFuture().get();
            MatcherAssert.assertThat((Object)archivedExecutionGraph.getState(), (Matcher)Matchers.is((Object)JobStatus.FAILED));
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)jobMaster, (Time)testingTimeout);
        }
    }

    private Collection<SlotOffer> registerSlotsAtJobMaster(int numberSlots, JobMasterGateway jobMasterGateway, JobID jobId, TaskExecutorGateway taskExecutorGateway) throws ExecutionException, InterruptedException {
        return this.registerSlotsAtJobMaster(numberSlots, jobMasterGateway, jobId, taskExecutorGateway, new LocalUnresolvedTaskManagerLocation());
    }

    private Collection<SlotOffer> registerSlotsAtJobMaster(int numberSlots, JobMasterGateway jobMasterGateway, JobID jobId, TaskExecutorGateway taskExecutorGateway, UnresolvedTaskManagerLocation unresolvedTaskManagerLocation) throws ExecutionException, InterruptedException {
        AllocationIdsResourceManagerGateway allocationIdsResourceManagerGateway = new AllocationIdsResourceManagerGateway();
        rpcService.registerGateway(allocationIdsResourceManagerGateway.getAddress(), (RpcGateway)allocationIdsResourceManagerGateway);
        this.notifyResourceManagerLeaderListeners(allocationIdsResourceManagerGateway);
        rpcService.registerGateway(taskExecutorGateway.getAddress(), (RpcGateway)taskExecutorGateway);
        jobMasterGateway.registerTaskManager(taskExecutorGateway.getAddress(), unresolvedTaskManagerLocation, jobId, testingTimeout).get();
        Collection slotOffers = IntStream.range(0, numberSlots).mapToObj(index -> {
            AllocationID allocationId = allocationIdsResourceManagerGateway.takeAllocationId();
            return new SlotOffer(allocationId, index, ResourceProfile.ANY);
        }).collect(Collectors.toList());
        return (Collection)jobMasterGateway.offerSlots(unresolvedTaskManagerLocation.getResourceID(), slotOffers, testingTimeout).get();
    }

    private JobGraph producerConsumerJobGraph() {
        JobVertex producer = new JobVertex("Producer");
        producer.setInvokableClass(NoOpInvokable.class);
        JobVertex consumer = new JobVertex("Consumer");
        consumer.setInvokableClass(NoOpInvokable.class);
        consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        return new JobGraph(new JobVertex[]{producer, consumer});
    }

    private File createSavepoint(long savepointId) throws IOException {
        return this.createSavepointWithOperatorState(savepointId, new OperatorID[0]);
    }

    private File createSavepointWithOperatorState(long savepointId, OperatorID ... operatorIds) throws IOException {
        File savepointFile = temporaryFolder.newFile();
        Collection<OperatorState> operatorStates = this.createOperatorState(operatorIds);
        CheckpointMetadata savepoint = new CheckpointMetadata(savepointId, operatorStates, Collections.emptyList());
        try (FileOutputStream fileOutputStream = new FileOutputStream(savepointFile);){
            Checkpoints.storeCheckpointMetadata((CheckpointMetadata)savepoint, (OutputStream)fileOutputStream);
        }
        return savepointFile;
    }

    private Collection<OperatorState> createOperatorState(OperatorID ... operatorIds) {
        Random random = new Random();
        ArrayList<OperatorState> operatorStates = new ArrayList<OperatorState>(operatorIds.length);
        for (OperatorID operatorId : operatorIds) {
            OperatorState operatorState = new OperatorState(operatorId, 1, 42);
            OperatorSubtaskState subtaskState = new OperatorSubtaskState((OperatorStateHandle)new OperatorStreamStateHandle(Collections.emptyMap(), (StreamStateHandle)new ByteStreamStateHandle("foobar", new byte[0])), null, null, null, StateObjectCollection.singleton((StateObject)StateHandleDummyUtil.createNewInputChannelStateHandle(10, random)), StateObjectCollection.singleton((StateObject)StateHandleDummyUtil.createNewResultSubpartitionStateHandle(10, random)));
            operatorState.putState(0, subtaskState);
            operatorStates.add(operatorState);
        }
        return operatorStates;
    }

    @Nonnull
    private JobGraph createJobGraphWithCheckpointing(SavepointRestoreSettings savepointRestoreSettings) {
        return this.createJobGraphFromJobVerticesWithCheckpointing(savepointRestoreSettings, new JobVertex[0]);
    }

    @Nonnull
    private JobGraph createJobGraphFromJobVerticesWithCheckpointing(SavepointRestoreSettings savepointRestoreSettings, JobVertex ... jobVertices) {
        JobGraph jobGraph = new JobGraph(jobVertices);
        CheckpointCoordinatorConfiguration checkpoinCoordinatorConfiguration = new CheckpointCoordinatorConfiguration(1000L, 1000L, 1000L, 1, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true, false, false, 0);
        JobCheckpointingSettings checkpointingSettings = new JobCheckpointingSettings(Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), checkpoinCoordinatorConfiguration, null);
        jobGraph.setSnapshotSettings(checkpointingSettings);
        jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
        return jobGraph;
    }

    @Nonnull
    private JobMaster createJobMaster(Configuration configuration, JobGraph jobGraph, HighAvailabilityServices highAvailabilityServices, JobManagerSharedServices jobManagerSharedServices) throws Exception {
        return this.createJobMaster(configuration, jobGraph, highAvailabilityServices, jobManagerSharedServices, fastHeartbeatServices);
    }

    @Nonnull
    private JobMaster createJobMaster(Configuration configuration, JobGraph jobGraph, HighAvailabilityServices highAvailabilityServices, JobManagerSharedServices jobManagerSharedServices, HeartbeatServices heartbeatServices) throws Exception {
        return this.createJobMaster(configuration, jobGraph, highAvailabilityServices, jobManagerSharedServices, heartbeatServices, new JobMasterBuilder.TestingOnCompletionActions());
    }

    @Nonnull
    private JobMaster createJobMaster(Configuration configuration, JobGraph jobGraph, HighAvailabilityServices highAvailabilityServices, JobManagerSharedServices jobManagerSharedServices, HeartbeatServices heartbeatServices, OnCompletionActions onCompletionActions) throws Exception {
        return new JobMasterBuilder(jobGraph, (RpcService)rpcService).withConfiguration(configuration).withHighAvailabilityServices(highAvailabilityServices).withJobManagerSharedServices(jobManagerSharedServices).withHeartbeatServices(heartbeatServices).withOnCompletionActions(onCompletionActions).withResourceId(this.jmResourceId).createJobMaster();
    }

    private JobGraph createSingleVertexJobWithRestartStrategy() throws IOException {
        JobGraph jobGraph = JobGraphTestUtils.createSingleVertexJobGraph();
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)Integer.MAX_VALUE, (long)0L));
        jobGraph.setExecutionConfig(executionConfig);
        return jobGraph;
    }

    private static final class DummyCheckpointStorageLocation
    implements CompletedCheckpointStorageLocation {
        private static final long serialVersionUID = 164095949572620688L;

        private DummyCheckpointStorageLocation() {
        }

        public String getExternalPointer() {
            return null;
        }

        public StreamStateHandle getMetadataHandle() {
            return null;
        }

        public void disposeStorageLocation() throws IOException {
        }
    }

    private static final class AllocationIdsResourceManagerGateway
    extends TestingResourceManagerGateway {
        private final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<AllocationID>(10);

        private AllocationIdsResourceManagerGateway() {
            this.setRequestSlotConsumer(slotRequest -> this.allocationIds.offer(slotRequest.getAllocationId()));
        }

        AllocationID takeAllocationId() {
            try {
                return this.allocationIds.take();
            }
            catch (InterruptedException e) {
                ExceptionUtils.rethrow((Throwable)e);
                return null;
            }
        }
    }

    private static final class TestingInputSplit
    implements InputSplit {
        private static final long serialVersionUID = -5404803705463116083L;
        private final int splitNumber;

        TestingInputSplit(int number) {
            this.splitNumber = number;
        }

        public int getSplitNumber() {
            return this.splitNumber;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TestingInputSplit that = (TestingInputSplit)o;
            return this.splitNumber == that.splitNumber;
        }

        public int hashCode() {
            return Objects.hash(this.splitNumber);
        }
    }

    private static final class TestingInputSplitSource
    implements InputSplitSource<TestingInputSplit> {
        private static final long serialVersionUID = -2344684048759139086L;
        private final List<TestingInputSplit> inputSplits;

        private TestingInputSplitSource(List<TestingInputSplit> inputSplits) {
            this.inputSplits = inputSplits;
        }

        public TestingInputSplit[] createInputSplits(int minNumSplits) {
            return this.inputSplits.toArray(EMPTY_TESTING_INPUT_SPLITS);
        }

        public InputSplitAssigner getInputSplitAssigner(TestingInputSplit[] inputSplits) {
            return new DefaultInputSplitAssigner((InputSplit[])inputSplits);
        }
    }

    private static final class TestingSlotPool
    implements SlotPool {
        private final JobID jobId;
        private final OneShotLatch hasReceivedSlotOffers;
        private final Map<ResourceID, Collection<SlotInfo>> registeredSlots;

        private TestingSlotPool(JobID jobId, OneShotLatch hasReceivedSlotOffers) {
            this.jobId = jobId;
            this.hasReceivedSlotOffers = hasReceivedSlotOffers;
            this.registeredSlots = new HashMap<ResourceID, Collection<SlotInfo>>(16);
        }

        public void start(JobMasterId jobMasterId, String newJobManagerAddress, ComponentMainThreadExecutor jmMainThreadScheduledExecutor) {
        }

        public void suspend() {
            this.clear();
        }

        public void close() {
            this.clear();
        }

        private void clear() {
            this.registeredSlots.clear();
        }

        public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {
            throw new UnsupportedOperationException("TestingSlotPool does not support this operation.");
        }

        public void disconnectResourceManager() {
            throw new UnsupportedOperationException("TestingSlotPool does not support this operation.");
        }

        public boolean registerTaskManager(ResourceID resourceID) {
            this.registeredSlots.computeIfAbsent(resourceID, ignored -> new ArrayList(16));
            return true;
        }

        public boolean releaseTaskManager(ResourceID resourceId, Exception cause) {
            this.registeredSlots.remove(resourceId);
            return true;
        }

        public Collection<SlotOffer> offerSlots(TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, Collection<SlotOffer> offers) {
            this.hasReceivedSlotOffers.trigger();
            Collection<SlotInfo> slotInfos = Optional.ofNullable(this.registeredSlots.get(taskManagerLocation.getResourceID())).orElseThrow(() -> new FlinkRuntimeException("TaskManager not registered."));
            int slotIndex = slotInfos.size();
            for (SlotOffer offer : offers) {
                slotInfos.add((SlotInfo)new SimpleSlotContext(offer.getAllocationId(), taskManagerLocation, slotIndex, taskManagerGateway));
                ++slotIndex;
            }
            return offers;
        }

        public Optional<ResourceID> failAllocation(AllocationID allocationID, Exception cause) {
            throw new UnsupportedOperationException("TestingSlotPool does not support this operation.");
        }

        @Nonnull
        public Collection<SlotInfoWithUtilization> getAvailableSlotsInformation() {
            Collection allSlotInfos = this.registeredSlots.values().stream().flatMap(Collection::stream).map(slot -> SlotInfoWithUtilization.from((SlotInfo)slot, (double)0.0)).collect(Collectors.toList());
            return Collections.unmodifiableCollection(allSlotInfos);
        }

        public Optional<PhysicalSlot> allocateAvailableSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull AllocationID allocationID) {
            throw new UnsupportedOperationException("TestingSlotPool does not support this operation.");
        }

        @Nonnull
        public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile, Time timeout) {
            return new CompletableFuture<PhysicalSlot>();
        }

        @Nonnull
        public CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile) {
            return new CompletableFuture<PhysicalSlot>();
        }

        public AllocatedSlotReport createAllocatedSlotReport(ResourceID taskManagerId) {
            Collection slotInfos = this.registeredSlots.getOrDefault(taskManagerId, Collections.emptyList());
            List allocatedSlotInfos = slotInfos.stream().map(slotInfo -> new AllocatedSlotInfo(slotInfo.getPhysicalSlotNumber(), slotInfo.getAllocationId())).collect(Collectors.toList());
            return new AllocatedSlotReport(this.jobId, allocatedSlotInfos);
        }

        public void releaseSlot(@Nonnull SlotRequestId slotRequestId, @Nullable Throwable cause) {
            throw new UnsupportedOperationException("TestingSlotPool does not support this operation.");
        }
    }

    private static final class TestingSlotPoolFactory
    implements SlotPoolFactory {
        private final OneShotLatch hasReceivedSlotOffers;

        public TestingSlotPoolFactory(OneShotLatch hasReceivedSlotOffers) {
            this.hasReceivedSlotOffers = hasReceivedSlotOffers;
        }

        @Nonnull
        public SlotPool createSlotPool(@Nonnull JobID jobId) {
            return new TestingSlotPool(jobId, this.hasReceivedSlotOffers);
        }
    }
}

