/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.TestRestartStrategy;
import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.TestingSlotPoolImpl;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class ExecutionGraphNotEnoughResourceTest
extends TestLogger {
    private static TestingComponentMainThreadExecutor.Resource mainThreadExecutorResource;
    private static ComponentMainThreadExecutor mainThreadExecutor;
    private static final JobID TEST_JOB_ID;
    private static final int NUM_TASKS = 31;

    @BeforeClass
    public static void setupClass() {
        mainThreadExecutorResource = new TestingComponentMainThreadExecutor.Resource();
        mainThreadExecutorResource.before();
        mainThreadExecutor = mainThreadExecutorResource.getComponentMainThreadTestExecutor().getMainThreadExecutor();
    }

    @AfterClass
    public static void teardownClass() {
        mainThreadExecutorResource.after();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRestartWithSlotSharingAndNotEnoughResources() throws Exception {
        int numRestarts = 10;
        int parallelism = 20;
        TestingSlotPoolImpl slotPool = null;
        try {
            slotPool = new TestingSlotPoolImpl(TEST_JOB_ID);
            Scheduler scheduler = ExecutionGraphNotEnoughResourceTest.createSchedulerWithSlots(19, (SlotPool)slotPool, new LocalTaskManagerLocation());
            SlotSharingGroup sharingGroup = new SlotSharingGroup();
            JobVertex source = new JobVertex("source");
            source.setInvokableClass(NoOpInvokable.class);
            source.setParallelism(20);
            source.setSlotSharingGroup(sharingGroup);
            JobVertex sink = new JobVertex("sink");
            sink.setInvokableClass(NoOpInvokable.class);
            sink.setParallelism(20);
            sink.setSlotSharingGroup(sharingGroup);
            sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
            JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Test Job", new JobVertex[]{source, sink});
            jobGraph.setScheduleMode(ScheduleMode.EAGER);
            TestRestartStrategy restartStrategy = new TestRestartStrategy(10, false);
            ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).setSlotProvider((SlotProvider)scheduler).setRestartStrategy(restartStrategy).setAllocationTimeout(Time.milliseconds((long)1L)).build();
            eg.start(mainThreadExecutor);
            mainThreadExecutor.execute(ThrowingRunnable.unchecked(() -> ((ExecutionGraph)eg).scheduleForExecution()));
            CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> CompletableFuture.supplyAsync(() -> ((ExecutionGraph)eg).getState(), (Executor)mainThreadExecutor).join() == JobStatus.FAILED), Deadline.fromNow((Duration)Duration.ofSeconds(10L)));
            Assert.assertEquals((long)11L, (long)CompletableFuture.supplyAsync(() -> ((ExecutionGraph)eg).getNumberOfRestarts(), (Executor)mainThreadExecutor).join());
            Throwable t = CompletableFuture.supplyAsync(() -> ((ExecutionGraph)eg).getFailureCause(), (Executor)mainThreadExecutor).join();
            if (!(t instanceof NoResourceAvailableException)) {
                ExceptionUtils.rethrowException((Throwable)t, (String)t.getMessage());
            }
        }
        finally {
            if (slotPool != null) {
                CompletableFuture.runAsync(() -> ((SlotPool)slotPool).close(), (Executor)mainThreadExecutor).join();
            }
        }
    }

    private static Scheduler createSchedulerWithSlots(int numSlots, SlotPool slotPool, TaskManagerLocation taskManagerLocation) throws Exception {
        SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
        String jobManagerAddress = "foobar";
        TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
        slotPool.start(JobMasterId.generate(), "foobar", mainThreadExecutor);
        slotPool.connectToResourceManager((ResourceManagerGateway)resourceManagerGateway);
        SchedulerImpl scheduler = new SchedulerImpl((SlotSelectionStrategy)LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
        scheduler.start(mainThreadExecutor);
        CompletableFuture.runAsync(() -> slotPool.registerTaskManager(taskManagerLocation.getResourceID()), (Executor)mainThreadExecutor).join();
        ArrayList<SlotOffer> slotOffers = new ArrayList<SlotOffer>(31);
        for (int i = 0; i < numSlots; ++i) {
            AllocationID allocationId = new AllocationID();
            SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.ANY);
            slotOffers.add(slotOffer);
        }
        CompletableFuture.runAsync(() -> slotPool.offerSlots(taskManagerLocation, taskManagerGateway, (Collection)slotOffers), (Executor)mainThreadExecutor).join();
        return scheduler;
    }

    static {
        TEST_JOB_ID = new JobID();
    }
}

