/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphSchedulingTest;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.ProgrammedSlotProvider;
import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.TestingSlotProvider;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.ThrowingRunnable;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

public class ExecutionTest
extends TestLogger {
    @ClassRule
    public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = new TestingComponentMainThreadExecutor.Resource();
    private final TestingComponentMainThreadExecutor testMainThreadUtil = EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();

    @Test
    public void testSlotReleaseOnFailedResourceAssignment() throws Exception {
        JobVertex jobVertex = this.createNoOpJobVertex();
        JobVertexID jobVertexId = jobVertex.getID();
        CompletableFuture<LogicalSlot> slotFuture = new CompletableFuture<LogicalSlot>();
        ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
        slotProvider.addSlot(jobVertexId, 0, slotFuture);
        ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph((SlotProvider)slotProvider, (RestartStrategy)new NoRestartStrategy(), jobVertex);
        executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
        Execution execution = executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
        SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
        TestingLogicalSlot slot = this.createTestingLogicalSlot(slotOwner);
        TestingLogicalSlot otherSlot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
        CompletableFuture allocationFuture = execution.allocateResourcesForExecution(executionGraph.getSlotProviderStrategy(), LocationPreferenceConstraint.ALL, Collections.emptySet());
        Assert.assertFalse((boolean)allocationFuture.isDone());
        Assert.assertEquals((Object)ExecutionState.SCHEDULED, (Object)execution.getState());
        Assert.assertTrue((boolean)execution.tryAssignResource((LogicalSlot)otherSlot));
        slotFuture.complete(slot);
        Assert.assertEquals((Object)slot, (Object)slotOwner.getReturnedSlotFuture().get());
    }

    private TestingLogicalSlot createTestingLogicalSlot(SlotOwner slotOwner) {
        return new TestingLogicalSlotBuilder().setSlotOwner(slotOwner).createTestingLogicalSlot();
    }

    @Test
    public void testSlotReleaseOnExecutionCancellationInScheduled() throws Exception {
        JobVertex jobVertex = this.createNoOpJobVertex();
        JobVertexID jobVertexId = jobVertex.getID();
        SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
        TestingLogicalSlot slot = this.createTestingLogicalSlot(slotOwner);
        ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
        slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot));
        ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph((SlotProvider)slotProvider, (RestartStrategy)new NoRestartStrategy(), jobVertex);
        executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
        Execution execution = executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
        CompletableFuture allocationFuture = execution.allocateResourcesForExecution(executionGraph.getSlotProviderStrategy(), LocationPreferenceConstraint.ALL, Collections.emptySet());
        Assert.assertTrue((boolean)allocationFuture.isDone());
        Assert.assertEquals((Object)ExecutionState.SCHEDULED, (Object)execution.getState());
        Assert.assertEquals((Object)slot, (Object)execution.getAssignedResource());
        execution.cancel();
        Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)execution.getState());
        Assert.assertEquals((Object)slot, (Object)slotOwner.getReturnedSlotFuture().get());
    }

    @Test
    public void testSlotReleaseOnExecutionCancellationInRunning() throws Exception {
        JobVertex jobVertex = this.createNoOpJobVertex();
        JobVertexID jobVertexId = jobVertex.getID();
        SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
        TestingLogicalSlot slot = this.createTestingLogicalSlot(slotOwner);
        ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
        slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot));
        ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph((SlotProvider)slotProvider, (RestartStrategy)new NoRestartStrategy(), jobVertex);
        ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
        Execution execution = executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
        CompletableFuture allocationFuture = execution.allocateResourcesForExecution(executionGraph.getSlotProviderStrategy(), LocationPreferenceConstraint.ALL, Collections.emptySet());
        Assert.assertTrue((boolean)allocationFuture.isDone());
        Assert.assertEquals((Object)ExecutionState.SCHEDULED, (Object)execution.getState());
        Assert.assertEquals((Object)slot, (Object)execution.getAssignedResource());
        execution.deploy();
        execution.switchToRunning();
        execution.cancel();
        Assert.assertEquals((Object)ExecutionState.CANCELING, (Object)execution.getState());
        execution.completeCancelling();
        Assert.assertEquals((Object)slot, (Object)slotOwner.getReturnedSlotFuture().get());
    }

    @Test
    public void testSlotAllocationCancellationWhenExecutionCancelled() throws Exception {
        JobVertexID jobVertexId = new JobVertexID();
        JobVertex jobVertex = new JobVertex("test vertex", jobVertexId);
        jobVertex.setInvokableClass(NoOpInvokable.class);
        ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
        CompletableFuture<LogicalSlot> slotFuture = new CompletableFuture<LogicalSlot>();
        slotProvider.addSlot(jobVertexId, 0, slotFuture);
        ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph((SlotProvider)slotProvider, (RestartStrategy)new NoRestartStrategy(), jobVertex);
        executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
        Execution currentExecutionAttempt = executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
        CompletableFuture allocationFuture = currentExecutionAttempt.allocateResourcesForExecution(executionGraph.getSlotProviderStrategy(), LocationPreferenceConstraint.ALL, Collections.emptySet());
        Assert.assertThat((Object)allocationFuture.isDone(), (Matcher)Matchers.is((Object)false));
        Assert.assertThat((Object)slotProvider.getSlotRequestedFuture(jobVertexId, 0).get(), (Matcher)Matchers.is((Object)true));
        Set<SlotRequestId> slotRequests = slotProvider.getSlotRequests();
        Assert.assertThat(slotRequests, (Matcher)Matchers.hasSize((int)1));
        Assert.assertThat((Object)currentExecutionAttempt.getState(), (Matcher)Matchers.is((Object)ExecutionState.SCHEDULED));
        currentExecutionAttempt.cancel();
        Assert.assertThat((Object)currentExecutionAttempt.getState(), (Matcher)Matchers.is((Object)ExecutionState.CANCELED));
        Assert.assertThat((Object)allocationFuture.isCompletedExceptionally(), (Matcher)Matchers.is((Object)true));
        Set<SlotRequestId> canceledSlotRequests = slotProvider.getCanceledSlotRequests();
        Assert.assertThat(canceledSlotRequests, (Matcher)Matchers.equalTo(slotRequests));
    }

    @Test
    public void testAllPreferredLocationCalculation() throws Exception {
        LocalTaskManagerLocation taskManagerLocation1 = new LocalTaskManagerLocation();
        LocalTaskManagerLocation taskManagerLocation2 = new LocalTaskManagerLocation();
        LocalTaskManagerLocation taskManagerLocation3 = new LocalTaskManagerLocation();
        CompletableFuture<LocalTaskManagerLocation> locationFuture1 = CompletableFuture.completedFuture(taskManagerLocation1);
        CompletableFuture<LocalTaskManagerLocation> locationFuture2 = new CompletableFuture<LocalTaskManagerLocation>();
        CompletableFuture<LocalTaskManagerLocation> locationFuture3 = new CompletableFuture<LocalTaskManagerLocation>();
        Execution execution = ExecutionGraphTestUtils.getExecution(Arrays.asList(locationFuture1, locationFuture2, locationFuture3));
        CompletableFuture preferredLocationsFuture = execution.calculatePreferredLocations(LocationPreferenceConstraint.ALL);
        Assert.assertFalse((boolean)preferredLocationsFuture.isDone());
        locationFuture3.complete(taskManagerLocation3);
        Assert.assertFalse((boolean)preferredLocationsFuture.isDone());
        locationFuture2.complete(taskManagerLocation2);
        Assert.assertTrue((boolean)preferredLocationsFuture.isDone());
        Collection preferredLocations = (Collection)preferredLocationsFuture.get();
        Assert.assertThat((Object)preferredLocations, (Matcher)Matchers.containsInAnyOrder((Object[])new TaskManagerLocation[]{taskManagerLocation1, taskManagerLocation2, taskManagerLocation3}));
    }

    @Test
    public void testAnyPreferredLocationCalculation() throws Exception {
        LocalTaskManagerLocation taskManagerLocation1 = new LocalTaskManagerLocation();
        LocalTaskManagerLocation taskManagerLocation3 = new LocalTaskManagerLocation();
        CompletableFuture<LocalTaskManagerLocation> locationFuture1 = CompletableFuture.completedFuture(taskManagerLocation1);
        CompletableFuture locationFuture2 = new CompletableFuture();
        CompletableFuture<LocalTaskManagerLocation> locationFuture3 = CompletableFuture.completedFuture(taskManagerLocation3);
        Execution execution = ExecutionGraphTestUtils.getExecution(Arrays.asList(locationFuture1, locationFuture2, locationFuture3));
        CompletableFuture preferredLocationsFuture = execution.calculatePreferredLocations(LocationPreferenceConstraint.ANY);
        Assert.assertTrue((boolean)preferredLocationsFuture.isDone());
        Collection preferredLocations = (Collection)preferredLocationsFuture.get();
        Assert.assertThat((Object)preferredLocations, (Matcher)Matchers.containsInAnyOrder((Object[])new TaskManagerLocation[]{taskManagerLocation1, taskManagerLocation3}));
    }

    @Test
    public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception {
        JobVertex jobVertex = this.createNoOpJobVertex();
        JobVertexID jobVertexId = jobVertex.getID();
        SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
        ProgrammedSlotProvider slotProvider = this.createProgrammedSlotProvider(1, Collections.singleton(jobVertexId), slotOwner);
        ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph((SlotProvider)slotProvider, (RestartStrategy)new NoRestartStrategy(), jobVertex);
        executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
        executionVertex.scheduleForExecution(executionGraph.getSlotProviderStrategy(), LocationPreferenceConstraint.ANY, Collections.emptySet()).get();
        Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();
        CompletableFuture<LogicalSlot> returnedSlotFuture = slotOwner.getReturnedSlotFuture();
        CompletableFuture terminationFuture = executionVertex.cancel();
        currentExecutionAttempt.completeCancelling();
        CompletionStage restartFuture = terminationFuture.thenApply(ignored -> {
            Assert.assertTrue((boolean)returnedSlotFuture.isDone());
            return true;
        });
        ((CompletableFuture)restartFuture).get();
    }

    @Test
    public void testTaskRestoreStateIsNulledAfterDeployment() throws Exception {
        JobVertex jobVertex = this.createNoOpJobVertex();
        JobVertexID jobVertexId = jobVertex.getID();
        SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
        ProgrammedSlotProvider slotProvider = this.createProgrammedSlotProvider(1, Collections.singleton(jobVertexId), slotOwner);
        ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph((SlotProvider)slotProvider, (RestartStrategy)new NoRestartStrategy(), jobVertex);
        ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
        Execution execution = executionVertex.getCurrentExecutionAttempt();
        JobManagerTaskRestore taskRestoreState = new JobManagerTaskRestore(1L, new TaskStateSnapshot());
        execution.setInitialState(taskRestoreState);
        Assert.assertThat((Object)execution.getTaskRestore(), (Matcher)Matchers.is((Matcher)Matchers.notNullValue()));
        executionVertex.scheduleForExecution(executionGraph.getSlotProviderStrategy(), LocationPreferenceConstraint.ANY, Collections.emptySet()).get();
        Assert.assertThat((Object)execution.getTaskRestore(), (Matcher)Matchers.is((Matcher)Matchers.nullValue()));
    }

    @Test
    public void testEagerSchedulingFailureReturnsSlot() throws Exception {
        JobVertex jobVertex = this.createNoOpJobVertex();
        JobVertexID jobVertexId = jobVertex.getID();
        SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
        SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
        CompletableFuture slotRequestIdFuture = new CompletableFuture();
        CompletableFuture returnedSlotFuture = new CompletableFuture();
        TestingSlotProvider slotProvider = new TestingSlotProvider(slotRequestId -> {
            slotRequestIdFuture.complete(slotRequestId);
            return new CompletableFuture();
        });
        slotProvider.setSlotCanceller(returnedSlotFuture::complete);
        slotOwner.getReturnedSlotFuture().thenAccept(logicalSlot -> returnedSlotFuture.complete(logicalSlot.getSlotRequestId()));
        ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(slotProvider, (RestartStrategy)new NoRestartStrategy(), jobVertex);
        executionGraph.start(this.testMainThreadUtil.getMainThreadExecutor());
        ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
        Execution execution = executionVertex.getCurrentExecutionAttempt();
        taskManagerGateway.setCancelConsumer(executionAttemptID -> {
            if (execution.getAttemptId().equals(executionAttemptID)) {
                execution.completeCancelling();
            }
        });
        slotRequestIdFuture.thenAcceptAsync(slotRequestId -> {
            SingleLogicalSlot singleLogicalSlot = ExecutionGraphSchedulingTest.createSingleLogicalSlot(slotOwner, taskManagerGateway, slotRequestId);
            slotProvider.complete((SlotRequestId)slotRequestId, (LogicalSlot)singleLogicalSlot);
        }, (Executor)this.testMainThreadUtil.getMainThreadExecutor());
        CompletableFuture schedulingFuture = (CompletableFuture)this.testMainThreadUtil.execute(() -> execution.scheduleForExecution(executionGraph.getSlotProviderStrategy(), LocationPreferenceConstraint.ANY, Collections.emptySet()));
        try {
            schedulingFuture.get();
            this.testMainThreadUtil.execute((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> ((Execution)execution).cancel()));
        }
        catch (ExecutionException executionException) {
            // empty catch block
        }
        Assert.assertThat(returnedSlotFuture.get(), (Matcher)Matchers.is((Matcher)Matchers.equalTo(slotRequestIdFuture.get())));
    }

    @Test
    public void testSlotReleaseAtomicallyReleasesExecution() throws Exception {
        JobVertex jobVertex = this.createNoOpJobVertex();
        SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
        SingleLogicalSlot slot = ExecutionGraphSchedulingTest.createSingleLogicalSlot(slotOwner, new SimpleAckingTaskManagerGateway(), new SlotRequestId());
        CompletableFuture<SingleLogicalSlot> slotFuture = CompletableFuture.completedFuture(slot);
        CountDownLatch slotRequestLatch = new CountDownLatch(1);
        TestingSlotProvider slotProvider = new TestingSlotProvider(slotRequestId -> {
            slotRequestLatch.countDown();
            return slotFuture;
        });
        ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(slotProvider, (RestartStrategy)new NoRestartStrategy(), jobVertex);
        Execution execution = executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[0].getCurrentExecutionAttempt();
        executionGraph.start(this.testMainThreadUtil.getMainThreadExecutor());
        this.testMainThreadUtil.execute((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> ((ExecutionGraph)executionGraph).scheduleForExecution()));
        slotRequestLatch.await();
        this.testMainThreadUtil.execute((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            Assert.assertThat((Object)execution.getAssignedResource(), (Matcher)Matchers.is((Matcher)Matchers.sameInstance((Object)slot)));
            slot.release((Throwable)new FlinkException("Test exception"));
            Assert.assertThat((Object)execution.getReleaseFuture().isDone(), (Matcher)Matchers.is((Object)true));
        }));
    }

    @Test
    public void testIncompletePartitionRegistrationFutureIsRejected() throws Exception {
        TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
        JobGraph jobGraph = new JobGraph("job graph");
        JobVertex source = new JobVertex("source");
        JobVertex target = new JobVertex("target");
        source.setInvokableClass(AbstractInvokable.class);
        target.setInvokableClass(AbstractInvokable.class);
        target.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        jobGraph.addVertex(source);
        jobGraph.addVertex(target);
        ExecutionGraph executionGraph = TestingExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).setShuffleMaster(shuffleMaster).build();
        ExecutionVertex sourceVertex = ((ExecutionJobVertex)executionGraph.getAllVertices().get(source.getID())).getTaskVertices()[0];
        boolean incompletePartitionRegistrationRejected = false;
        try {
            Execution.registerProducedPartitions((ExecutionVertex)sourceVertex, (TaskManagerLocation)new LocalTaskManagerLocation(), (ExecutionAttemptID)new ExecutionAttemptID(), (boolean)false);
        }
        catch (IllegalStateException e) {
            incompletePartitionRegistrationRejected = true;
        }
        Assert.assertTrue((boolean)incompletePartitionRegistrationRejected);
    }

    @Nonnull
    private JobVertex createNoOpJobVertex() {
        JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID());
        jobVertex.setInvokableClass(NoOpInvokable.class);
        return jobVertex;
    }

    @Nonnull
    private ProgrammedSlotProvider createProgrammedSlotProvider(int parallelism, Collection<JobVertexID> jobVertexIds, SlotOwner slotOwner) {
        ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
        for (JobVertexID jobVertexId : jobVertexIds) {
            for (int i = 0; i < parallelism; ++i) {
                TestingLogicalSlot slot = this.createTestingLogicalSlot(slotOwner);
                slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot));
            }
        }
        return slotProvider;
    }

    private static final class SingleSlotTestingSlotOwner
    implements SlotOwner {
        final CompletableFuture<LogicalSlot> returnedSlot = new CompletableFuture();

        private SingleSlotTestingSlotOwner() {
        }

        public CompletableFuture<LogicalSlot> getReturnedSlotFuture() {
            return this.returnedSlot;
        }

        public void returnLogicalSlot(LogicalSlot logicalSlot) {
            this.returnedSlot.complete(logicalSlot);
        }
    }

    private static class TestingShuffleMaster
    implements ShuffleMaster<ShuffleDescriptor> {
        private TestingShuffleMaster() {
        }

        public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
            return new CompletableFuture<ShuffleDescriptor>();
        }

        public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
        }
    }
}

