/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingResourceActionsBuilder;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.RunnableWithException;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TaskManagerReleaseInSlotManagerTest
extends TestLogger {
    private static final ResourceID resourceID = ResourceID.generate();
    private static final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
    private static final SlotID slotId = new SlotID(resourceID, 0);
    private static final ResourceProfile resourceProfile = ResourceProfile.fromResources((double)1.0, (int)1);
    private static final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
    private static final SlotReport slotReport = new SlotReport(slotStatus);
    private final AtomicReference<CompletableFuture<Boolean>> canBeReleasedFuture = new AtomicReference();
    private final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setCanBeReleasedSupplier(this.canBeReleasedFuture::get).createTestingTaskExecutorGateway();
    private final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, this.taskExecutorGateway);
    private CompletableFuture<InstanceID> releaseFuture;
    private ResourceActions resourceManagerActions;
    private ManuallyTriggeredScheduledExecutor mainThreadExecutor;

    @Before
    public void setup() {
        this.canBeReleasedFuture.set(new CompletableFuture());
        this.releaseFuture = new CompletableFuture();
        this.resourceManagerActions = new TestingResourceActionsBuilder().setReleaseResourceConsumer((instanceID, e) -> this.releaseFuture.complete((InstanceID)instanceID)).build();
        this.mainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
    }

    @Test
    public void testTaskManagerTimeout() throws Exception {
        ScheduledExecutorService executor = TestingUtils.defaultExecutor();
        this.canBeReleasedFuture.set(CompletableFuture.completedFuture(true));
        try (SlotManagerImpl slotManager = SlotManagerBuilder.newBuilder().setTaskManagerTimeout(Time.milliseconds((long)10L)).build();){
            slotManager.start(resourceManagerId, (Executor)executor, this.resourceManagerActions);
            executor.execute(() -> this.lambda$testTaskManagerTimeout$1((SlotManager)slotManager));
            Assert.assertThat((Object)this.releaseFuture.get(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)this.taskManagerConnection.getInstanceID())));
        }
    }

    @Test
    public void testTaskManagerIsNotReleasedBeforeItCanBe() throws Exception {
        try (SlotManagerImpl slotManager = this.createAndStartSlotManagerWithTM();){
            this.checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, false);
            this.verifyTmReleased(false);
            this.checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, true);
            this.verifyTmReleased(true);
        }
    }

    @Test
    public void testTaskManagerIsNotReleasedInCaseOfConcurrentAllocation() throws Exception {
        try (SlotManagerImpl slotManager = this.createAndStartSlotManagerWithTM();){
            this.checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, true, () -> {
                AllocationID allocationID = new AllocationID();
                slotManager.registerSlotRequest(new SlotRequest(new JobID(), allocationID, resourceProfile, "foobar"));
                this.mainThreadExecutor.triggerAll();
                slotManager.freeSlot(slotId, allocationID);
            });
            this.verifyTmReleased(false);
            this.checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, true);
            this.verifyTmReleased(true);
        }
    }

    private SlotManagerImpl createAndStartSlotManagerWithTM() {
        SlotManagerImpl slotManager = SlotManagerBuilder.newBuilder().setScheduledExecutor(this.mainThreadExecutor).setTaskManagerTimeout(Time.milliseconds((long)0L)).build();
        slotManager.start(resourceManagerId, (Executor)((Object)this.mainThreadExecutor), this.resourceManagerActions);
        this.mainThreadExecutor.execute(() -> slotManager.registerTaskManager(this.taskManagerConnection, slotReport));
        return slotManager;
    }

    private void checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(SlotManagerImpl slotManager, boolean canBeReleased) throws Exception {
        this.checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, canBeReleased, () -> {});
    }

    private void checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(SlotManagerImpl slotManager, boolean canBeReleased, RunnableWithException doAfterCheckTriggerBeforeCanBeReleasedResponse) throws Exception {
        this.canBeReleasedFuture.set(new CompletableFuture());
        this.mainThreadExecutor.execute(() -> ((SlotManagerImpl)slotManager).checkTaskManagerTimeouts());
        this.mainThreadExecutor.triggerAll();
        doAfterCheckTriggerBeforeCanBeReleasedResponse.run();
        this.canBeReleasedFuture.get().complete(canBeReleased);
        this.mainThreadExecutor.triggerAll();
    }

    private void verifyTmReleased(boolean isTmReleased) {
        Assert.assertThat((Object)this.releaseFuture.isDone(), (Matcher)Matchers.is((Object)isTmReleased));
        if (isTmReleased) {
            Assert.assertThat((Object)this.releaseFuture.join(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)this.taskManagerConnection.getInstanceID())));
        }
    }

    private /* synthetic */ void lambda$testTaskManagerTimeout$1(SlotManager slotManager) {
        slotManager.registerTaskManager(this.taskManagerConnection, slotReport);
    }
}

