/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.minicluster;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.minicluster.SometimesExceptionSender;
import org.apache.flink.runtime.minicluster.SometimesInstantiationErrorSender;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testtasks.WaitingNoOpInvokable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

public class MiniClusterITCase
extends TestLogger {
    @Test
    public void runJobWithSingleRpcService() throws Exception {
        int numOfTMs = 3;
        int slotsPerTM = 7;
        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder().setNumTaskManagers(3).setNumSlotsPerTaskManager(7).setRpcServiceSharing(RpcServiceSharing.SHARED).setConfiguration(this.getDefaultConfiguration()).build();
        try (MiniCluster miniCluster = new MiniCluster(cfg);){
            miniCluster.start();
            miniCluster.executeJobBlocking(MiniClusterITCase.getSimpleJob(21));
        }
    }

    @Test
    public void runJobWithMultipleRpcServices() throws Exception {
        int numOfTMs = 3;
        int slotsPerTM = 7;
        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder().setNumTaskManagers(3).setNumSlotsPerTaskManager(7).setRpcServiceSharing(RpcServiceSharing.DEDICATED).setConfiguration(this.getDefaultConfiguration()).build();
        try (MiniCluster miniCluster = new MiniCluster(cfg);){
            miniCluster.start();
            miniCluster.executeJobBlocking(MiniClusterITCase.getSimpleJob(21));
        }
    }

    @Test
    public void testHandleStreamingJobsWhenNotEnoughSlot() throws Exception {
        try {
            this.setupAndRunHandleJobsWhenNotEnoughSlots(ScheduleMode.EAGER);
            Assert.fail((String)"Job should fail.");
        }
        catch (JobExecutionException e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowableWithMessage((Throwable)e, (String)"Job execution failed.").isPresent());
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, NoResourceAvailableException.class).isPresent());
            String legacySchedulerErrorMessage = "Slots required: 2, slots allocated: 1";
            String ngSchedulerErrorMessage = "Could not allocate the required slot within slot request timeout";
            Assert.assertTrue((ExceptionUtils.findThrowableWithMessage((Throwable)e, (String)"Slots required: 2, slots allocated: 1").isPresent() || ExceptionUtils.findThrowableWithMessage((Throwable)e, (String)"Could not allocate the required slot within slot request timeout").isPresent() ? 1 : 0) != 0);
        }
    }

    @Test
    public void testHandleBatchJobsWhenNotEnoughSlot() throws Exception {
        try {
            this.setupAndRunHandleJobsWhenNotEnoughSlots(ScheduleMode.LAZY_FROM_SOURCES);
            Assert.fail((String)"Job should fail.");
        }
        catch (JobExecutionException e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowableWithMessage((Throwable)e, (String)"Job execution failed.").isPresent());
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, NoResourceAvailableException.class).isPresent());
            String legacySchedulerErrorMessage = "Could not allocate enough slots";
            String ngSchedulerErrorMessage = "Could not allocate the required slot within slot request timeout";
            Assert.assertTrue((ExceptionUtils.findThrowableWithMessage((Throwable)e, (String)"Could not allocate enough slots").isPresent() || ExceptionUtils.findThrowableWithMessage((Throwable)e, (String)"Could not allocate the required slot within slot request timeout").isPresent() ? 1 : 0) != 0);
        }
    }

    private void setupAndRunHandleJobsWhenNotEnoughSlots(ScheduleMode scheduleMode) throws Exception {
        JobVertex vertex = new JobVertex("Test Vertex");
        vertex.setParallelism(2);
        vertex.setMaxParallelism(2);
        vertex.setInvokableClass(BlockingNoOpInvokable.class);
        JobGraph jobGraph = new JobGraph("Test Job", new JobVertex[]{vertex});
        jobGraph.setScheduleMode(scheduleMode);
        this.runHandleJobsWhenNotEnoughSlots(jobGraph);
    }

    private void runHandleJobsWhenNotEnoughSlots(JobGraph jobGraph) throws Exception {
        Configuration configuration = this.getDefaultConfiguration();
        configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 100L);
        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder().setNumTaskManagers(1).setNumSlotsPerTaskManager(1).setConfiguration(configuration).build();
        try (MiniCluster miniCluster = new MiniCluster(cfg);){
            miniCluster.start();
            miniCluster.executeJobBlocking(jobGraph);
        }
    }

    @Test
    public void testForwardJob() throws Exception {
        int parallelism = 31;
        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder().setNumTaskManagers(1).setNumSlotsPerTaskManager(31).setConfiguration(this.getDefaultConfiguration()).build();
        try (MiniCluster miniCluster = new MiniCluster(cfg);){
            miniCluster.start();
            JobVertex sender = new JobVertex("Sender");
            sender.setInvokableClass(TestingAbstractInvokables.Sender.class);
            sender.setParallelism(31);
            JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(TestingAbstractInvokables.Receiver.class);
            receiver.setParallelism(31);
            receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            JobGraph jobGraph = new JobGraph("Pointwise Job", new JobVertex[]{sender, receiver});
            miniCluster.executeJobBlocking(jobGraph);
        }
    }

    @Test
    public void testBipartiteJob() throws Exception {
        int parallelism = 31;
        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder().setNumTaskManagers(1).setNumSlotsPerTaskManager(31).setConfiguration(this.getDefaultConfiguration()).build();
        try (MiniCluster miniCluster = new MiniCluster(cfg);){
            miniCluster.start();
            JobVertex sender = new JobVertex("Sender");
            sender.setInvokableClass(TestingAbstractInvokables.Sender.class);
            sender.setParallelism(31);
            JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(Tasks.AgnosticReceiver.class);
            receiver.setParallelism(31);
            receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            JobGraph jobGraph = new JobGraph("Bipartite Job", new JobVertex[]{sender, receiver});
            miniCluster.executeJobBlocking(jobGraph);
        }
    }

    @Test
    public void testTwoInputJobFailingEdgeMismatch() throws Exception {
        boolean parallelism = true;
        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder().setNumTaskManagers(1).setNumSlotsPerTaskManager(6).setConfiguration(this.getDefaultConfiguration()).build();
        try (MiniCluster miniCluster = new MiniCluster(cfg);){
            miniCluster.start();
            JobVertex sender1 = new JobVertex("Sender1");
            sender1.setInvokableClass(TestingAbstractInvokables.Sender.class);
            sender1.setParallelism(1);
            JobVertex sender2 = new JobVertex("Sender2");
            sender2.setInvokableClass(TestingAbstractInvokables.Sender.class);
            sender2.setParallelism(2);
            JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(Tasks.AgnosticTertiaryReceiver.class);
            receiver.setParallelism(3);
            receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
            JobGraph jobGraph = new JobGraph("Bipartite Job", new JobVertex[]{sender1, receiver, sender2});
            try {
                miniCluster.executeJobBlocking(jobGraph);
                Assert.fail((String)"Job should fail.");
            }
            catch (JobExecutionException e) {
                Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, ArrayIndexOutOfBoundsException.class).isPresent());
                Assert.assertTrue((boolean)ExceptionUtils.findThrowableWithMessage((Throwable)e, (String)"2").isPresent());
            }
        }
    }

    @Test
    public void testTwoInputJob() throws Exception {
        int parallelism = 11;
        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder().setNumTaskManagers(1).setNumSlotsPerTaskManager(66).setConfiguration(this.getDefaultConfiguration()).build();
        try (MiniCluster miniCluster = new MiniCluster(cfg);){
            miniCluster.start();
            JobVertex sender1 = new JobVertex("Sender1");
            sender1.setInvokableClass(TestingAbstractInvokables.Sender.class);
            sender1.setParallelism(11);
            JobVertex sender2 = new JobVertex("Sender2");
            sender2.setInvokableClass(TestingAbstractInvokables.Sender.class);
            sender2.setParallelism(22);
            JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(Tasks.AgnosticBinaryReceiver.class);
            receiver.setParallelism(33);
            receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
            JobGraph jobGraph = new JobGraph("Bipartite Job", new JobVertex[]{sender1, receiver, sender2});
            miniCluster.executeJobBlocking(jobGraph);
        }
    }

    @Test
    public void testSchedulingAllAtOnce() throws Exception {
        int parallelism = 11;
        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder().setNumTaskManagers(1).setNumSlotsPerTaskManager(11).setConfiguration(this.getDefaultConfiguration()).build();
        try (MiniCluster miniCluster = new MiniCluster(cfg);){
            miniCluster.start();
            JobVertex sender = new JobVertex("Sender");
            sender.setInvokableClass(TestingAbstractInvokables.Sender.class);
            sender.setParallelism(11);
            JobVertex forwarder = new JobVertex("Forwarder");
            forwarder.setInvokableClass(Tasks.Forwarder.class);
            forwarder.setParallelism(11);
            JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(Tasks.AgnosticReceiver.class);
            receiver.setParallelism(11);
            SlotSharingGroup sharingGroup = new SlotSharingGroup();
            sender.setSlotSharingGroup(sharingGroup);
            forwarder.setSlotSharingGroup(sharingGroup);
            receiver.setSlotSharingGroup(sharingGroup);
            forwarder.connectNewDataSetAsInput(sender, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
            receiver.connectNewDataSetAsInput(forwarder, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
            JobGraph jobGraph = new JobGraph("Forwarding Job", new JobVertex[]{sender, forwarder, receiver});
            jobGraph.setScheduleMode(ScheduleMode.EAGER);
            miniCluster.executeJobBlocking(jobGraph);
        }
    }

    @Test
    public void testJobWithAFailingSenderVertex() throws Exception {
        int parallelism = 11;
        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder().setNumTaskManagers(1).setNumSlotsPerTaskManager(11).setConfiguration(this.getDefaultConfiguration()).build();
        try (MiniCluster miniCluster = new MiniCluster(cfg);){
            miniCluster.start();
            JobVertex sender = new JobVertex("Sender");
            sender.setInvokableClass(Tasks.ExceptionSender.class);
            sender.setParallelism(11);
            JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(TestingAbstractInvokables.Receiver.class);
            receiver.setParallelism(11);
            receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            JobGraph jobGraph = new JobGraph("Pointwise Job", new JobVertex[]{sender, receiver});
            try {
                miniCluster.executeJobBlocking(jobGraph);
                Assert.fail((String)"Job should fail.");
            }
            catch (JobExecutionException e) {
                Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, Exception.class).isPresent());
                Assert.assertTrue((boolean)ExceptionUtils.findThrowableWithMessage((Throwable)e, (String)"Test exception").isPresent());
            }
        }
    }

    @Test
    public void testJobWithAnOccasionallyFailingSenderVertex() throws Exception {
        int parallelism = 11;
        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder().setNumTaskManagers(1).setNumSlotsPerTaskManager(11).setConfiguration(this.getDefaultConfiguration()).build();
        try (MiniCluster miniCluster = new MiniCluster(cfg);){
            miniCluster.start();
            JobVertex sender = new JobVertex("Sender");
            sender.setInvokableClass(SometimesExceptionSender.class);
            sender.setParallelism(11);
            SometimesExceptionSender.configFailingSenders(11);
            JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(TestingAbstractInvokables.Receiver.class);
            receiver.setParallelism(11);
            receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            JobGraph jobGraph = new JobGraph("Pointwise Job", new JobVertex[]{sender, receiver});
            try {
                miniCluster.executeJobBlocking(jobGraph);
                Assert.fail((String)"Job should fail.");
            }
            catch (JobExecutionException e) {
                Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, Exception.class).isPresent());
                Assert.assertTrue((boolean)ExceptionUtils.findThrowableWithMessage((Throwable)e, (String)"Test exception").isPresent());
            }
        }
    }

    @Test
    public void testJobWithAFailingReceiverVertex() throws Exception {
        int parallelism = 11;
        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder().setNumTaskManagers(1).setNumSlotsPerTaskManager(11).setConfiguration(this.getDefaultConfiguration()).build();
        try (MiniCluster miniCluster = new MiniCluster(cfg);){
            miniCluster.start();
            JobVertex sender = new JobVertex("Sender");
            sender.setInvokableClass(TestingAbstractInvokables.Sender.class);
            sender.setParallelism(11);
            JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(Tasks.ExceptionReceiver.class);
            receiver.setParallelism(11);
            receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            JobGraph jobGraph = new JobGraph("Pointwise Job", new JobVertex[]{sender, receiver});
            try {
                miniCluster.executeJobBlocking(jobGraph);
                Assert.fail((String)"Job should fail.");
            }
            catch (JobExecutionException e) {
                Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, Exception.class).isPresent());
                Assert.assertTrue((boolean)ExceptionUtils.findThrowableWithMessage((Throwable)e, (String)"Test exception").isPresent());
            }
        }
    }

    @Test
    public void testJobWithAllVerticesFailingDuringInstantiation() throws Exception {
        int parallelism = 11;
        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder().setNumTaskManagers(1).setNumSlotsPerTaskManager(11).setConfiguration(this.getDefaultConfiguration()).build();
        try (MiniCluster miniCluster = new MiniCluster(cfg);){
            miniCluster.start();
            JobVertex sender = new JobVertex("Sender");
            sender.setInvokableClass(Tasks.InstantiationErrorSender.class);
            sender.setParallelism(11);
            JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(TestingAbstractInvokables.Receiver.class);
            receiver.setParallelism(11);
            receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            JobGraph jobGraph = new JobGraph("Pointwise Job", new JobVertex[]{sender, receiver});
            try {
                miniCluster.executeJobBlocking(jobGraph);
                Assert.fail((String)"Job should fail.");
            }
            catch (JobExecutionException e) {
                Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, Exception.class).isPresent());
                Assert.assertTrue((boolean)ExceptionUtils.findThrowableWithMessage((Throwable)e, (String)"Test exception in constructor").isPresent());
            }
        }
    }

    @Test
    public void testJobWithSomeVerticesFailingDuringInstantiation() throws Exception {
        int parallelism = 11;
        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder().setNumTaskManagers(1).setNumSlotsPerTaskManager(11).setConfiguration(this.getDefaultConfiguration()).build();
        try (MiniCluster miniCluster = new MiniCluster(cfg);){
            miniCluster.start();
            JobVertex sender = new JobVertex("Sender");
            sender.setInvokableClass(SometimesInstantiationErrorSender.class);
            sender.setParallelism(11);
            SometimesInstantiationErrorSender.configFailingSenders(11);
            JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(TestingAbstractInvokables.Receiver.class);
            receiver.setParallelism(11);
            receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            JobGraph jobGraph = new JobGraph("Pointwise Job", new JobVertex[]{sender, receiver});
            try {
                miniCluster.executeJobBlocking(jobGraph);
                Assert.fail((String)"Job should fail.");
            }
            catch (JobExecutionException e) {
                Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, Exception.class).isPresent());
                Assert.assertTrue((boolean)ExceptionUtils.findThrowableWithMessage((Throwable)e, (String)"Test exception in constructor").isPresent());
            }
        }
    }

    @Test
    public void testCallFinalizeOnMasterBeforeJobCompletes() throws Exception {
        int parallelism = 11;
        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder().setNumTaskManagers(1).setNumSlotsPerTaskManager(11).setConfiguration(this.getDefaultConfiguration()).build();
        try (MiniCluster miniCluster = new MiniCluster(cfg);){
            miniCluster.start();
            JobVertex source = new JobVertex("Source");
            source.setInvokableClass(WaitingNoOpInvokable.class);
            source.setParallelism(11);
            WaitOnFinalizeJobVertex sink = new WaitOnFinalizeJobVertex("Sink", 20L);
            sink.setInvokableClass(NoOpInvokable.class);
            sink.setParallelism(11);
            sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            JobGraph jobGraph = new JobGraph("SubtaskInFinalStateRaceCondition", new JobVertex[]{source, sink});
            CompletableFuture submissionFuture = miniCluster.submitJob(jobGraph);
            CompletionStage jobResultFuture = submissionFuture.thenCompose(ignored -> miniCluster.requestJobResult(jobGraph.getJobID()));
            ((JobResult)((CompletableFuture)jobResultFuture).get()).toJobExecutionResult(((Object)((Object)this)).getClass().getClassLoader());
            Assert.assertTrue((boolean)sink.finalizedOnMaster.get());
        }
    }

    private Configuration getDefaultConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setString(RestOptions.BIND_PORT, "0");
        return configuration;
    }

    private static JobGraph getSimpleJob(int parallelism) throws IOException {
        JobVertex task = new JobVertex("Test task");
        task.setParallelism(parallelism);
        task.setMaxParallelism(parallelism);
        task.setInvokableClass(NoOpInvokable.class);
        JobGraph jg = new JobGraph(new JobID(), "Test Job", new JobVertex[]{task});
        jg.setScheduleMode(ScheduleMode.EAGER);
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)Integer.MAX_VALUE, (long)1000L));
        jg.setExecutionConfig(executionConfig);
        return jg;
    }

    private static class WaitOnFinalizeJobVertex
    extends JobVertex {
        private static final long serialVersionUID = -1179547322468530299L;
        private final AtomicBoolean finalizedOnMaster = new AtomicBoolean(false);
        private final long waitingTime;

        WaitOnFinalizeJobVertex(String name, long waitingTime) {
            super(name);
            this.waitingTime = waitingTime;
        }

        public void finalizeOnMaster(ClassLoader loader) throws Exception {
            Thread.sleep(this.waitingTime);
            this.finalizedOnMaster.set(true);
        }
    }
}

