/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolResource;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

public class SlotPoolSlotSharingTest
extends TestLogger {
    @Rule
    public final SlotPoolResource slotPoolResource = new SlotPoolResource((SlotSelectionStrategy)PreviousAllocationSlotSelectionStrategy.create());

    @Test
    public void testSingleQueuedSharedSlotScheduling() throws Exception {
        CompletableFuture allocationIdFuture = new CompletableFuture();
        TestingResourceManagerGateway testingResourceManagerGateway = this.slotPoolResource.getTestingResourceManagerGateway();
        testingResourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));
        LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        SlotPoolImpl slotPool = this.slotPoolResource.getSlotPool();
        slotPool.registerTaskManager(taskManagerLocation.getResourceID());
        SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
        SlotProvider slotProvider = this.slotPoolResource.getSlotProvider();
        CompletableFuture logicalSlotFuture = slotProvider.allocateSlot(new ScheduledUnit(new JobVertexID(), slotSharingGroupId, null), SlotProfile.noRequirements(), TestingUtils.infiniteTime());
        Assert.assertFalse((boolean)logicalSlotFuture.isDone());
        AllocationID allocationId = (AllocationID)allocationIdFuture.get();
        boolean booleanCompletableFuture = slotPool.offerSlot((TaskManagerLocation)taskManagerLocation, (TaskManagerGateway)new SimpleAckingTaskManagerGateway(), new SlotOffer(allocationId, 0, ResourceProfile.ANY));
        Assert.assertTrue((boolean)booleanCompletableFuture);
        LogicalSlot logicalSlot = (LogicalSlot)logicalSlotFuture.get();
        Assert.assertEquals((Object)slotSharingGroupId, (Object)logicalSlot.getSlotSharingGroupId());
    }

    @Test
    public void testFailingQueuedSharedSlotScheduling() throws Exception {
        CompletableFuture allocationIdFuture = new CompletableFuture();
        TestingResourceManagerGateway testingResourceManagerGateway = this.slotPoolResource.getTestingResourceManagerGateway();
        testingResourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));
        SlotProvider slotProvider = this.slotPoolResource.getSlotProvider();
        CompletableFuture logicalSlotFuture = slotProvider.allocateSlot(new ScheduledUnit(new JobVertexID(), new SlotSharingGroupId(), null), SlotProfile.noRequirements(), TestingUtils.infiniteTime());
        AllocationID allocationId = (AllocationID)allocationIdFuture.get();
        SlotPoolImpl slotPoolGateway = this.slotPoolResource.getSlotPool();
        slotPoolGateway.failAllocation(allocationId, (Exception)new FlinkException("Testing Exception"));
        try {
            logicalSlotFuture.get();
            Assert.fail((String)"The slot future should have failed.");
        }
        catch (ExecutionException ee) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)ee, FlinkException.class).isPresent());
        }
    }

    @Test
    public void testQueuedSharedSlotScheduling() throws Exception {
        ArrayBlockingQueue allocationIds = new ArrayBlockingQueue(2);
        TestingResourceManagerGateway testingResourceManagerGateway = this.slotPoolResource.getTestingResourceManagerGateway();
        testingResourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocationIds.offer(slotRequest.getAllocationId()));
        LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        SlotPoolImpl slotPool = this.slotPoolResource.getSlotPool();
        slotPool.registerTaskManager(taskManagerLocation.getResourceID());
        SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
        JobVertexID jobVertexId1 = new JobVertexID();
        JobVertexID jobVertexId2 = new JobVertexID();
        SlotProvider slotProvider = this.slotPoolResource.getSlotProvider();
        CompletableFuture logicalSlotFuture1 = slotProvider.allocateSlot(new ScheduledUnit(jobVertexId1, slotSharingGroupId, null), SlotProfile.noRequirements(), TestingUtils.infiniteTime());
        CompletableFuture logicalSlotFuture2 = slotProvider.allocateSlot(new ScheduledUnit(jobVertexId2, slotSharingGroupId, null), SlotProfile.noRequirements(), TestingUtils.infiniteTime());
        Assert.assertFalse((boolean)logicalSlotFuture1.isDone());
        Assert.assertFalse((boolean)logicalSlotFuture2.isDone());
        AllocationID allocationId1 = (AllocationID)allocationIds.take();
        CompletableFuture logicalSlotFuture3 = slotProvider.allocateSlot(new ScheduledUnit(jobVertexId1, slotSharingGroupId, null), SlotProfile.noRequirements(), TestingUtils.infiniteTime());
        CompletableFuture logicalSlotFuture4 = slotProvider.allocateSlot(new ScheduledUnit(jobVertexId2, slotSharingGroupId, null), SlotProfile.noRequirements(), TestingUtils.infiniteTime());
        Assert.assertFalse((boolean)logicalSlotFuture3.isDone());
        Assert.assertFalse((boolean)logicalSlotFuture4.isDone());
        allocationIds.take();
        boolean offerFuture = slotPool.offerSlot((TaskManagerLocation)taskManagerLocation, (TaskManagerGateway)new SimpleAckingTaskManagerGateway(), new SlotOffer(allocationId1, 0, ResourceProfile.ANY));
        Assert.assertTrue((boolean)offerFuture);
        LogicalSlot logicalSlot1 = (LogicalSlot)logicalSlotFuture1.get();
        LogicalSlot logicalSlot2 = (LogicalSlot)logicalSlotFuture2.get();
        Assert.assertEquals((Object)logicalSlot1.getTaskManagerLocation(), (Object)logicalSlot2.getTaskManagerLocation());
        Assert.assertEquals((Object)allocationId1, (Object)logicalSlot1.getAllocationId());
        Assert.assertEquals((Object)allocationId1, (Object)logicalSlot2.getAllocationId());
        Assert.assertFalse((boolean)logicalSlotFuture3.isDone());
        Assert.assertFalse((boolean)logicalSlotFuture4.isDone());
        logicalSlot1.releaseSlot(null);
        logicalSlot2.releaseSlot(null);
        LogicalSlot logicalSlot3 = (LogicalSlot)logicalSlotFuture3.get();
        LogicalSlot logicalSlot4 = (LogicalSlot)logicalSlotFuture4.get();
        Assert.assertEquals((Object)logicalSlot3.getTaskManagerLocation(), (Object)logicalSlot4.getTaskManagerLocation());
        Assert.assertEquals((Object)allocationId1, (Object)logicalSlot3.getAllocationId());
        Assert.assertEquals((Object)allocationId1, (Object)logicalSlot4.getAllocationId());
    }

    @Test
    public void testQueuedMultipleSlotSharingGroups() throws Exception {
        ArrayBlockingQueue allocationIds = new ArrayBlockingQueue(4);
        TestingResourceManagerGateway testingResourceManagerGateway = this.slotPoolResource.getTestingResourceManagerGateway();
        testingResourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocationIds.offer(slotRequest.getAllocationId()));
        LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        SlotSharingGroupId slotSharingGroupId1 = new SlotSharingGroupId();
        SlotSharingGroupId slotSharingGroupId2 = new SlotSharingGroupId();
        JobVertexID jobVertexId1 = new JobVertexID();
        JobVertexID jobVertexId2 = new JobVertexID();
        JobVertexID jobVertexId3 = new JobVertexID();
        JobVertexID jobVertexId4 = new JobVertexID();
        SlotPoolImpl slotPool = this.slotPoolResource.getSlotPool();
        slotPool.registerTaskManager(taskManagerLocation.getResourceID());
        SlotProvider slotProvider = this.slotPoolResource.getSlotProvider();
        CompletableFuture logicalSlotFuture1 = slotProvider.allocateSlot(new ScheduledUnit(jobVertexId1, slotSharingGroupId1, null), SlotProfile.noRequirements(), TestingUtils.infiniteTime());
        CompletableFuture logicalSlotFuture2 = slotProvider.allocateSlot(new ScheduledUnit(jobVertexId2, slotSharingGroupId1, null), SlotProfile.noRequirements(), TestingUtils.infiniteTime());
        CompletableFuture logicalSlotFuture3 = slotProvider.allocateSlot(new ScheduledUnit(jobVertexId3, slotSharingGroupId2, null), SlotProfile.noRequirements(), TestingUtils.infiniteTime());
        CompletableFuture logicalSlotFuture4 = slotProvider.allocateSlot(new ScheduledUnit(jobVertexId4, slotSharingGroupId2, null), SlotProfile.noRequirements(), TestingUtils.infiniteTime());
        Assert.assertFalse((boolean)logicalSlotFuture1.isDone());
        Assert.assertFalse((boolean)logicalSlotFuture2.isDone());
        Assert.assertFalse((boolean)logicalSlotFuture3.isDone());
        Assert.assertFalse((boolean)logicalSlotFuture4.isDone());
        AllocationID allocationId1 = (AllocationID)allocationIds.take();
        AllocationID allocationId2 = (AllocationID)allocationIds.take();
        boolean offerFuture1 = slotPool.offerSlot((TaskManagerLocation)taskManagerLocation, (TaskManagerGateway)new SimpleAckingTaskManagerGateway(), new SlotOffer(allocationId1, 0, ResourceProfile.ANY));
        boolean offerFuture2 = slotPool.offerSlot((TaskManagerLocation)taskManagerLocation, (TaskManagerGateway)new SimpleAckingTaskManagerGateway(), new SlotOffer(allocationId2, 0, ResourceProfile.ANY));
        Assert.assertTrue((boolean)offerFuture1);
        Assert.assertTrue((boolean)offerFuture2);
        LogicalSlot logicalSlot1 = (LogicalSlot)logicalSlotFuture1.get();
        LogicalSlot logicalSlot2 = (LogicalSlot)logicalSlotFuture2.get();
        LogicalSlot logicalSlot3 = (LogicalSlot)logicalSlotFuture3.get();
        LogicalSlot logicalSlot4 = (LogicalSlot)logicalSlotFuture4.get();
        Assert.assertEquals((Object)logicalSlot1.getTaskManagerLocation(), (Object)logicalSlot2.getTaskManagerLocation());
        Assert.assertEquals((Object)logicalSlot3.getTaskManagerLocation(), (Object)logicalSlot4.getTaskManagerLocation());
        Assert.assertEquals((Object)allocationId1, (Object)logicalSlot1.getAllocationId());
        Assert.assertEquals((Object)allocationId2, (Object)logicalSlot3.getAllocationId());
    }

    @Test
    public void testSlotSharingRespectsRemainingResource() throws Exception {
        ResourceProfile allocatedSlotRp = ResourceProfile.fromResources((double)3.0, (int)300);
        ResourceProfile largeRequestResource = ResourceProfile.fromResources((double)2.0, (int)200);
        ResourceProfile smallRequestResource = ResourceProfile.fromResources((double)1.0, (int)100);
        ArrayBlockingQueue allocationIds = new ArrayBlockingQueue(2);
        TestingResourceManagerGateway testingResourceManagerGateway = this.slotPoolResource.getTestingResourceManagerGateway();
        testingResourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocationIds.offer(slotRequest.getAllocationId()));
        LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        SlotPoolImpl slotPool = this.slotPoolResource.getSlotPool();
        slotPool.registerTaskManager(taskManagerLocation.getResourceID());
        SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
        JobVertexID jobVertexId1 = new JobVertexID();
        JobVertexID jobVertexId2 = new JobVertexID();
        JobVertexID jobVertexId3 = new JobVertexID();
        SlotProvider slotProvider = this.slotPoolResource.getSlotProvider();
        CompletableFuture logicalSlotFuture1 = slotProvider.allocateSlot(new ScheduledUnit(jobVertexId1, slotSharingGroupId, null), SlotProfile.noLocality((ResourceProfile)largeRequestResource), TestingUtils.infiniteTime());
        AllocationID allocationId1 = (AllocationID)allocationIds.take();
        boolean offerFuture = slotPool.offerSlot((TaskManagerLocation)taskManagerLocation, (TaskManagerGateway)new SimpleAckingTaskManagerGateway(), new SlotOffer(allocationId1, 0, allocatedSlotRp));
        Assert.assertTrue((boolean)offerFuture);
        Assert.assertTrue((boolean)logicalSlotFuture1.isDone());
        Assert.assertEquals((Object)allocationId1, (Object)((LogicalSlot)logicalSlotFuture1.get()).getAllocationId());
        CompletableFuture logicalSlotFuture2 = slotProvider.allocateSlot(new ScheduledUnit(jobVertexId2, slotSharingGroupId, null), SlotProfile.noLocality((ResourceProfile)largeRequestResource), TestingUtils.infiniteTime());
        Assert.assertFalse((boolean)logicalSlotFuture2.isDone());
        CompletableFuture logicalSlotFuture3 = slotProvider.allocateSlot(new ScheduledUnit(jobVertexId3, slotSharingGroupId, null), SlotProfile.noLocality((ResourceProfile)smallRequestResource), TestingUtils.infiniteTime());
        Assert.assertTrue((boolean)logicalSlotFuture3.isDone());
        Assert.assertEquals((Object)allocationId1, (Object)((LogicalSlot)logicalSlotFuture1.get()).getAllocationId());
        AllocationID allocationId2 = (AllocationID)allocationIds.take();
        offerFuture = slotPool.offerSlot((TaskManagerLocation)taskManagerLocation, (TaskManagerGateway)new SimpleAckingTaskManagerGateway(), new SlotOffer(allocationId2, 0, allocatedSlotRp));
        Assert.assertTrue((boolean)offerFuture);
        Assert.assertTrue((boolean)logicalSlotFuture2.isDone());
        Assert.assertEquals((Object)allocationId2, (Object)((LogicalSlot)logicalSlotFuture2.get()).getAllocationId());
    }
}

