/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.scheduler.DefaultExecutionSlotAllocator;
import org.apache.flink.runtime.scheduler.ExecutionVertexSchedulingRequirements;
import org.apache.flink.runtime.scheduler.InputsLocationsRetriever;
import org.apache.flink.runtime.scheduler.SlotExecutionVertexAssignment;
import org.apache.flink.runtime.scheduler.TestingInputsLocationsRetriever;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link DefaultExecutionSlotAllocator}.
 *
 * <p>Slot requests are served by an {@link AllocationToggableSlotProvider}, which records every
 * allocation and cancellation it receives and can be switched to hand out futures that never
 * complete.
 */
public class DefaultExecutionSlotAllocatorTest extends TestLogger {

    private AllocationToggableSlotProvider slotProvider;

    @Before
    public void setUp() throws Exception {
        slotProvider = new AllocationToggableSlotProvider();
    }

    /**
     * The consumer's slot future must stay incomplete until the producer's task manager location
     * has been assigned; afterwards no slot assignment may be left pending.
     */
    @Test
    public void testConsumersAssignedToSlotsAfterProducers() {
        ExecutionVertexID producerId = new ExecutionVertexID(new JobVertexID(), 0);
        ExecutionVertexID consumerId = new ExecutionVertexID(new JobVertexID(), 0);

        TestingInputsLocationsRetriever inputsLocationsRetriever =
                new TestingInputsLocationsRetriever.Builder()
                        .connectConsumerToProducer(consumerId, producerId)
                        .build();
        DefaultExecutionSlotAllocator executionSlotAllocator =
                createExecutionSlotAllocator(inputsLocationsRetriever);

        inputsLocationsRetriever.markScheduled(producerId);
        inputsLocationsRetriever.markScheduled(consumerId);

        List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
                createSchedulingRequirements(producerId, consumerId);
        List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
                executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
        Assert.assertThat(slotExecutionVertexAssignments, Matchers.hasSize(2));

        SlotExecutionVertexAssignment producerSlotAssignment =
                findSlotAssignmentByExecutionVertexId(producerId, slotExecutionVertexAssignments);
        SlotExecutionVertexAssignment consumerSlotAssignment =
                findSlotAssignmentByExecutionVertexId(consumerId, slotExecutionVertexAssignments);

        Assert.assertTrue(producerSlotAssignment.getLogicalSlotFuture().isDone());
        Assert.assertFalse(consumerSlotAssignment.getLogicalSlotFuture().isDone());

        inputsLocationsRetriever.assignTaskManagerLocation(producerId);

        Assert.assertTrue(consumerSlotAssignment.getLogicalSlotFuture().isDone());
        Assert.assertEquals(0L, executionSlotAllocator.getNumberOfPendingSlotAssignments());
    }

    /**
     * Every property set on the scheduling requirements must show up in the {@link ScheduledUnit}
     * and {@link SlotProfile} that reach the slot provider.
     */
    @Test
    public void testAllocateSlotsParameters() {
        ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
        AllocationID allocationId = new AllocationID();
        SlotSharingGroupId sharingGroupId = new SlotSharingGroupId();
        ResourceProfile taskResourceProfile = ResourceProfile.fromResources(0.5, 250);
        ResourceProfile physicalSlotResourceProfile = ResourceProfile.fromResources(1.0, 300);
        CoLocationConstraint coLocationConstraint = new CoLocationGroup().getLocationConstraint(0);
        LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();

        DefaultExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator();

        // The singleton is created inline so that its element type is inferred from the
        // builder's parameter type rather than fixed to the concrete location class.
        List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
                Arrays.asList(
                        new ExecutionVertexSchedulingRequirements.Builder()
                                .withExecutionVertexId(executionVertexId)
                                .withPreviousAllocationId(allocationId)
                                .withSlotSharingGroupId(sharingGroupId)
                                .withPreferredLocations(Collections.singleton(taskManagerLocation))
                                .withPhysicalSlotResourceProfile(physicalSlotResourceProfile)
                                .withTaskResourceProfile(taskResourceProfile)
                                .withCoLocationConstraint(coLocationConstraint)
                                .build());

        executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
        Assert.assertThat(slotProvider.getSlotAllocationRequests(), Matchers.hasSize(1));

        Tuple3<SlotRequestId, ScheduledUnit, SlotProfile> request =
                slotProvider.getSlotAllocationRequests().get(0);
        ScheduledUnit requestedTask = request.f1;
        SlotProfile requestedSlotProfile = request.f2;

        Assert.assertEquals(sharingGroupId, requestedTask.getSlotSharingGroupId());
        Assert.assertEquals(coLocationConstraint, requestedTask.getCoLocationConstraint());
        Assert.assertThat(
                requestedSlotProfile.getPreferredAllocations(), Matchers.contains(allocationId));
        Assert.assertThat(
                requestedSlotProfile.getPreviousExecutionGraphAllocations(),
                Matchers.contains(allocationId));
        Assert.assertEquals(taskResourceProfile, requestedSlotProfile.getTaskResourceProfile());
        Assert.assertEquals(
                physicalSlotResourceProfile, requestedSlotProfile.getPhysicalSlotResourceProfile());
        Assert.assertThat(
                requestedSlotProfile.getPreferredLocations(),
                Matchers.contains(taskManagerLocation));
    }

    /** Cancelling a vertex that was never allocated must not reach the slot provider. */
    @Test
    public void testCancelNonExistingExecutionVertex() {
        DefaultExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator();

        ExecutionVertexID inValidExecutionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
        executionSlotAllocator.cancel(inValidExecutionVertexId);

        Assert.assertThat(
                slotProvider.getCancelledSlotRequestIds(), CoreMatchers.is(Matchers.empty()));
    }

    /** Cancelling a vertex whose slot request already completed must not cancel the request. */
    @Test
    public void testCancelFulfilledSlotRequest() {
        ExecutionVertexID producerId = new ExecutionVertexID(new JobVertexID(), 0);
        DefaultExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator();

        List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
                createSchedulingRequirements(producerId);
        executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
        executionSlotAllocator.cancel(producerId);

        Assert.assertThat(
                slotProvider.getCancelledSlotRequestIds(), CoreMatchers.is(Matchers.empty()));
    }

    /**
     * Cancelling a vertex whose slot request is still outstanding must cancel exactly that
     * request at the slot provider and cancel the returned logical slot future.
     */
    @Test
    public void testCancelUnFulfilledSlotRequest() throws Exception {
        ExecutionVertexID producerId = new ExecutionVertexID(new JobVertexID(), 0);
        DefaultExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator();

        slotProvider.disableSlotAllocation();
        List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
                createSchedulingRequirements(producerId);
        List<SlotExecutionVertexAssignment> assignments =
                executionSlotAllocator.allocateSlotsFor(schedulingRequirements);

        executionSlotAllocator.cancel(producerId);

        Assert.assertThat(slotProvider.getCancelledSlotRequestIds(), Matchers.hasSize(1));
        Assert.assertThat(
                slotProvider.getCancelledSlotRequestIds(),
                Matchers.contains(slotProvider.getReceivedSlotRequestIds().toArray()));

        try {
            assignments.get(0).getLogicalSlotFuture().get();
            Assert.fail("Expect a CancellationException but got nothing.");
        } catch (CancellationException expected) {
            // expected: the logical slot future was cancelled together with the slot request
        }
    }

    /**
     * Stopping the allocator must cancel the outstanding slot request and leave no pending slot
     * assignment behind.
     */
    @Test
    public void testStop() throws Exception {
        ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
        DefaultExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator();

        slotProvider.disableSlotAllocation();
        List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
                createSchedulingRequirements(executionVertexId);
        executionSlotAllocator.allocateSlotsFor(schedulingRequirements);

        executionSlotAllocator.stop().get();

        Assert.assertThat(slotProvider.getCancelledSlotRequestIds(), Matchers.hasSize(1));
        Assert.assertThat(
                slotProvider.getCancelledSlotRequestIds(),
                Matchers.contains(slotProvider.getReceivedSlotRequestIds().toArray()));
        Assert.assertEquals(0L, executionSlotAllocator.getNumberOfPendingSlotAssignments());
    }

    /**
     * The computed prior allocation ids must be exactly the two distinct ids in use, although
     * one id is shared by two requirements and one requirement carries no previous allocation.
     */
    @Test
    public void testComputeAllPriorAllocationIds() {
        List<AllocationID> expectAllocationIds =
                Arrays.asList(new AllocationID(), new AllocationID());

        List<ExecutionVertexSchedulingRequirements> testSchedulingRequirements =
                Arrays.asList(
                        new ExecutionVertexSchedulingRequirements.Builder()
                                .withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 0))
                                .withPreviousAllocationId(expectAllocationIds.get(0))
                                .build(),
                        // deliberately reuses the first allocation id
                        new ExecutionVertexSchedulingRequirements.Builder()
                                .withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 1))
                                .withPreviousAllocationId(expectAllocationIds.get(0))
                                .build(),
                        new ExecutionVertexSchedulingRequirements.Builder()
                                .withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 2))
                                .withPreviousAllocationId(expectAllocationIds.get(1))
                                .build(),
                        // no previous allocation id at all
                        new ExecutionVertexSchedulingRequirements.Builder()
                                .withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 3))
                                .build());

        Set<AllocationID> allPriorAllocationIds =
                DefaultExecutionSlotAllocator.computeAllPriorAllocationIds(
                        testSchedulingRequirements);

        Assert.assertThat(
                allPriorAllocationIds,
                Matchers.containsInAnyOrder(expectAllocationIds.toArray()));
    }

    /**
     * Requesting slots a second time for a vertex whose first request is still outstanding must
     * fail with an {@link IllegalStateException}.
     */
    @Test
    public void testDuplicatedSlotAllocationIsNotAllowed() {
        ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
        DefaultExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator();

        slotProvider.disableSlotAllocation();
        List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
                createSchedulingRequirements(executionVertexId);
        executionSlotAllocator.allocateSlotsFor(schedulingRequirements);

        try {
            executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
            Assert.fail("exception should happen");
        } catch (IllegalStateException expected) {
            // expected: the second allocation for the same vertex is rejected
        }
    }

    /** Creates an allocator backed by the test slot provider and a default inputs retriever. */
    private DefaultExecutionSlotAllocator createExecutionSlotAllocator() {
        return createExecutionSlotAllocator(new TestingInputsLocationsRetriever.Builder().build());
    }

    /**
     * Creates an allocator backed by the test slot provider, using {@link ScheduleMode#EAGER}
     * and a ten second allocation timeout.
     *
     * @param inputsLocationsRetriever retriever handed to the allocator
     * @return the allocator under test
     */
    private DefaultExecutionSlotAllocator createExecutionSlotAllocator(
            InputsLocationsRetriever inputsLocationsRetriever) {
        return new DefaultExecutionSlotAllocator(
                SlotProviderStrategy.from(ScheduleMode.EAGER, slotProvider, Time.seconds(10L)),
                inputsLocationsRetriever);
    }

    /**
     * Builds one scheduling requirement per given execution vertex, carrying only the vertex id.
     *
     * @param executionVertexIds vertices to create requirements for
     * @return the requirements, in the order of the given ids
     */
    private List<ExecutionVertexSchedulingRequirements> createSchedulingRequirements(
            ExecutionVertexID... executionVertexIds) {
        List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
                new ArrayList<>(executionVertexIds.length);
        for (ExecutionVertexID executionVertexId : executionVertexIds) {
            schedulingRequirements.add(
                    new ExecutionVertexSchedulingRequirements.Builder()
                            .withExecutionVertexId(executionVertexId)
                            .build());
        }
        return schedulingRequirements;
    }

    /**
     * Returns the first assignment that belongs to the given execution vertex.
     *
     * @param executionVertexId vertex to look for
     * @param slotExecutionVertexAssignments assignments to search
     * @return the matching assignment
     * @throws IllegalArgumentException if no assignment belongs to the given vertex
     */
    private SlotExecutionVertexAssignment findSlotAssignmentByExecutionVertexId(
            ExecutionVertexID executionVertexId,
            Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignments) {
        return slotExecutionVertexAssignments.stream()
                .filter(assignment -> assignment.getExecutionVertexId().equals(executionVertexId))
                .findFirst()
                .orElseThrow(
                        () ->
                                new IllegalArgumentException(
                                        String.format(
                                                "SlotExecutionVertexAssignment with execution vertex id %s not found",
                                                executionVertexId)));
    }

    /**
     * {@link SlotProvider} that records all allocation and cancellation requests. Until
     * {@link #disableSlotAllocation()} is called every request is answered with an already
     * completed future; afterwards requests are answered with a future that is never completed
     * by this provider.
     */
    private static class AllocationToggableSlotProvider implements SlotProvider {

        private final List<Tuple3<SlotRequestId, ScheduledUnit, SlotProfile>> slotAllocationRequests =
                new ArrayList<>();

        private final List<SlotRequestId> cancelledSlotRequestIds = new ArrayList<>();

        private boolean slotAllocationDisabled;

        private AllocationToggableSlotProvider() {
        }

        /**
         * Records the request, then returns either a fresh incomplete future (allocation
         * disabled) or a future completed with a new testing logical slot.
         */
        public CompletableFuture<LogicalSlot> allocateSlot(
                SlotRequestId slotRequestId,
                ScheduledUnit task,
                SlotProfile slotProfile,
                Time timeout) {
            slotAllocationRequests.add(Tuple3.of(slotRequestId, task, slotProfile));
            if (slotAllocationDisabled) {
                return new CompletableFuture<>();
            }
            return CompletableFuture.completedFuture(
                    new TestingLogicalSlotBuilder().createTestingLogicalSlot());
        }

        /** Records the id of the cancelled request; the other arguments are ignored. */
        public void cancelSlotRequest(
                SlotRequestId slotRequestId,
                @Nullable SlotSharingGroupId slotSharingGroupId,
                Throwable cause) {
            cancelledSlotRequestIds.add(slotRequestId);
        }

        /** Returns a read-only view of all allocation requests, in arrival order. */
        public List<Tuple3<SlotRequestId, ScheduledUnit, SlotProfile>> getSlotAllocationRequests() {
            return Collections.unmodifiableList(slotAllocationRequests);
        }

        /** Returns the ids of all allocation requests received so far, in arrival order. */
        public List<SlotRequestId> getReceivedSlotRequestIds() {
            return slotAllocationRequests.stream()
                    .map(requestTuple -> requestTuple.f0)
                    .collect(Collectors.toList());
        }

        /** Returns a read-only view of the ids of all cancelled requests, in arrival order. */
        public List<SlotRequestId> getCancelledSlotRequestIds() {
            return Collections.unmodifiableList(cancelledSlotRequestIds);
        }

        /** Makes all subsequent allocation requests return a future that stays incomplete. */
        public void disableSlotAllocation() {
            slotAllocationDisabled = true;
        }
    }
}

