package org.apache.flink.runtime.jobmaster;

import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.utils.JobGraphTestUtils;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.util.TestLogger;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.class */
public class JobMasterExecutionDeploymentReconciliationTest extends TestLogger {
    private final HeartbeatServices heartbeatServices = new HeartbeatServices(2147483647L, 2147483647L);
    private final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
    private final SettableLeaderRetrievalService resourceManagerLeaderRetriever = new SettableLeaderRetrievalService();
    private final TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();

    @Rule
    public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource();
    private static final Time testingTimeout = Time.seconds(10);

    @ClassRule
    public static final TestingRpcServiceResource RPC_SERVICE_RESOURCE = new TestingRpcServiceResource();

    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest$TestingExecutionDeploymentTrackerWrapper.class */
    private static class TestingExecutionDeploymentTrackerWrapper implements ExecutionDeploymentTracker {
        private final ExecutionDeploymentTracker originalTracker;
        private final CompletableFuture<ExecutionAttemptID> taskDeploymentFuture;
        private final CompletableFuture<ExecutionAttemptID> stopFuture;

        private TestingExecutionDeploymentTrackerWrapper() {
            this((ExecutionDeploymentTracker) new DefaultExecutionDeploymentTracker());
        }

        private TestingExecutionDeploymentTrackerWrapper(ExecutionDeploymentTracker executionDeploymentTracker) {
            this.originalTracker = executionDeploymentTracker;
            this.taskDeploymentFuture = new CompletableFuture<>();
            this.stopFuture = new CompletableFuture<>();
        }

        public void startTrackingPendingDeploymentOf(ExecutionAttemptID executionAttemptID, ResourceID resourceID) {
            this.originalTracker.startTrackingPendingDeploymentOf(executionAttemptID, resourceID);
        }

        public void completeDeploymentOf(ExecutionAttemptID executionAttemptID) {
            this.originalTracker.completeDeploymentOf(executionAttemptID);
            this.taskDeploymentFuture.complete(executionAttemptID);
        }

        public void stopTrackingDeploymentOf(ExecutionAttemptID executionAttemptID) {
            this.originalTracker.stopTrackingDeploymentOf(executionAttemptID);
            this.stopFuture.complete(executionAttemptID);
        }

        public Map<ExecutionAttemptID, ExecutionDeploymentState> getExecutionsOn(ResourceID resourceID) {
            return this.originalTracker.getExecutionsOn(resourceID);
        }

        public CompletableFuture<ExecutionAttemptID> getTaskDeploymentFuture() {
            return this.taskDeploymentFuture;
        }

        public CompletableFuture<ExecutionAttemptID> getStopFuture() {
            return this.stopFuture;
        }
    }

    @Before
    public void setup() {
        this.haServices.setResourceManagerLeaderRetriever(this.resourceManagerLeaderRetriever);
        this.haServices.setResourceManagerLeaderElectionService(this.resourceManagerLeaderElectionService);
        this.haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
    }

    @Test
    public void testExecutionDeploymentReconciliation() throws Exception {
        JobMasterBuilder.TestingOnCompletionActions testingOnCompletionActions = new JobMasterBuilder.TestingOnCompletionActions();
        TestingExecutionDeploymentTrackerWrapper testingExecutionDeploymentTrackerWrapper = new TestingExecutionDeploymentTrackerWrapper();
        JobMasterGateway selfGateway = createAndStartJobMaster(testingOnCompletionActions, testingExecutionDeploymentTrackerWrapper).getSelfGateway(JobMasterGateway.class);
        RPC_SERVICE_RESOURCE.getTestingRpcService().registerGateway(selfGateway.getAddress(), selfGateway);
        CompletableFuture<ExecutionAttemptID> completableFuture = new CompletableFuture<>();
        TaskExecutorGateway createTaskExecutorGateway = createTaskExecutorGateway(completableFuture);
        LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
        registerTaskExecutorAndOfferSlots(selfGateway, createTaskExecutorGateway, localUnresolvedTaskManagerLocation);
        ExecutionAttemptID executionAttemptID = testingExecutionDeploymentTrackerWrapper.getTaskDeploymentFuture().get();
        Assert.assertFalse(completableFuture.isDone());
        ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
        selfGateway.heartbeatFromTaskManager(localUnresolvedTaskManagerLocation.getResourceID(), new TaskExecutorToJobManagerHeartbeatPayload(new AccumulatorReport(Collections.emptyList()), new ExecutionDeploymentReport(Collections.singleton(executionAttemptID2))));
        Assert.assertThat(completableFuture.get(), Is.is(executionAttemptID2));
        Assert.assertThat(testingExecutionDeploymentTrackerWrapper.getStopFuture().get(), Is.is(executionAttemptID));
        Assert.assertThat(testingOnCompletionActions.getJobReachedGloballyTerminalStateFuture().get().getState(), Is.is(JobStatus.FAILED));
    }

    @Test
    public void testExecutionDeploymentReconciliationForPendingExecution() throws Exception {
        TestingExecutionDeploymentTrackerWrapper testingExecutionDeploymentTrackerWrapper = new TestingExecutionDeploymentTrackerWrapper();
        JobMasterGateway selfGateway = createAndStartJobMaster(testingExecutionDeploymentTrackerWrapper).getSelfGateway(JobMasterGateway.class);
        RPC_SERVICE_RESOURCE.getTestingRpcService().registerGateway(selfGateway.getAddress(), selfGateway);
        CompletableFuture<ExecutionAttemptID> completableFuture = new CompletableFuture<>();
        CompletableFuture<ExecutionAttemptID> completableFuture2 = new CompletableFuture<>();
        CompletableFuture<Acknowledge> completableFuture3 = new CompletableFuture<>();
        TaskExecutorGateway createTaskExecutorGateway = createTaskExecutorGateway(completableFuture2, completableFuture, completableFuture3);
        LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
        registerTaskExecutorAndOfferSlots(selfGateway, createTaskExecutorGateway, localUnresolvedTaskManagerLocation);
        selfGateway.heartbeatFromTaskManager(localUnresolvedTaskManagerLocation.getResourceID(), new TaskExecutorToJobManagerHeartbeatPayload(new AccumulatorReport(Collections.emptyList()), new ExecutionDeploymentReport(Collections.singleton(completableFuture.get()))));
        completableFuture3.complete(Acknowledge.get());
        testingExecutionDeploymentTrackerWrapper.getTaskDeploymentFuture().get();
        Assert.assertFalse(completableFuture2.isDone());
    }

    private JobMaster createAndStartJobMaster(ExecutionDeploymentTracker executionDeploymentTracker) throws Exception {
        return createAndStartJobMaster(new JobMasterBuilder.TestingOnCompletionActions(), executionDeploymentTracker);
    }

    private JobMaster createAndStartJobMaster(OnCompletionActions onCompletionActions, ExecutionDeploymentTracker executionDeploymentTracker) throws Exception {
        JobMaster createJobMaster = new JobMasterBuilder(JobGraphTestUtils.createSingleVertexJobGraph(), RPC_SERVICE_RESOURCE.getTestingRpcService()).withFatalErrorHandler(this.testingFatalErrorHandlerResource.getFatalErrorHandler()).withHighAvailabilityServices(this.haServices).withHeartbeatServices(this.heartbeatServices).withExecutionDeploymentTracker(executionDeploymentTracker).withOnCompletionActions(onCompletionActions).createJobMaster();
        createJobMaster.start(JobMasterId.generate()).get();
        return createJobMaster;
    }

    private TaskExecutorGateway createTaskExecutorGateway(CompletableFuture<ExecutionAttemptID> completableFuture) {
        return createTaskExecutorGateway(completableFuture, new CompletableFuture<>(), CompletableFuture.completedFuture(Acknowledge.get()));
    }

    private TaskExecutorGateway createTaskExecutorGateway(CompletableFuture<ExecutionAttemptID> completableFuture, CompletableFuture<ExecutionAttemptID> completableFuture2, CompletableFuture<Acknowledge> completableFuture3) {
        RpcGateway createTestingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setAddress(UUID.randomUUID().toString()).setCancelTaskFunction(executionAttemptID -> {
            completableFuture.complete(executionAttemptID);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
            completableFuture2.complete(taskDeploymentDescriptor.getExecutionAttemptId());
            return completableFuture3;
        }).createTestingTaskExecutorGateway();
        RPC_SERVICE_RESOURCE.getTestingRpcService().registerGateway(createTestingTaskExecutorGateway.getAddress(), createTestingTaskExecutorGateway);
        return createTestingTaskExecutorGateway;
    }

    private void registerTaskExecutorAndOfferSlots(JobMasterGateway jobMasterGateway, TaskExecutorGateway taskExecutorGateway, UnresolvedTaskManagerLocation unresolvedTaskManagerLocation) throws ExecutionException, InterruptedException {
        jobMasterGateway.registerTaskManager(taskExecutorGateway.getAddress(), unresolvedTaskManagerLocation, testingTimeout).get();
        jobMasterGateway.offerSlots(unresolvedTaskManagerLocation.getResourceID(), Collections.singleton(new SlotOffer(new AllocationID(), 0, ResourceProfile.UNKNOWN)), testingTimeout).get();
    }
}
