/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.net.InetAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class ExecutionVertexDeploymentTest
extends TestLogger {
    private static final String ERROR_MESSAGE = "test_failure_error_message";

    @Test
    public void testDeployCall() {
        try {
            JobVertexID jid = new JobVertexID();
            ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionJobVertex(jid);
            TestingLogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
            ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            Assert.assertEquals((Object)ExecutionState.CREATED, (Object)vertex.getExecutionState());
            vertex.deployToSlot((LogicalSlot)slot);
            Assert.assertEquals((Object)ExecutionState.DEPLOYING, (Object)vertex.getExecutionState());
            try {
                vertex.deployToSlot((LogicalSlot)slot);
                Assert.fail((String)"Scheduled from wrong state");
            }
            catch (IllegalStateException illegalStateException) {
                // empty catch block
            }
            Assert.assertNull((Object)vertex.getFailureCause());
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.CREATED) > 0L ? 1 : 0) != 0);
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0L ? 1 : 0) != 0);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testDeployWithSynchronousAnswer() {
        try {
            JobVertexID jid = new JobVertexID();
            ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionJobVertex(jid, new DirectScheduledExecutorService());
            TestingLogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
            ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            Assert.assertEquals((Object)ExecutionState.CREATED, (Object)vertex.getExecutionState());
            vertex.deployToSlot((LogicalSlot)slot);
            Assert.assertEquals((Object)ExecutionState.DEPLOYING, (Object)vertex.getExecutionState());
            try {
                vertex.deployToSlot((LogicalSlot)slot);
                Assert.fail((String)"Scheduled from wrong state");
            }
            catch (IllegalStateException illegalStateException) {
                // empty catch block
            }
            Assert.assertNull((Object)vertex.getFailureCause());
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.CREATED) > 0L ? 1 : 0) != 0);
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0L ? 1 : 0) != 0);
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.RUNNING) == 0L ? 1 : 0) != 0);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testDeployWithAsynchronousAnswer() {
        try {
            JobVertexID jid = new JobVertexID();
            ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionJobVertex(jid);
            ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            TestingLogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
            Assert.assertEquals((Object)ExecutionState.CREATED, (Object)vertex.getExecutionState());
            vertex.deployToSlot((LogicalSlot)slot);
            try {
                vertex.deployToSlot((LogicalSlot)slot);
                Assert.fail((String)"Scheduled from wrong state");
            }
            catch (IllegalStateException illegalStateException) {
                // empty catch block
            }
            Assert.assertEquals((Object)ExecutionState.DEPLOYING, (Object)vertex.getExecutionState());
            try {
                vertex.deployToSlot((LogicalSlot)slot);
                Assert.fail((String)"Scheduled from wrong state");
            }
            catch (IllegalStateException illegalStateException) {
                // empty catch block
            }
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.CREATED) > 0L ? 1 : 0) != 0);
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0L ? 1 : 0) != 0);
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.RUNNING) == 0L ? 1 : 0) != 0);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testDeployFailedSynchronous() {
        try {
            JobVertexID jid = new JobVertexID();
            ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionJobVertex(jid, new DirectScheduledExecutorService());
            ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            TestingLogicalSlot slot = new TestingLogicalSlotBuilder().setTaskManagerGateway(new SubmitFailingSimpleAckingTaskManagerGateway()).createTestingLogicalSlot();
            Assert.assertEquals((Object)ExecutionState.CREATED, (Object)vertex.getExecutionState());
            vertex.deployToSlot((LogicalSlot)slot);
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)vertex.getExecutionState());
            Assert.assertNotNull((Object)vertex.getFailureCause());
            Assert.assertTrue((boolean)vertex.getFailureCause().getMessage().contains(ERROR_MESSAGE));
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.CREATED) > 0L ? 1 : 0) != 0);
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0L ? 1 : 0) != 0);
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.FAILED) > 0L ? 1 : 0) != 0);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testDeployFailedAsynchronously() {
        try {
            JobVertexID jid = new JobVertexID();
            ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionJobVertex(jid);
            ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            TestingLogicalSlot slot = new TestingLogicalSlotBuilder().setTaskManagerGateway(new SubmitFailingSimpleAckingTaskManagerGateway()).createTestingLogicalSlot();
            Assert.assertEquals((Object)ExecutionState.CREATED, (Object)vertex.getExecutionState());
            vertex.deployToSlot((LogicalSlot)slot);
            for (int i = 0; i < 100 && (vertex.getExecutionState() != ExecutionState.FAILED || vertex.getFailureCause() == null); ++i) {
                Thread.sleep(10L);
            }
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)vertex.getExecutionState());
            Assert.assertNotNull((Object)vertex.getFailureCause());
            Assert.assertTrue((boolean)vertex.getFailureCause().getMessage().contains(ERROR_MESSAGE));
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.CREATED) > 0L ? 1 : 0) != 0);
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0L ? 1 : 0) != 0);
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.FAILED) > 0L ? 1 : 0) != 0);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testFailExternallyDuringDeploy() {
        try {
            JobVertexID jid = new JobVertexID();
            ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionJobVertex(jid, new DirectScheduledExecutorService());
            ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            TestingLogicalSlot testingLogicalSlot = new TestingLogicalSlotBuilder().setTaskManagerGateway(new SubmitBlockingSimpleAckingTaskManagerGateway()).createTestingLogicalSlot();
            Assert.assertEquals((Object)ExecutionState.CREATED, (Object)vertex.getExecutionState());
            vertex.deployToSlot((LogicalSlot)testingLogicalSlot);
            Assert.assertEquals((Object)ExecutionState.DEPLOYING, (Object)vertex.getExecutionState());
            Exception testError = new Exception("test error");
            vertex.fail((Throwable)testError);
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)vertex.getExecutionState());
            Assert.assertEquals((Object)testError, (Object)vertex.getFailureCause());
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.CREATED) > 0L ? 1 : 0) != 0);
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0L ? 1 : 0) != 0);
            Assert.assertTrue((vertex.getStateTimestamp(ExecutionState.FAILED) > 0L ? 1 : 0) != 0);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testTddProducedPartitionsLazyScheduling() throws Exception {
        for (ScheduleMode scheduleMode : ScheduleMode.values()) {
            ExecutionJobVertex jobVertex = ExecutionGraphTestUtils.getExecutionJobVertex(new JobVertexID(), new DirectScheduledExecutorService(), scheduleMode);
            IntermediateResult result = new IntermediateResult(new IntermediateDataSetID(), jobVertex, 1, ResultPartitionType.PIPELINED);
            ExecutionAttemptID attemptID = new ExecutionAttemptID();
            ExecutionVertex vertex = new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes((long)1L));
            TaskDeploymentDescriptorFactory tddFactory = TaskDeploymentDescriptorFactory.fromExecutionVertex((ExecutionVertex)vertex, (int)1);
            ExecutionEdge mockEdge = this.createMockExecutionEdge(1);
            result.getPartitions()[0].addConsumerGroup();
            result.getPartitions()[0].addConsumer(mockEdge, 0);
            TaskManagerLocation location = new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 1);
            TaskDeploymentDescriptor tdd = tddFactory.createDeploymentDescriptor(new AllocationID(), 0, null, ((Map)Execution.registerProducedPartitions((ExecutionVertex)vertex, (TaskManagerLocation)location, (ExecutionAttemptID)attemptID, (boolean)scheduleMode.allowLazyDeployment()).get()).values());
            List producedPartitions = tdd.getProducedPartitions();
            Assert.assertEquals((long)1L, (long)producedPartitions.size());
            ResultPartitionDeploymentDescriptor desc = (ResultPartitionDeploymentDescriptor)producedPartitions.iterator().next();
            Assert.assertEquals((Object)scheduleMode.allowLazyDeployment(), (Object)desc.sendScheduleOrUpdateConsumersMessage());
        }
    }

    private ExecutionEdge createMockExecutionEdge(int maxParallelism) {
        ExecutionVertex targetVertex = (ExecutionVertex)Mockito.mock(ExecutionVertex.class);
        ExecutionJobVertex targetJobVertex = (ExecutionJobVertex)Mockito.mock(ExecutionJobVertex.class);
        Mockito.when((Object)targetVertex.getJobVertex()).thenReturn((Object)targetJobVertex);
        Mockito.when((Object)targetJobVertex.getMaxParallelism()).thenReturn((Object)maxParallelism);
        ExecutionEdge edge = (ExecutionEdge)Mockito.mock(ExecutionEdge.class);
        Mockito.when((Object)edge.getTarget()).thenReturn((Object)targetVertex);
        return edge;
    }

    private static class SubmitBlockingSimpleAckingTaskManagerGateway
    extends SimpleAckingTaskManagerGateway {
        private SubmitBlockingSimpleAckingTaskManagerGateway() {
        }

        @Override
        public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
            return new CompletableFuture<Acknowledge>();
        }
    }

    private static class SubmitFailingSimpleAckingTaskManagerGateway
    extends SimpleAckingTaskManagerGateway {
        private SubmitFailingSimpleAckingTaskManagerGateway() {
        }

        @Override
        public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
            CompletableFuture<Acknowledge> future = new CompletableFuture<Acknowledge>();
            future.completeExceptionally(new Exception(ExecutionVertexDeploymentTest.ERROR_MESSAGE));
            return future;
        }
    }
}

