/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.io.IOException;
import java.net.InetAddress;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorBuilder;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
import org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesBuilder;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

public class TaskExecutorSlotLifetimeTest
extends TestLogger {
    @ClassRule
    public static final TestingRpcServiceResource TESTING_RPC_SERVICE_RESOURCE = new TestingRpcServiceResource();
    @Rule
    public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource();

    @Before
    public void setup() {
        UserClassLoaderExtractingInvokable.clearQueue();
    }

    @Test
    public void testUserCodeClassLoaderIsBoundToSlot() throws Exception {
        Configuration configuration = new Configuration();
        TestingRpcService rpcService = TESTING_RPC_SERVICE_RESOURCE.getTestingRpcService();
        TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
        CompletableFuture firstSlotReportFuture = new CompletableFuture();
        resourceManagerGateway.setSendSlotReportFunction(resourceIDInstanceIDSlotReportTuple3 -> {
            firstSlotReportFuture.complete(resourceIDInstanceIDSlotReportTuple3.f2);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        ArrayBlockingQueue taskExecutionStates = new ArrayBlockingQueue(2);
        OneShotLatch slotsOfferedLatch = new OneShotLatch();
        TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().setOfferSlotsFunction((resourceID, slotOffers) -> {
            slotsOfferedLatch.trigger();
            return CompletableFuture.completedFuture(slotOffers);
        }).setUpdateTaskExecutionStateFunction(FunctionUtils.uncheckedFunction(taskExecutionState -> {
            taskExecutionStates.put(taskExecutionState);
            return CompletableFuture.completedFuture(Acknowledge.get());
        })).build();
        SettableLeaderRetrievalService resourceManagerLeaderRetriever = new SettableLeaderRetrievalService(resourceManagerGateway.getAddress(), resourceManagerGateway.getFencingToken().toUUID());
        SettableLeaderRetrievalService jobMasterLeaderRetriever = new SettableLeaderRetrievalService(jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());
        TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServicesBuilder().setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever).setJobMasterLeaderRetrieverFunction(ignored -> jobMasterLeaderRetriever).build();
        rpcService.registerGateway(resourceManagerGateway.getAddress(), (RpcGateway)resourceManagerGateway);
        rpcService.registerGateway(jobMasterGateway.getAddress(), (RpcGateway)jobMasterGateway);
        LocalUnresolvedTaskManagerLocation unresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
        try (TaskExecutor taskExecutor = this.createTaskExecutor(configuration, rpcService, haServices, unresolvedTaskManagerLocation);){
            TaskExecutionState taskExecutionState2;
            taskExecutor.start();
            SlotReport slotReport = (SlotReport)firstSlotReportFuture.join();
            SlotID firstSlotId = ((SlotStatus)slotReport.iterator().next()).getSlotID();
            TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)taskExecutor.getSelfGateway(TaskExecutorGateway.class);
            JobID jobId = new JobID();
            AllocationID allocationId = new AllocationID();
            taskExecutorGateway.requestSlot(firstSlotId, jobId, allocationId, ResourceProfile.ZERO, jobMasterGateway.getAddress(), resourceManagerGateway.getFencingToken(), RpcUtils.INF_TIMEOUT).join();
            TaskDeploymentDescriptor tdd = TaskDeploymentDescriptorBuilder.newBuilder(jobId, UserClassLoaderExtractingInvokable.class).setAllocationId(allocationId).build();
            slotsOfferedLatch.await();
            taskExecutorGateway.submitTask(tdd, jobMasterGateway.getFencingToken(), RpcUtils.INF_TIMEOUT).join();
            ClassLoader firstClassLoader = UserClassLoaderExtractingInvokable.take();
            while (!(taskExecutionState2 = (TaskExecutionState)taskExecutionStates.take()).getExecutionState().isTerminal()) {
            }
            taskExecutorGateway.submitTask(tdd, jobMasterGateway.getFencingToken(), RpcUtils.INF_TIMEOUT).join();
            ClassLoader secondClassLoader = UserClassLoaderExtractingInvokable.take();
            Assert.assertThat((Object)firstClassLoader, (Matcher)CoreMatchers.sameInstance((Object)secondClassLoader));
        }
    }

    private TaskExecutor createTaskExecutor(Configuration configuration, TestingRpcService rpcService, TestingHighAvailabilityServices haServices, LocalUnresolvedTaskManagerLocation unresolvedTaskManagerLocation) throws IOException {
        return new TaskExecutor((RpcService)rpcService, TaskManagerConfiguration.fromConfiguration((Configuration)configuration, (TaskExecutorResourceSpec)TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution((Configuration)configuration), (String)InetAddress.getLoopbackAddress().getHostAddress()), (HighAvailabilityServices)haServices, new TaskManagerServicesBuilder().setTaskSlotTable((TaskSlotTable<Task>)TaskSlotUtils.createTaskSlotTable(1)).setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation).build(), ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES, (HeartbeatServices)new TestingHeartbeatServices(), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, new BlobCacheService(configuration, (BlobView)new VoidBlobStore(), null), (FatalErrorHandler)this.testingFatalErrorHandlerResource.getFatalErrorHandler(), (TaskExecutorPartitionTracker)new TestingTaskExecutorPartitionTracker(), TaskManagerRunner.createBackPressureSampleService((Configuration)configuration, (ScheduledExecutor)rpcService.getScheduledExecutor()));
    }

    public static final class UserClassLoaderExtractingInvokable
    extends AbstractInvokable {
        private static BlockingQueue<ClassLoader> userCodeClassLoaders = new ArrayBlockingQueue<ClassLoader>(2);

        public UserClassLoaderExtractingInvokable(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            userCodeClassLoaders.put(this.getEnvironment().getUserClassLoader());
        }

        private static void clearQueue() {
            userCodeClassLoaders.clear();
        }

        private static ClassLoader take() throws InterruptedException {
            return userCodeClassLoaders.take();
        }
    }
}

