/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl;
import org.apache.flink.runtime.resourcemanager.TestingResourceManagerFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
import org.assertj.core.util.Sets;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class ResourceManagerServiceImplTest
extends TestLogger {
    private static final Time TIMEOUT = Time.seconds((long)10L);
    private static final Time FAST_TIMEOUT = Time.milliseconds((long)50L);
    private static final HeartbeatServices heartbeatServices = new TestingHeartbeatServices();
    private static final ClusterInformation clusterInformation = new ClusterInformation("localhost", 1234);
    private static final MetricRegistry metricRegistry = TestingMetricRegistry.builder().build();
    private static TestingRpcService rpcService;
    private static TestingHighAvailabilityServices haService;
    private static TestingFatalErrorHandler fatalErrorHandler;
    private TestingResourceManagerFactory.Builder rmFactoryBuilder;
    private TestingLeaderElectionService leaderElectionService;
    private ResourceManagerServiceImpl resourceManagerService;

    @BeforeClass
    public static void setupClass() {
        rpcService = new TestingRpcService();
        haService = new TestingHighAvailabilityServices();
        fatalErrorHandler = new TestingFatalErrorHandler();
    }

    @Before
    public void setup() throws Exception {
        fatalErrorHandler.clearError();
        this.rmFactoryBuilder = new TestingResourceManagerFactory.Builder();
        this.leaderElectionService = new TestingLeaderElectionService();
        haService.setResourceManagerLeaderElectionService(this.leaderElectionService);
    }

    @After
    public void teardown() throws Exception {
        if (this.resourceManagerService != null) {
            this.resourceManagerService.close();
        }
        if (this.leaderElectionService != null) {
            this.leaderElectionService.stop();
        }
        if (fatalErrorHandler.hasExceptionOccurred()) {
            fatalErrorHandler.rethrowError();
        }
    }

    @AfterClass
    public static void teardownClass() throws Exception {
        if (rpcService != null) {
            RpcUtils.terminateRpcService((RpcService)rpcService, (Time)TIMEOUT);
        }
    }

    private void createAndStartResourceManager() throws Exception {
        this.createResourceManager();
        this.resourceManagerService.start();
    }

    private void createResourceManager() throws Exception {
        TestingResourceManagerFactory rmFactory = this.rmFactoryBuilder.build();
        this.resourceManagerService = ResourceManagerServiceImpl.create((ResourceManagerFactory)rmFactory, (Configuration)new Configuration(), (ResourceID)ResourceID.generate(), (RpcService)rpcService, (HighAvailabilityServices)haService, (HeartbeatServices)heartbeatServices, (FatalErrorHandler)fatalErrorHandler, (ClusterInformation)clusterInformation, null, (MetricRegistry)metricRegistry, (String)"localhost", (Executor)ForkJoinPool.commonPool());
    }

    @Test
    public void grantLeadership_startRmAndConfirmLeaderSession() throws Exception {
        UUID leaderSessionId = UUID.randomUUID();
        CompletableFuture startRmFuture = new CompletableFuture();
        this.rmFactoryBuilder.setInitializeConsumer(startRmFuture::complete);
        this.createAndStartResourceManager();
        this.leaderElectionService.isLeader(leaderSessionId);
        Assert.assertThat(startRmFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit()), (Matcher)Matchers.is((Object)leaderSessionId));
        Assert.assertThat((Object)this.leaderElectionService.getConfirmationFuture().get(TIMEOUT.getSize(), TIMEOUT.getUnit()).getLeaderSessionId(), (Matcher)Matchers.is((Object)leaderSessionId));
    }

    @Test
    public void grantLeadership_confirmLeaderSessionAfterRmStarted() throws Exception {
        UUID leaderSessionId = UUID.randomUUID();
        CompletableFuture<Object> finishRmInitializationFuture = new CompletableFuture<Object>();
        this.rmFactoryBuilder.setInitializeConsumer(ignore -> ResourceManagerServiceImplTest.blockOnFuture(finishRmInitializationFuture));
        this.createAndStartResourceManager();
        this.leaderElectionService.isLeader(leaderSessionId);
        ResourceManagerServiceImplTest.assertNotComplete(this.leaderElectionService.getConfirmationFuture());
        finishRmInitializationFuture.complete(null);
        Assert.assertThat((Object)this.leaderElectionService.getConfirmationFuture().get(TIMEOUT.getSize(), TIMEOUT.getUnit()).getLeaderSessionId(), (Matcher)Matchers.is((Object)leaderSessionId));
    }

    @Test
    public void grantLeadership_withExistingLeader_stopExistLeader() throws Exception {
        UUID leaderSessionId1 = UUID.randomUUID();
        UUID leaderSessionId2 = UUID.randomUUID();
        CompletableFuture startRmFuture1 = new CompletableFuture();
        CompletableFuture startRmFuture2 = new CompletableFuture();
        CompletableFuture terminateRmFuture = new CompletableFuture();
        this.rmFactoryBuilder.setInitializeConsumer(uuid -> {
            if (!startRmFuture1.isDone()) {
                startRmFuture1.complete(uuid);
            } else {
                startRmFuture2.complete(uuid);
            }
        }).setTerminateConsumer(terminateRmFuture::complete);
        this.createAndStartResourceManager();
        this.leaderElectionService.isLeader(leaderSessionId1);
        this.assertRmStarted();
        this.leaderElectionService.isLeader(leaderSessionId2);
        Assert.assertThat(terminateRmFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit()), (Matcher)Matchers.is((Object)leaderSessionId1));
        Assert.assertThat(startRmFuture2.get(TIMEOUT.getSize(), TIMEOUT.getUnit()), (Matcher)Matchers.is((Object)leaderSessionId2));
        Assert.assertThat((Object)this.leaderElectionService.getConfirmationFuture().get(TIMEOUT.getSize(), TIMEOUT.getUnit()).getLeaderSessionId(), (Matcher)Matchers.is((Object)leaderSessionId2));
    }

    @Test
    public void grantLeadership_withExistingLeader_waitTerminationOfExistingLeader() throws Exception {
        UUID leaderSessionId1 = UUID.randomUUID();
        UUID leaderSessionId2 = UUID.randomUUID();
        CompletableFuture startRmFuture1 = new CompletableFuture();
        CompletableFuture startRmFuture2 = new CompletableFuture();
        CompletableFuture<Object> finishRmTerminationFuture = new CompletableFuture<Object>();
        this.rmFactoryBuilder.setInitializeConsumer(uuid -> {
            if (!startRmFuture1.isDone()) {
                startRmFuture1.complete(uuid);
            } else {
                startRmFuture2.complete(uuid);
            }
        }).setTerminateConsumer(ignore -> ResourceManagerServiceImplTest.blockOnFuture(finishRmTerminationFuture));
        this.createAndStartResourceManager();
        this.leaderElectionService.isLeader(leaderSessionId1);
        this.assertRmStarted();
        this.leaderElectionService.isLeader(leaderSessionId2);
        ResourceManagerServiceImplTest.assertNotComplete(startRmFuture2);
        finishRmTerminationFuture.complete(null);
        Assert.assertThat(startRmFuture2.get(TIMEOUT.getSize(), TIMEOUT.getUnit()), (Matcher)Matchers.is((Object)leaderSessionId2));
        Assert.assertThat((Object)this.leaderElectionService.getConfirmationFuture().get(TIMEOUT.getSize(), TIMEOUT.getUnit()).getLeaderSessionId(), (Matcher)Matchers.is((Object)leaderSessionId2));
    }

    @Test
    public void grantLeadership_notStarted_doesNotStartNewRm() throws Exception {
        CompletableFuture startRmFuture = new CompletableFuture();
        this.rmFactoryBuilder.setInitializeConsumer(startRmFuture::complete);
        this.createResourceManager();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        ResourceManagerServiceImplTest.assertNotComplete(startRmFuture);
        ResourceManagerServiceImplTest.assertNotComplete(this.leaderElectionService.getConfirmationFuture());
    }

    @Test
    public void grantLeadership_stopped_doesNotStartNewRm() throws Exception {
        CompletableFuture startRmFuture = new CompletableFuture();
        this.rmFactoryBuilder.setInitializeConsumer(startRmFuture::complete);
        this.createAndStartResourceManager();
        this.resourceManagerService.close();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        ResourceManagerServiceImplTest.assertNotComplete(startRmFuture);
        ResourceManagerServiceImplTest.assertNotComplete(this.leaderElectionService.getConfirmationFuture());
    }

    @Test
    public void revokeLeadership_stopExistLeader() throws Exception {
        UUID leaderSessionId = UUID.randomUUID();
        CompletableFuture terminateRmFuture = new CompletableFuture();
        this.rmFactoryBuilder.setTerminateConsumer(terminateRmFuture::complete);
        this.createAndStartResourceManager();
        this.leaderElectionService.isLeader(leaderSessionId);
        this.assertRmStarted();
        this.leaderElectionService.notLeader();
        Assert.assertThat(terminateRmFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit()), (Matcher)Matchers.is((Object)leaderSessionId));
    }

    @Test
    public void revokeLeadership_terminateService_multiLeaderSessionNotSupported() throws Exception {
        this.rmFactoryBuilder.setSupportMultiLeaderSession(false);
        this.createAndStartResourceManager();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        this.assertRmStarted();
        this.leaderElectionService.notLeader();
        this.resourceManagerService.getTerminationFuture().get(TIMEOUT.getSize(), TIMEOUT.getUnit());
    }

    @Test
    public void leaderRmTerminated_terminateService() throws Exception {
        UUID leaderSessionId = UUID.randomUUID();
        CompletableFuture<Object> rmTerminationFuture = new CompletableFuture<Object>();
        this.rmFactoryBuilder.setGetTerminationFutureFunction((ignore1, ignore2) -> rmTerminationFuture);
        this.createAndStartResourceManager();
        this.leaderElectionService.isLeader(leaderSessionId);
        this.assertRmStarted();
        rmTerminationFuture.complete(null);
        this.resourceManagerService.getTerminationFuture().get(TIMEOUT.getSize(), TIMEOUT.getUnit());
    }

    @Test
    public void nonLeaderRmTerminated_doseNotTerminateService() throws Exception {
        UUID leaderSessionId = UUID.randomUUID();
        CompletableFuture terminateRmFuture = new CompletableFuture();
        CompletableFuture<Object> rmTerminationFuture = new CompletableFuture<Object>();
        this.rmFactoryBuilder.setTerminateConsumer(terminateRmFuture::complete).setGetTerminationFutureFunction((ignore1, ignore2) -> rmTerminationFuture);
        this.createAndStartResourceManager();
        this.leaderElectionService.isLeader(leaderSessionId);
        this.assertRmStarted();
        this.leaderElectionService.notLeader();
        Assert.assertThat(terminateRmFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit()), (Matcher)Matchers.is((Object)leaderSessionId));
        rmTerminationFuture.complete(null);
        ResourceManagerServiceImplTest.assertNotComplete(this.resourceManagerService.getTerminationFuture());
    }

    @Test
    public void closeService_stopRmAndLeaderElection() throws Exception {
        CompletableFuture terminateRmFuture = new CompletableFuture();
        this.rmFactoryBuilder.setTerminateConsumer(terminateRmFuture::complete);
        this.createAndStartResourceManager();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        this.assertRmStarted();
        Assert.assertFalse((boolean)this.leaderElectionService.isStopped());
        this.resourceManagerService.close();
        Assert.assertTrue((boolean)terminateRmFuture.isDone());
        Assert.assertTrue((boolean)this.leaderElectionService.isStopped());
    }

    @Test
    public void closeService_futureCompleteAfterRmTerminated() throws Exception {
        CompletableFuture<Object> finishRmTerminationFuture = new CompletableFuture<Object>();
        this.rmFactoryBuilder.setTerminateConsumer(ignore -> ResourceManagerServiceImplTest.blockOnFuture(finishRmTerminationFuture));
        this.createAndStartResourceManager();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        this.assertRmStarted();
        CompletableFuture closeServiceFuture = this.resourceManagerService.closeAsync();
        ResourceManagerServiceImplTest.assertNotComplete(closeServiceFuture);
        finishRmTerminationFuture.complete(null);
        closeServiceFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
    }

    @Test
    public void deregisterApplication_leaderRmNotStarted() throws Exception {
        CompletableFuture startRmInitializationFuture = new CompletableFuture();
        CompletableFuture<Object> finishRmInitializationFuture = new CompletableFuture<Object>();
        this.rmFactoryBuilder.setInitializeConsumer(ignore -> {
            startRmInitializationFuture.complete(null);
            ResourceManagerServiceImplTest.blockOnFuture(finishRmInitializationFuture);
        });
        this.createAndStartResourceManager();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        startRmInitializationFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
        CompletableFuture deregisterApplicationFuture = this.resourceManagerService.deregisterApplication(ApplicationStatus.CANCELED, null);
        ResourceManagerServiceImplTest.assertNotComplete(deregisterApplicationFuture);
        finishRmInitializationFuture.complete(null);
        deregisterApplicationFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
    }

    @Test
    public void deregisterApplication_noLeaderRm() throws Exception {
        this.createAndStartResourceManager();
        CompletableFuture deregisterApplicationFuture = this.resourceManagerService.deregisterApplication(ApplicationStatus.CANCELED, null);
        deregisterApplicationFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
    }

    @Test
    public void grantAndRevokeLeadership_verifyMetrics() throws Exception {
        Set registeredMetrics = Collections.newSetFromMap(new ConcurrentHashMap());
        TestingMetricRegistry metricRegistry = TestingMetricRegistry.builder().setRegisterConsumer((a, b, c) -> registeredMetrics.add(b)).setUnregisterConsumer((a, b, c) -> registeredMetrics.remove(b)).build();
        TestingResourceManagerFactory rmFactory = this.rmFactoryBuilder.build();
        this.resourceManagerService = ResourceManagerServiceImpl.create((ResourceManagerFactory)rmFactory, (Configuration)new Configuration(), (ResourceID)ResourceID.generate(), (RpcService)rpcService, (HighAvailabilityServices)haService, (HeartbeatServices)heartbeatServices, (FatalErrorHandler)fatalErrorHandler, (ClusterInformation)clusterInformation, null, (MetricRegistry)metricRegistry, (String)"localhost", (Executor)ForkJoinPool.commonPool());
        this.resourceManagerService.start();
        Assert.assertEquals((long)0L, (long)registeredMetrics.size());
        this.leaderElectionService.isLeader(UUID.randomUUID());
        this.assertRmStarted();
        Set expectedMetrics = Sets.set((Object[])new String[]{"numRegisteredTaskManagers", "taskSlotsTotal", "taskSlotsAvailable"});
        Assert.assertTrue((String)"Expected RM to register leader metrics", (boolean)registeredMetrics.containsAll(expectedMetrics));
        this.revokeLeadership();
        HashSet intersection = new HashSet(registeredMetrics);
        intersection.retainAll(expectedMetrics);
        Assert.assertTrue((String)"Expected RM to unregister leader metrics", (boolean)intersection.isEmpty());
        this.leaderElectionService.isLeader(UUID.randomUUID());
        this.assertRmStarted();
        Assert.assertTrue((String)"Expected RM to re-register leader metrics", (boolean)registeredMetrics.containsAll(expectedMetrics));
    }

    private static void blockOnFuture(CompletableFuture<?> future) {
        try {
            future.get();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail();
        }
    }

    private static void assertNotComplete(CompletableFuture<?> future) throws Exception {
        try {
            future.get(FAST_TIMEOUT.getSize(), FAST_TIMEOUT.getUnit());
            Assert.fail();
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
    }

    private void assertRmStarted() throws Exception {
        this.leaderElectionService.getConfirmationFuture().get(TIMEOUT.getSize(), TIMEOUT.getUnit());
    }

    private void revokeLeadership() {
        ResourceManager leaderResourceManager = this.resourceManagerService.getLeaderResourceManager();
        this.leaderElectionService.notLeader();
        ResourceManagerServiceImplTest.blockOnFuture(leaderResourceManager.getTerminationFuture());
    }
}

