/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.io.IOException;
import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskSubmissionTestEnvironment;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

public class TaskExecutorSubmissionTest
extends TestLogger {
    @Rule
    public final TestName testName = new TestName();
    private static final Time timeout = Time.milliseconds((long)10000L);
    private JobID jobId = new JobID();

    @Test
    public void testTaskSubmission() throws Exception {
        ExecutionAttemptID eid = new ExecutionAttemptID();
        TaskDeploymentDescriptor tdd = this.createTestTaskDeploymentDescriptor("test task", eid, FutureCompletingInvokable.class);
        CompletableFuture<Void> taskRunningFuture = new CompletableFuture<Void>();
        try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(this.jobId).setSlotSize(1).addTaskManagerActionListener(eid, ExecutionState.RUNNING, taskRunningFuture).build();){
            TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
            TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
            taskSlotTable.allocateSlot(0, this.jobId, tdd.getAllocationId(), Time.seconds((long)60L));
            tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
            taskRunningFuture.get();
        }
    }

    @Test
    public void testSubmitTaskFailure() throws Exception {
        ExecutionAttemptID eid = new ExecutionAttemptID();
        TaskDeploymentDescriptor tdd = this.createTestTaskDeploymentDescriptor("test task", eid, BlockingNoOpInvokable.class, 0);
        try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(this.jobId).build();){
            TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
            TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
            taskSlotTable.allocateSlot(0, this.jobId, tdd.getAllocationId(), Time.seconds((long)60L));
            tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
        }
        catch (Exception e) {
            Assert.assertThat((Object)e.getCause(), (Matcher)Matchers.instanceOf(IllegalArgumentException.class));
        }
    }

    @Test
    public void testTaskSubmissionAndCancelling() throws Exception {
        ExecutionAttemptID eid1 = new ExecutionAttemptID();
        ExecutionAttemptID eid2 = new ExecutionAttemptID();
        TaskDeploymentDescriptor tdd1 = this.createTestTaskDeploymentDescriptor("test task", eid1, BlockingNoOpInvokable.class);
        TaskDeploymentDescriptor tdd2 = this.createTestTaskDeploymentDescriptor("test task", eid2, BlockingNoOpInvokable.class);
        CompletableFuture<Void> task1RunningFuture = new CompletableFuture<Void>();
        CompletableFuture<Void> task2RunningFuture = new CompletableFuture<Void>();
        CompletableFuture<Void> task1CanceledFuture = new CompletableFuture<Void>();
        try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(this.jobId).setSlotSize(2).addTaskManagerActionListener(eid1, ExecutionState.RUNNING, task1RunningFuture).addTaskManagerActionListener(eid2, ExecutionState.RUNNING, task2RunningFuture).addTaskManagerActionListener(eid1, ExecutionState.CANCELED, task1CanceledFuture).build();){
            TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
            TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
            taskSlotTable.allocateSlot(0, this.jobId, tdd1.getAllocationId(), Time.seconds((long)60L));
            tmGateway.submitTask(tdd1, env.getJobMasterId(), timeout).get();
            task1RunningFuture.get();
            taskSlotTable.allocateSlot(1, this.jobId, tdd2.getAllocationId(), Time.seconds((long)60L));
            tmGateway.submitTask(tdd2, env.getJobMasterId(), timeout).get();
            task2RunningFuture.get();
            Assert.assertSame((Object)((Task)taskSlotTable.getTask(eid1)).getExecutionState(), (Object)ExecutionState.RUNNING);
            Assert.assertSame((Object)((Task)taskSlotTable.getTask(eid2)).getExecutionState(), (Object)ExecutionState.RUNNING);
            tmGateway.cancelTask(eid1, timeout);
            task1CanceledFuture.get();
            Assert.assertSame((Object)((Task)taskSlotTable.getTask(eid1)).getExecutionState(), (Object)ExecutionState.CANCELED);
            Assert.assertSame((Object)((Task)taskSlotTable.getTask(eid2)).getExecutionState(), (Object)ExecutionState.RUNNING);
        }
    }

    @Test
    public void testGateChannelEdgeMismatch() throws Exception {
        ExecutionAttemptID eid1 = new ExecutionAttemptID();
        ExecutionAttemptID eid2 = new ExecutionAttemptID();
        TaskDeploymentDescriptor tdd1 = this.createTestTaskDeploymentDescriptor("Sender", eid1, TestingAbstractInvokables.Sender.class);
        TaskDeploymentDescriptor tdd2 = this.createTestTaskDeploymentDescriptor("Receiver", eid2, TestingAbstractInvokables.Receiver.class);
        CompletableFuture<Void> task1RunningFuture = new CompletableFuture<Void>();
        CompletableFuture<Void> task2RunningFuture = new CompletableFuture<Void>();
        CompletableFuture<Void> task1FailedFuture = new CompletableFuture<Void>();
        CompletableFuture<Void> task2FailedFuture = new CompletableFuture<Void>();
        try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(this.jobId).addTaskManagerActionListener(eid1, ExecutionState.RUNNING, task1RunningFuture).addTaskManagerActionListener(eid2, ExecutionState.RUNNING, task2RunningFuture).addTaskManagerActionListener(eid1, ExecutionState.FAILED, task1FailedFuture).addTaskManagerActionListener(eid2, ExecutionState.FAILED, task2FailedFuture).setSlotSize(2).build();){
            TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
            TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
            taskSlotTable.allocateSlot(0, this.jobId, tdd1.getAllocationId(), Time.seconds((long)60L));
            tmGateway.submitTask(tdd1, env.getJobMasterId(), timeout).get();
            task1RunningFuture.get();
            taskSlotTable.allocateSlot(1, this.jobId, tdd2.getAllocationId(), Time.seconds((long)60L));
            tmGateway.submitTask(tdd2, env.getJobMasterId(), timeout).get();
            task2RunningFuture.get();
            task1FailedFuture.get();
            task2FailedFuture.get();
            Assert.assertSame((Object)((Task)taskSlotTable.getTask(eid1)).getExecutionState(), (Object)ExecutionState.FAILED);
            Assert.assertSame((Object)((Task)taskSlotTable.getTask(eid2)).getExecutionState(), (Object)ExecutionState.FAILED);
        }
    }

    @Test
    public void testRunJobWithForwardChannel() throws Exception {
        ResourceID producerLocation = ResourceID.generate();
        NettyShuffleDescriptor sdd = NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(new IntermediateResultPartitionID(), producerLocation);
        TaskDeploymentDescriptor tdd1 = this.createSender(sdd);
        TaskDeploymentDescriptor tdd2 = this.createReceiver(sdd);
        ExecutionAttemptID eid1 = tdd1.getExecutionAttemptId();
        ExecutionAttemptID eid2 = tdd2.getExecutionAttemptId();
        CompletableFuture<Void> task1RunningFuture = new CompletableFuture<Void>();
        CompletableFuture<Void> task2RunningFuture = new CompletableFuture<Void>();
        CompletableFuture<Void> task1FinishedFuture = new CompletableFuture<Void>();
        CompletableFuture<Void> task2FinishedFuture = new CompletableFuture<Void>();
        JobMasterId jobMasterId = JobMasterId.generate();
        TestingJobMasterGateway testingJobMasterGateway = new TestingJobMasterGatewayBuilder().setFencingTokenSupplier(() -> jobMasterId).setNotifyPartitionDataAvailableFunction(resultPartitionID -> CompletableFuture.completedFuture(Acknowledge.get())).build();
        try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(this.jobId).setResourceID(producerLocation).setSlotSize(2).addTaskManagerActionListener(eid1, ExecutionState.RUNNING, task1RunningFuture).addTaskManagerActionListener(eid2, ExecutionState.RUNNING, task2RunningFuture).addTaskManagerActionListener(eid1, ExecutionState.FINISHED, task1FinishedFuture).addTaskManagerActionListener(eid2, ExecutionState.FINISHED, task2FinishedFuture).setJobMasterId(jobMasterId).setJobMasterGateway(testingJobMasterGateway).useRealNonMockShuffleEnvironment().build();){
            TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
            TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
            taskSlotTable.allocateSlot(0, this.jobId, tdd1.getAllocationId(), Time.seconds((long)60L));
            tmGateway.submitTask(tdd1, jobMasterId, timeout).get();
            task1RunningFuture.get();
            taskSlotTable.allocateSlot(1, this.jobId, tdd2.getAllocationId(), Time.seconds((long)60L));
            tmGateway.submitTask(tdd2, jobMasterId, timeout).get();
            task2RunningFuture.get();
            task1FinishedFuture.get();
            task2FinishedFuture.get();
            Assert.assertSame((Object)((Task)taskSlotTable.getTask(eid1)).getExecutionState(), (Object)ExecutionState.FINISHED);
            Assert.assertSame((Object)((Task)taskSlotTable.getTask(eid2)).getExecutionState(), (Object)ExecutionState.FINISHED);
        }
    }

    @Test
    public void testCancellingDependentAndStateUpdateFails() throws Exception {
        ResourceID producerLocation = ResourceID.generate();
        NettyShuffleDescriptor sdd = NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(new IntermediateResultPartitionID(), producerLocation);
        TaskDeploymentDescriptor tdd1 = this.createSender(sdd);
        TaskDeploymentDescriptor tdd2 = this.createReceiver(sdd);
        ExecutionAttemptID eid1 = tdd1.getExecutionAttemptId();
        ExecutionAttemptID eid2 = tdd2.getExecutionAttemptId();
        CompletableFuture<Void> task1RunningFuture = new CompletableFuture<Void>();
        CompletableFuture<Void> task2RunningFuture = new CompletableFuture<Void>();
        CompletableFuture<Void> task1FailedFuture = new CompletableFuture<Void>();
        CompletableFuture<Void> task2CanceledFuture = new CompletableFuture<Void>();
        JobMasterId jobMasterId = JobMasterId.generate();
        TestingJobMasterGateway testingJobMasterGateway = new TestingJobMasterGatewayBuilder().setFencingTokenSupplier(() -> jobMasterId).setUpdateTaskExecutionStateFunction(taskExecutionState -> {
            if (taskExecutionState != null && taskExecutionState.getID().equals((Object)eid1) && taskExecutionState.getExecutionState() == ExecutionState.RUNNING) {
                return FutureUtils.completedExceptionally((Throwable)new ExecutionGraphException("The execution attempt " + eid2 + " was not found."));
            }
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).build();
        try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(this.jobId).setResourceID(producerLocation).setSlotSize(2).addTaskManagerActionListener(eid1, ExecutionState.RUNNING, task1RunningFuture).addTaskManagerActionListener(eid2, ExecutionState.RUNNING, task2RunningFuture).addTaskManagerActionListener(eid1, ExecutionState.FAILED, task1FailedFuture).addTaskManagerActionListener(eid2, ExecutionState.CANCELED, task2CanceledFuture).setJobMasterId(jobMasterId).setJobMasterGateway(testingJobMasterGateway).useRealNonMockShuffleEnvironment().build();){
            TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
            TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
            taskSlotTable.allocateSlot(0, this.jobId, tdd1.getAllocationId(), Time.seconds((long)60L));
            tmGateway.submitTask(tdd1, jobMasterId, timeout).get();
            task1RunningFuture.get();
            taskSlotTable.allocateSlot(1, this.jobId, tdd2.getAllocationId(), Time.seconds((long)60L));
            tmGateway.submitTask(tdd2, jobMasterId, timeout).get();
            task2RunningFuture.get();
            task1FailedFuture.get();
            Assert.assertSame((Object)((Task)taskSlotTable.getTask(eid1)).getExecutionState(), (Object)ExecutionState.FAILED);
            tmGateway.cancelTask(eid2, timeout);
            task2CanceledFuture.get();
            Assert.assertSame((Object)((Task)taskSlotTable.getTask(eid2)).getExecutionState(), (Object)ExecutionState.CANCELED);
        }
    }

    @Test
    public void testRemotePartitionNotFound() throws Exception {
        try (NetUtils.Port port = NetUtils.getAvailablePort();){
            int dataPort = port.getPort();
            Configuration config = new Configuration();
            config.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, dataPort);
            config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
            config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
            NettyShuffleDescriptor sdd = NettyShuffleDescriptorBuilder.newBuilder().setDataPort(dataPort).buildRemote();
            TaskDeploymentDescriptor tdd = this.createReceiver(sdd);
            ExecutionAttemptID eid = tdd.getExecutionAttemptId();
            CompletableFuture<Void> taskRunningFuture = new CompletableFuture<Void>();
            CompletableFuture<Void> taskFailedFuture = new CompletableFuture<Void>();
            try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(this.jobId).setSlotSize(2).addTaskManagerActionListener(eid, ExecutionState.RUNNING, taskRunningFuture).addTaskManagerActionListener(eid, ExecutionState.FAILED, taskFailedFuture).setConfiguration(config).setLocalCommunication(false).useRealNonMockShuffleEnvironment().build();){
                TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
                TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
                taskSlotTable.allocateSlot(0, this.jobId, tdd.getAllocationId(), Time.seconds((long)60L));
                tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
                taskRunningFuture.get();
                taskFailedFuture.get();
                Assert.assertThat((Object)((Task)taskSlotTable.getTask(eid)).getFailureCause(), (Matcher)Matchers.instanceOf(PartitionNotFoundException.class));
            }
        }
    }

    @Test
    public void testUpdateTaskInputPartitionsFailure() throws Exception {
        ExecutionAttemptID eid = new ExecutionAttemptID();
        TaskDeploymentDescriptor tdd = this.createTestTaskDeploymentDescriptor("test task", eid, BlockingNoOpInvokable.class);
        CompletableFuture<Void> taskRunningFuture = new CompletableFuture<Void>();
        CompletableFuture<Void> taskFailedFuture = new CompletableFuture<Void>();
        ShuffleEnvironment shuffleEnvironment = (ShuffleEnvironment)Mockito.mock(ShuffleEnvironment.class, (Answer)Mockito.RETURNS_MOCKS);
        try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(this.jobId).setShuffleEnvironment(shuffleEnvironment).setSlotSize(1).addTaskManagerActionListener(eid, ExecutionState.RUNNING, taskRunningFuture).addTaskManagerActionListener(eid, ExecutionState.FAILED, taskFailedFuture).build();){
            TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
            TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
            taskSlotTable.allocateSlot(0, this.jobId, tdd.getAllocationId(), Time.seconds((long)60L));
            tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
            taskRunningFuture.get();
            ResourceID producerLocation = env.getTaskExecutor().getResourceID();
            NettyShuffleDescriptor shuffleDescriptor = NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(new IntermediateResultPartitionID(), producerLocation);
            PartitionInfo partitionUpdate = new PartitionInfo(new IntermediateDataSetID(), (ShuffleDescriptor)shuffleDescriptor);
            ((ShuffleEnvironment)Mockito.doThrow((Throwable[])new Throwable[]{new IOException()}).when((Object)shuffleEnvironment)).updatePartitionInfo(eid, partitionUpdate);
            CompletableFuture updateFuture = tmGateway.updatePartitions(eid, Collections.singletonList(partitionUpdate), timeout);
            updateFuture.get();
            taskFailedFuture.get();
            Task task = (Task)taskSlotTable.getTask(tdd.getExecutionAttemptId());
            Assert.assertThat((Object)task.getExecutionState(), (Matcher)CoreMatchers.is((Object)ExecutionState.FAILED));
            Assert.assertThat((Object)task.getFailureCause(), (Matcher)Matchers.instanceOf(IOException.class));
        }
    }

    @Test
    public void testLocalPartitionNotFound() throws Exception {
        ResourceID producerLocation = ResourceID.generate();
        NettyShuffleDescriptor shuffleDescriptor = NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(new IntermediateResultPartitionID(), producerLocation);
        TaskDeploymentDescriptor tdd = this.createReceiver(shuffleDescriptor);
        ExecutionAttemptID eid = tdd.getExecutionAttemptId();
        Configuration config = new Configuration();
        config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
        config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
        CompletableFuture<Void> taskRunningFuture = new CompletableFuture<Void>();
        CompletableFuture<Void> taskFailedFuture = new CompletableFuture<Void>();
        try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(this.jobId).setResourceID(producerLocation).setSlotSize(1).addTaskManagerActionListener(eid, ExecutionState.RUNNING, taskRunningFuture).addTaskManagerActionListener(eid, ExecutionState.FAILED, taskFailedFuture).setConfiguration(config).useRealNonMockShuffleEnvironment().build();){
            TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
            TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
            taskSlotTable.allocateSlot(0, this.jobId, tdd.getAllocationId(), Time.seconds((long)60L));
            tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
            taskRunningFuture.get();
            taskFailedFuture.get();
            Assert.assertSame((Object)((Task)taskSlotTable.getTask(eid)).getExecutionState(), (Object)ExecutionState.FAILED);
            Assert.assertThat((Object)((Task)taskSlotTable.getTask(eid)).getFailureCause(), (Matcher)Matchers.instanceOf(PartitionNotFoundException.class));
        }
    }

    @Test
    public void testFailingNotifyPartitionDataAvailable() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, (Object)MemorySize.parse((String)"4096"));
        NettyShuffleDescriptor sdd = NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(new IntermediateResultPartitionID(), ResourceID.generate());
        TaskDeploymentDescriptor tdd = this.createSender(sdd, TestingAbstractInvokables.TestInvokableRecordCancel.class);
        ExecutionAttemptID eid = tdd.getExecutionAttemptId();
        CompletableFuture<Void> taskRunningFuture = new CompletableFuture<Void>();
        Exception exception = new Exception("Failed notifyPartitionDataAvailable");
        JobMasterId jobMasterId = JobMasterId.generate();
        TestingJobMasterGateway testingJobMasterGateway = new TestingJobMasterGatewayBuilder().setFencingTokenSupplier(() -> jobMasterId).setNotifyPartitionDataAvailableFunction(resultPartitionID -> FutureUtils.completedExceptionally((Throwable)exception)).build();
        try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(this.jobId).setSlotSize(1).setConfiguration(configuration).addTaskManagerActionListener(eid, ExecutionState.RUNNING, taskRunningFuture).setJobMasterId(jobMasterId).setJobMasterGateway(testingJobMasterGateway).useRealNonMockShuffleEnvironment().build();){
            TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
            TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
            TestingAbstractInvokables.TestInvokableRecordCancel.resetGotCanceledFuture();
            taskSlotTable.allocateSlot(0, this.jobId, tdd.getAllocationId(), Time.seconds((long)60L));
            tmGateway.submitTask(tdd, jobMasterId, timeout).get();
            taskRunningFuture.get();
            CompletableFuture<Boolean> cancelFuture = TestingAbstractInvokables.TestInvokableRecordCancel.gotCanceled();
            Assert.assertTrue((boolean)cancelFuture.get());
            Assert.assertTrue((boolean)ExceptionUtils.findThrowableWithMessage((Throwable)((Task)taskSlotTable.getTask(eid)).getFailureCause(), (String)exception.getMessage()).isPresent());
        }
    }

    private TaskDeploymentDescriptor createSender(NettyShuffleDescriptor shuffleDescriptor) throws IOException {
        return this.createSender(shuffleDescriptor, TestingAbstractInvokables.Sender.class);
    }

    private TaskDeploymentDescriptor createSender(NettyShuffleDescriptor shuffleDescriptor, Class<? extends AbstractInvokable> abstractInvokable) throws IOException {
        PartitionDescriptor partitionDescriptor = PartitionDescriptorBuilder.newBuilder().setPartitionId(shuffleDescriptor.getResultPartitionID().getPartitionId()).build();
        ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor = new ResultPartitionDeploymentDescriptor(partitionDescriptor, (ShuffleDescriptor)shuffleDescriptor, 1, true);
        return this.createTestTaskDeploymentDescriptor("Sender", shuffleDescriptor.getResultPartitionID().getProducerId(), abstractInvokable, 1, Collections.singletonList(resultPartitionDeploymentDescriptor), Collections.emptyList());
    }

    private TaskDeploymentDescriptor createReceiver(NettyShuffleDescriptor shuffleDescriptor) throws IOException {
        InputGateDeploymentDescriptor inputGateDeploymentDescriptor = new InputGateDeploymentDescriptor(new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, new ShuffleDescriptor[]{shuffleDescriptor});
        return this.createTestTaskDeploymentDescriptor("Receiver", new ExecutionAttemptID(), TestingAbstractInvokables.Receiver.class, 1, Collections.emptyList(), Collections.singletonList(inputGateDeploymentDescriptor));
    }

    private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(String taskName, ExecutionAttemptID eid, Class<? extends AbstractInvokable> abstractInvokable) throws IOException {
        return this.createTestTaskDeploymentDescriptor(taskName, eid, abstractInvokable, 1);
    }

    private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(String taskName, ExecutionAttemptID eid, Class<? extends AbstractInvokable> abstractInvokable, int maxNumberOfSubtasks) throws IOException {
        return this.createTestTaskDeploymentDescriptor(taskName, eid, abstractInvokable, maxNumberOfSubtasks, Collections.emptyList(), Collections.emptyList());
    }

    private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(String taskName, ExecutionAttemptID eid, Class<? extends AbstractInvokable> abstractInvokable, int maxNumberOfSubtasks, List<ResultPartitionDeploymentDescriptor> producedPartitions, List<InputGateDeploymentDescriptor> inputGates) throws IOException {
        Preconditions.checkNotNull(producedPartitions);
        Preconditions.checkNotNull(inputGates);
        return TaskExecutorSubmissionTest.createTaskDeploymentDescriptor(this.jobId, this.testName.getMethodName(), eid, (SerializedValue<ExecutionConfig>)new SerializedValue((Object)new ExecutionConfig()), taskName, maxNumberOfSubtasks, 0, 1, 0, new Configuration(), new Configuration(), abstractInvokable.getName(), producedPartitions, inputGates, Collections.emptyList(), Collections.emptyList());
    }

    static TaskDeploymentDescriptor createTaskDeploymentDescriptor(JobID jobId, String jobName, ExecutionAttemptID executionAttemptId, SerializedValue<ExecutionConfig> serializedExecutionConfig, String taskName, int maxNumberOfSubtasks, int subtaskIndex, int numberOfSubtasks, int attemptNumber, Configuration jobConfiguration, Configuration taskConfiguration, String invokableClassName, List<ResultPartitionDeploymentDescriptor> producedPartitions, List<InputGateDeploymentDescriptor> inputGates, Collection<PermanentBlobKey> requiredJarFiles, Collection<URL> requiredClasspaths) throws IOException {
        JobInformation jobInformation = new JobInformation(jobId, jobName, serializedExecutionConfig, jobConfiguration, requiredJarFiles, requiredClasspaths);
        TaskInformation taskInformation = new TaskInformation(new JobVertexID(), taskName, numberOfSubtasks, maxNumberOfSubtasks, invokableClassName, taskConfiguration);
        SerializedValue serializedJobInformation = new SerializedValue((Object)jobInformation);
        SerializedValue serializedJobVertexInformation = new SerializedValue((Object)taskInformation);
        return new TaskDeploymentDescriptor(jobId, (TaskDeploymentDescriptor.MaybeOffloaded)new TaskDeploymentDescriptor.NonOffloaded(serializedJobInformation), (TaskDeploymentDescriptor.MaybeOffloaded)new TaskDeploymentDescriptor.NonOffloaded(serializedJobVertexInformation), executionAttemptId, new AllocationID(), subtaskIndex, attemptNumber, null, producedPartitions, inputGates);
    }

    public static class FutureCompletingInvokable
    extends AbstractInvokable {
        static final CompletableFuture<Boolean> COMPLETABLE_FUTURE = new CompletableFuture();

        public FutureCompletingInvokable(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            COMPLETABLE_FUTURE.complete(true);
        }
    }
}

