/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaBasedEndpoint;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.rpc.akka.ControlMessages;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;

public class AkkaRpcActorTest
extends TestLogger {
    /** Logger for this test class. */
    private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcActorTest.class);

    /**
     * Time limit (10000 milliseconds) passed to blocking future waits and to the
     * service/endpoint termination helpers. Never reassigned, hence {@code final}.
     */
    private static final Time timeout = Time.milliseconds(10000L);

    /** RPC service shared by the tests; assigned in {@code setup()} and terminated in {@code shutdown()}. */
    private static AkkaRpcService akkaRpcService;

    /** Creates the {@link TestingRpcService} instance that the tests of this class share. */
    @BeforeClass
    public static void setup() {
        AkkaRpcActorTest.akkaRpcService = new TestingRpcService();
    }

    /**
     * Terminates the shared RPC service through {@link RpcUtils#terminateRpcService}, handing it
     * the class-wide {@code timeout}.
     */
    @AfterClass
    public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
        final RpcService serviceToTerminate = akkaRpcService;
        RpcUtils.terminateRpcService(serviceToTerminate, timeout);
    }

    /**
     * Connects to a {@link DummyRpcEndpoint} through its address and checks that the gateway
     * obtained this way reports the same address as the endpoint.
     */
    @Test
    public void testAddressResolution() throws Exception {
        final DummyRpcEndpoint endpoint = new DummyRpcEndpoint(akkaRpcService);

        final CompletableFuture<DummyRpcGateway> gatewayFuture =
                akkaRpcService.connect(endpoint.getAddress(), DummyRpcGateway.class);
        final DummyRpcGateway gateway = gatewayFuture.get(timeout.getSize(), timeout.getUnit());

        Assert.assertEquals(endpoint.getAddress(), gateway.getAddress());
    }

    /**
     * Connects to the address {@code "foobar"} and expects the connect future to fail with an
     * {@link ExecutionException} whose cause is an {@link RpcConnectionException}.
     */
    @Test
    public void testFailingAddressResolution() throws Exception {
        final CompletableFuture<DummyRpcGateway> gatewayFuture =
                akkaRpcService.connect("foobar", DummyRpcGateway.class);

        try {
            gatewayFuture.get(timeout.getSize(), timeout.getUnit());
            Assert.fail("The rpc connection resolution should have failed.");
        } catch (ExecutionException expected) {
            final Throwable cause = expected.getCause();
            Assert.assertTrue(cause instanceof RpcConnectionException);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Calls {@code foobar()} on the self gateway before the endpoint has been started and expects
     * the returned future to fail with an {@link AkkaRpcException}. After setting a new value and
     * starting the endpoint, the same call must return that new value.
     */
    @Test
    public void testMessageDiscarding() throws Exception {
        final int newFoobarValue = 1337;
        final DummyRpcEndpoint endpoint = new DummyRpcEndpoint(akkaRpcService);
        final DummyRpcGateway selfGateway = endpoint.getSelfGateway(DummyRpcGateway.class);

        // First call: issued while the endpoint has not been started.
        final CompletableFuture<Integer> callBeforeStart = selfGateway.foobar();
        try {
            callBeforeStart.get(timeout.getSize(), timeout.getUnit());
            Assert.fail("Expected an AkkaRpcException.");
        } catch (ExecutionException ee) {
            Assert.assertTrue(ee.getCause() instanceof AkkaRpcException);
        }

        endpoint.setFoobar(newFoobarValue);
        endpoint.start();

        try {
            // Second call: issued after start, must be answered with the updated value.
            final CompletableFuture<Integer> callAfterStart = selfGateway.foobar();
            final Integer returnedValue = callAfterStart.get(timeout.getSize(), timeout.getUnit());
            Assert.assertThat(
                    "The new foobar value should have been returned.",
                    returnedValue,
                    Is.is(newFoobarValue));
        } finally {
            RpcUtils.terminateRpcEndpoint(endpoint, timeout);
        }
    }

    /**
     * Checks that the termination future of a started endpoint is not done before the endpoint is
     * closed, and that it completes after {@code closeAsync()} has been triggered from the RPC
     * service's executor. The JUnit timeout bounds the final unbounded {@code get()}.
     */
    @Test(timeout=5000L)
    public void testRpcEndpointTerminationFuture() throws Exception {
        final DummyRpcEndpoint endpoint = new DummyRpcEndpoint(akkaRpcService);
        endpoint.start();

        final CompletableFuture<Void> terminated = endpoint.getTerminationFuture();
        Assert.assertFalse(terminated.isDone());

        final Executor rpcExecutor = akkaRpcService.getExecutor();
        CompletableFuture.runAsync(endpoint::closeAsync, rpcExecutor);

        terminated.get();
    }

    /**
     * Invokes {@code doStuff()} on an {@link ExceptionalEndpoint}, whose implementation throws a
     * {@link RuntimeException}, and checks that the returned future fails with a cause of exactly
     * that class and message.
     */
    @Test
    public void testExceptionPropagation() throws Exception {
        final ExceptionalEndpoint endpoint = new ExceptionalEndpoint(akkaRpcService);
        endpoint.start();

        final ExceptionalGateway gateway = endpoint.getSelfGateway(ExceptionalGateway.class);
        final CompletableFuture<Integer> failingCall = gateway.doStuff();

        try {
            failingCall.get(timeout.getSize(), timeout.getUnit());
            Assert.fail("this should fail with an exception");
        } catch (ExecutionException wrapped) {
            final Throwable rootCause = wrapped.getCause();
            Assert.assertEquals(RuntimeException.class, rootCause.getClass());
            Assert.assertEquals("my super specific test exception", rootCause.getMessage());
        }
    }

    /**
     * Invokes {@code doStuff()} on an {@link ExceptionalFutureEndpoint}, which returns a future
     * that is later completed exceptionally, and checks that the caller's future fails with a
     * cause of exactly class {@link Exception} and the message {@code "some test"}.
     */
    @Test
    public void testExceptionPropagationFuturePiping() throws Exception {
        final ExceptionalFutureEndpoint endpoint = new ExceptionalFutureEndpoint(akkaRpcService);
        endpoint.start();

        final ExceptionalGateway gateway = endpoint.getSelfGateway(ExceptionalGateway.class);
        final CompletableFuture<Integer> failingCall = gateway.doStuff();

        try {
            failingCall.get(timeout.getSize(), timeout.getUnit());
            Assert.fail("this should fail with an exception");
        } catch (ExecutionException wrapped) {
            final Throwable rootCause = wrapped.getCause();
            Assert.assertEquals(Exception.class, rootCause.getClass());
            Assert.assertEquals("some test", rootCause.getMessage());
        }
    }

    /**
     * Closes a {@link FailingOnStopEndpoint}, whose {@code onStop()} returns an exceptionally
     * completed future, and checks that the future returned by {@code closeAsync()} fails with a
     * {@code FailingOnStopEndpoint.OnStopException} as cause.
     *
     * <p>The test fails explicitly when the future completes normally; without that check a
     * missing exception would go unnoticed.
     */
    @Test
    public void testOnStopExceptionPropagation() throws Exception {
        final FailingOnStopEndpoint rpcEndpoint =
                new FailingOnStopEndpoint(akkaRpcService, "FailingOnStopEndpoint");
        rpcEndpoint.start();

        final CompletableFuture<Void> terminationFuture = rpcEndpoint.closeAsync();

        try {
            terminationFuture.get();
            // Reaching this line means the onStop failure was not propagated.
            Assert.fail("Expected the termination future to fail with the onStop exception.");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof FailingOnStopEndpoint.OnStopException);
        }
    }

    /**
     * Starts a {@link SimpleRpcEndpoint}, closes it asynchronously and waits for the returned
     * future; the test passes when that future completes without an exception.
     */
    @Test
    public void testOnStopExecutedByMainThread() throws Exception {
        final SimpleRpcEndpoint endpoint = new SimpleRpcEndpoint(akkaRpcService, "SimpleRpcEndpoint");
        endpoint.start();

        final CompletableFuture<Void> closed = endpoint.closeAsync();
        closed.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testActorTerminationWhenServiceShutdown() throws Exception {
        ActorSystem rpcActorSystem = AkkaUtils.createDefaultActorSystem();
        AkkaRpcService rpcService = new AkkaRpcService(rpcActorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
        try {
            SimpleRpcEndpoint rpcEndpoint = new SimpleRpcEndpoint((RpcService)rpcService, SimpleRpcEndpoint.class.getSimpleName());
            rpcEndpoint.start();
            CompletableFuture terminationFuture = rpcEndpoint.getTerminationFuture();
            rpcService.stopService();
            terminationFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
        finally {
            rpcActorSystem.terminate();
            FutureUtils.toJava((Future)rpcActorSystem.whenTerminated()).get(timeout.getSize(), timeout.getUnit());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testActorTerminationWithAsynchronousOnStopAction() throws Exception {
        CompletableFuture<Void> onStopFuture = new CompletableFuture<Void>();
        AsynchronousOnStopEndpoint endpoint = new AsynchronousOnStopEndpoint((RpcService)akkaRpcService, onStopFuture);
        try {
            endpoint.start();
            CompletableFuture terminationFuture = endpoint.closeAsync();
            Assert.assertFalse((boolean)terminationFuture.isDone());
            onStopFuture.complete(null);
            terminationFuture.get();
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)endpoint, (Time)timeout);
        }
    }

    /**
     * Closes a {@link MainThreadExecutorOnStopEndpoint}, whose {@code onStop()} schedules a no-op
     * on the endpoint's main thread executor, and waits for the termination future to complete.
     */
    @Test
    public void testMainThreadExecutionOnStop() throws Exception {
        final MainThreadExecutorOnStopEndpoint mainThreadEndpoint =
                new MainThreadExecutorOnStopEndpoint(akkaRpcService);

        try {
            mainThreadEndpoint.start();

            final CompletableFuture<Void> terminated = mainThreadEndpoint.closeAsync();
            terminated.get();
        } finally {
            RpcUtils.terminateRpcEndpoint(mainThreadEndpoint, timeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Exercises termination of an endpoint whose {@code onStop()} future is completed while an RPC
     * is still being processed.
     *
     * <p>Sequence: the endpoint is closed while its {@code onStop} future is still open, two
     * {@code asyncOperation} calls are sent, and the test waits until one of them has entered the
     * operation (where it blocks on a latch). Then the {@code onStop} future is completed, the
     * blocked operation is released, and the test asserts that the first call returned 42, that
     * the operation was invoked exactly once and that the second call's future is not done.
     */
    @Test
    public void testOnStopFutureCompletionDirectlyTerminatesAkkaRpcActor() throws Exception {
        CompletableFuture<Void> onStopFuture = new CompletableFuture<Void>();
        TerminatingAfterOnStopFutureCompletionEndpoint endpoint = new TerminatingAfterOnStopFutureCompletionEndpoint((RpcService)akkaRpcService, onStopFuture);
        try {
            endpoint.start();
            AsyncOperationGateway asyncOperationGateway = (AsyncOperationGateway)endpoint.getSelfGateway(AsyncOperationGateway.class);
            // Trigger the shutdown; onStop() hands back onStopFuture, which is still incomplete.
            CompletableFuture terminationFuture = endpoint.closeAsync();
            Assert.assertThat((Object)terminationFuture.isDone(), (Matcher)Matchers.is((Object)false));
            // Send two operations; each invocation blocks inside asyncOperation until released.
            CompletableFuture<Integer> firstAsyncOperationFuture = asyncOperationGateway.asyncOperation(timeout);
            CompletableFuture<Integer> secondAsyncOperationFuture = asyncOperationGateway.asyncOperation(timeout);
            // Wait until an invocation has entered asyncOperation (it triggers the enter latch).
            endpoint.awaitEnterAsyncOperation();
            // Complete the onStop future while that invocation is still blocked.
            onStopFuture.complete(null);
            Assert.assertThat((Object)terminationFuture.isDone(), (Matcher)Matchers.is((Object)false));
            // Release the blocked invocation so it can return its result.
            endpoint.triggerUnblockAsyncOperation();
            Assert.assertThat((Object)firstAsyncOperationFuture.get(), (Matcher)Matchers.is((Object)42));
            terminationFuture.get();
            // Only one invocation was executed; the second call's future has not been completed.
            Assert.assertThat((Object)endpoint.getNumberAsyncOperationCalls(), (Matcher)Matchers.is((Object)1));
            Assert.assertThat((Object)secondAsyncOperationFuture.isDone(), (Matcher)Matchers.is((Object)false));
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)endpoint, (Time)timeout);
        }
    }

    /**
     * Starts an {@link OnStartEndpoint} configured without an exception and blocks until its
     * {@code onStart()} callback has counted down the latch.
     */
    @Test
    public void testOnStartIsCalledWhenRpcEndpointStarts() throws Exception {
        final OnStartEndpoint endpoint = new OnStartEndpoint(akkaRpcService, null);

        try {
            endpoint.start();
            endpoint.awaitUntilOnStartCalled();
        } finally {
            RpcUtils.terminateRpcEndpoint(endpoint, timeout);
        }
    }

    /**
     * Starts an {@link OnStartEndpoint} whose {@code onStart()} rethrows a test exception and
     * expects the termination future to fail with an {@link ExecutionException} that contains
     * that very exception somewhere in its cause chain.
     */
    @Test
    public void testOnStartFails() throws Exception {
        final FlinkException testException = new FlinkException("Test exception");
        final OnStartEndpoint endpoint = new OnStartEndpoint(akkaRpcService, testException);

        endpoint.start();
        endpoint.awaitUntilOnStartCalled();

        try {
            endpoint.getTerminationFuture().get();
            Assert.fail("Expected that the rpc endpoint failed onStart and thus has terminated.");
        } catch (ExecutionException ee) {
            final boolean containsTestException =
                    ExceptionUtils.findThrowable(ee, candidate -> candidate.equals(testException))
                            .isPresent();
            Assert.assertThat(containsTestException, Matchers.is(true));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends the {@code TERMINATE} control message twice to the endpoint's actor and verifies that
     * {@code onStop()} was nevertheless invoked exactly once.
     *
     * <p>The {@code finally} block completes the {@code onStop} future again (a no-op when it is
     * already complete) so that the endpoint can terminate even if an earlier step failed.
     */
    @Test
    public void callsOnStopOnlyOnce() throws Exception {
        // Typed as CompletableFuture<Void>: this is what the endpoint's constructor accepts;
        // a CompletableFuture<Object> is not assignable to it.
        final CompletableFuture<Void> onStopFuture = new CompletableFuture<>();
        final OnStopCountingRpcEndpoint endpoint =
                new OnStopCountingRpcEndpoint(akkaRpcService, onStopFuture);

        try {
            endpoint.start();

            final AkkaBasedEndpoint selfGateway = endpoint.getSelfGateway(AkkaBasedEndpoint.class);

            // Deliver the terminate request twice.
            selfGateway.getActorRef().tell(ControlMessages.TERMINATE, ActorRef.noSender());
            selfGateway.getActorRef().tell(ControlMessages.TERMINATE, ActorRef.noSender());

            endpoint.waitUntilOnStopHasBeenCalled();

            onStopFuture.complete(null);

            endpoint.getTerminationFuture().get();

            Assert.assertThat(endpoint.getNumOnStopCalls(), Matchers.is(1));
        } finally {
            onStopFuture.complete(null);
            RpcUtils.terminateRpcEndpoint(endpoint, timeout);
        }
    }

    /**
     * Creates an endpoint with a fixed name, closes it, then creates a second endpoint with the
     * same name and checks that both report the same address.
     */
    @Test
    public void canReuseEndpointNameAfterTermination() throws Exception {
        // Single source of truth for the name both endpoints are registered under.
        final String endpointName = "not_unique";

        try (SimpleRpcEndpoint simpleRpcEndpoint1 = new SimpleRpcEndpoint(akkaRpcService, endpointName)) {
            simpleRpcEndpoint1.start();

            // Wait for the first endpoint to be fully closed before reusing its name.
            simpleRpcEndpoint1.closeAsync().join();

            try (SimpleRpcEndpoint simpleRpcEndpoint2 = new SimpleRpcEndpoint(akkaRpcService, endpointName)) {
                simpleRpcEndpoint2.start();

                Assert.assertThat(
                        simpleRpcEndpoint2.getAddress(),
                        Matchers.is(Matchers.equalTo(simpleRpcEndpoint1.getAddress())));
            }
        }
    }

    /**
     * Registers a callback on an endpoint's termination future that creates a second endpoint,
     * closes the first endpoint and waits for the callback's result. The second endpoint is then
     * closed as well.
     */
    @Test
    public void terminationFutureDoesNotBlockRpcEndpointCreation() throws Exception {
        try (SimpleRpcEndpoint firstEndpoint = new SimpleRpcEndpoint(akkaRpcService, "foobar")) {
            final CompletableFuture<Void> firstTerminated = firstEndpoint.getTerminationFuture();

            // The second endpoint is constructed inside the termination callback.
            final CompletableFuture<SimpleRpcEndpoint> secondEndpointFuture =
                    firstTerminated.thenApply(
                            ignored -> new SimpleRpcEndpoint(akkaRpcService, "foobar2"));

            firstEndpoint.closeAsync();

            final SimpleRpcEndpoint secondEndpoint = secondEndpointFuture.join();
            secondEndpoint.close();
        }
    }

    /**
     * Creates two endpoints whose names share a prefix, closes the first one and connects through
     * the wildcard address built from that prefix. The resolved gateway must report the address
     * of the second endpoint.
     */
    @Test
    public void resolvesRunningAkkaRpcActor() throws Exception {
        // Single source of truth for the prefix used by both endpoints and the wildcard lookup.
        final String endpointName = "foobar";

        try (RpcEndpoint simpleRpcEndpoint1 = this.createRpcEndpointWithRandomNameSuffix(endpointName);
             RpcEndpoint simpleRpcEndpoint2 = this.createRpcEndpointWithRandomNameSuffix(endpointName)) {
            simpleRpcEndpoint1.closeAsync().join();

            final String wildcardName = AkkaRpcServiceUtils.createWildcardName(endpointName);
            final String wildcardAddress = AkkaRpcServiceUtils.getLocalRpcUrl(wildcardName);
            final RpcGateway rpcGateway = akkaRpcService.connect(wildcardAddress, RpcGateway.class).join();

            Assert.assertThat(
                    rpcGateway.getAddress(),
                    Matchers.is(Matchers.equalTo(simpleRpcEndpoint2.getAddress())));
        }
    }

    /**
     * Creates a {@link SimpleRpcEndpoint} on the shared RPC service whose endpoint id is obtained
     * from {@link AkkaRpcServiceUtils#createRandomName} for the given prefix.
     *
     * @param prefix prefix handed to the random name generator
     * @return the newly created endpoint
     */
    private RpcEndpoint createRpcEndpointWithRandomNameSuffix(String prefix) {
        final String randomizedName = AkkaRpcServiceUtils.createRandomName(prefix);
        return new SimpleRpcEndpoint(akkaRpcService, randomizedName);
    }

    private static final class OnStopCountingRpcEndpoint
    extends RpcEndpoint {
        private final AtomicInteger numOnStopCalls = new AtomicInteger(0);
        private final OneShotLatch onStopHasBeenCalled = new OneShotLatch();
        private final CompletableFuture<Void> onStopFuture;

        private OnStopCountingRpcEndpoint(RpcService rpcService, CompletableFuture<Void> onStopFuture) {
            super(rpcService);
            this.onStopFuture = onStopFuture;
        }

        protected CompletableFuture<Void> onStop() {
            this.onStopHasBeenCalled.trigger();
            this.numOnStopCalls.incrementAndGet();
            return this.onStopFuture;
        }

        private int getNumOnStopCalls() {
            return this.numOnStopCalls.get();
        }

        private void waitUntilOnStopHasBeenCalled() throws InterruptedException {
            this.onStopHasBeenCalled.await();
        }
    }

    private static final class OnStartEndpoint
    extends RpcEndpoint {
        private final CountDownLatch countDownLatch = new CountDownLatch(1);
        @Nullable
        private final Exception exception;

        OnStartEndpoint(RpcService rpcService, @Nullable Exception exception) {
            super(rpcService);
            this.exception = exception;
            this.getTerminationFuture().whenComplete((aVoid, throwable) -> this.closeAsync());
        }

        public void onStart() throws Exception {
            this.countDownLatch.countDown();
            ExceptionUtils.tryRethrowException((Exception)this.exception);
        }

        public void awaitUntilOnStartCalled() throws InterruptedException {
            this.countDownLatch.await();
        }
    }

    /**
     * Endpoint whose {@code asyncOperation} blocks on a latch until the test releases it and whose
     * {@code onStop()} returns the future supplied at construction time. It also counts how often
     * {@code asyncOperation} has been entered.
     */
    private static class TerminatingAfterOnStopFutureCompletionEndpoint extends RpcEndpoint
            implements AsyncOperationGateway {

        /** Future returned from {@link #onStop()}. */
        private final CompletableFuture<Void> onStopFuture;

        /** Triggered when an invocation has entered {@link #asyncOperation(Time)}. */
        private final OneShotLatch enterAsyncOperation = new OneShotLatch();

        /** Awaited inside {@link #asyncOperation(Time)}; released by the test. */
        private final OneShotLatch blockAsyncOperation = new OneShotLatch();

        /** Number of {@link #asyncOperation(Time)} invocations. */
        private final AtomicInteger asyncOperationCounter = new AtomicInteger();

        protected TerminatingAfterOnStopFutureCompletionEndpoint(RpcService rpcService, CompletableFuture<Void> onStopFuture) {
            super(rpcService);
            this.onStopFuture = onStopFuture;
        }

        /**
         * Records the invocation, signals that it has been entered, blocks until released and
         * then returns a completed future holding 42.
         */
        @Override
        public CompletableFuture<Integer> asyncOperation(Time timeout) {
            asyncOperationCounter.incrementAndGet();
            enterAsyncOperation.trigger();

            try {
                blockAsyncOperation.await();
            } catch (InterruptedException interrupted) {
                throw new FlinkRuntimeException(interrupted);
            }

            return CompletableFuture.completedFuture(42);
        }

        @Override
        public CompletableFuture<Void> onStop() {
            return onStopFuture;
        }

        /** Blocks until an invocation has entered {@link #asyncOperation(Time)}. */
        void awaitEnterAsyncOperation() throws InterruptedException {
            enterAsyncOperation.await();
        }

        /** Releases invocations blocked inside {@link #asyncOperation(Time)}. */
        void triggerUnblockAsyncOperation() {
            blockAsyncOperation.trigger();
        }

        /** Returns how often {@link #asyncOperation(Time)} has been invoked. */
        int getNumberAsyncOperationCalls() {
            return asyncOperationCounter.get();
        }
    }

    /** Gateway exposing a single asynchronous operation that takes an RPC timeout. */
    interface AsyncOperationGateway extends RpcGateway {

        /**
         * Triggers the asynchronous operation.
         *
         * @param timeout value annotated as the {@link RpcTimeout} of this call
         * @return future holding the operation's integer result
         */
        CompletableFuture<Integer> asyncOperation(@RpcTimeout Time timeout);
    }

    private static class MainThreadExecutorOnStopEndpoint
    extends RpcEndpoint {
        protected MainThreadExecutorOnStopEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        public CompletableFuture<Void> onStop() {
            return CompletableFuture.runAsync(() -> {}, (Executor)this.getMainThreadExecutor());
        }
    }

    static class AsynchronousOnStopEndpoint
    extends RpcEndpoint {
        private final CompletableFuture<Void> onStopFuture;

        protected AsynchronousOnStopEndpoint(RpcService rpcService, CompletableFuture<Void> onStopFuture) {
            super(rpcService);
            this.onStopFuture = (CompletableFuture)Preconditions.checkNotNull(onStopFuture);
        }

        public CompletableFuture<Void> onStop() {
            return this.onStopFuture;
        }
    }

    private static class FailingOnStopEndpoint
    extends RpcEndpoint
    implements RpcGateway {
        protected FailingOnStopEndpoint(RpcService rpcService, String endpointId) {
            super(rpcService, endpointId);
        }

        public CompletableFuture<Void> onStop() {
            return FutureUtils.completedExceptionally((Throwable)((Object)new OnStopException("Test exception.")));
        }

        private static class OnStopException
        extends FlinkException {
            private static final long serialVersionUID = 6701096588415871592L;

            public OnStopException(String message) {
                super(message);
            }
        }
    }

    /** Minimal endpoint with an explicit endpoint id and no additional behaviour of its own. */
    private static class SimpleRpcEndpoint extends RpcEndpoint implements RpcGateway {

        protected SimpleRpcEndpoint(RpcService rpcService, String endpointId) {
            super(rpcService, endpointId);
        }
    }

    /**
     * Endpoint whose {@code doStuff()} returns a future that a separate thread completes
     * exceptionally shortly afterwards.
     */
    private static class ExceptionalFutureEndpoint extends RpcEndpoint implements ExceptionalGateway {

        protected ExceptionalFutureEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        /**
         * Returns a future that is failed with {@code new Exception("some test")} by a helper
         * thread after a 10 ms sleep.
         *
         * @return the future that will be completed exceptionally
         */
        @Override
        public CompletableFuture<Integer> doStuff() {
            final CompletableFuture<Integer> future = new CompletableFuture<>();

            final Thread completer = new Thread(() -> {
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of silently dropping the interruption.
                    // The future is still failed below so the caller never waits forever.
                    Thread.currentThread().interrupt();
                }
                future.completeExceptionally(new Exception("some test"));
            });
            completer.start();

            return future;
        }
    }

    /** Endpoint whose {@code doStuff()} throws a {@link RuntimeException} instead of returning. */
    private static class ExceptionalEndpoint extends RpcEndpoint implements ExceptionalGateway {

        protected ExceptionalEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        /** Always throws a {@link RuntimeException} with a fixed message. */
        @Override
        public CompletableFuture<Integer> doStuff() {
            throw new RuntimeException("my super specific test exception");
        }
    }

    private static interface ExceptionalGateway
    extends RpcGateway {
        public CompletableFuture<Integer> doStuff();
    }

    /**
     * Endpoint that answers {@code foobar()} with the current value of a volatile field, which is
     * initially 42 and can be changed through {@link #setFoobar(int)}.
     */
    static class DummyRpcEndpoint extends RpcEndpoint implements DummyRpcGateway {

        /** Value returned by {@link #foobar()}. */
        private volatile int foobar = 42;

        protected DummyRpcEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        /** Returns an already completed future holding the current value. */
        @Override
        public CompletableFuture<Integer> foobar() {
            final int currentValue = foobar;
            return CompletableFuture.completedFuture(currentValue);
        }

        /** Replaces the value returned by subsequent {@link #foobar()} calls. */
        public void setFoobar(int value) {
            foobar = value;
        }
    }

    static interface DummyRpcGateway
    extends RpcGateway {
        public CompletableFuture<Integer> foobar();
    }
}

