/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc;

import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.util.Preconditions;

public class TestingRpcService
extends AkkaRpcService {
    private static final Function<RpcGateway, CompletableFuture<RpcGateway>> DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION = CompletableFuture::completedFuture;
    private final ConcurrentHashMap<String, RpcGateway> registeredConnections;
    private volatile Function<RpcGateway, CompletableFuture<RpcGateway>> rpcGatewayFutureFunction = DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION;

    public TestingRpcService() {
        this(new Configuration());
    }

    public TestingRpcService(Configuration configuration) {
        super(AkkaUtils.createLocalActorSystem((Configuration)configuration), AkkaRpcServiceConfiguration.fromConfiguration((Configuration)configuration));
        this.registeredConnections = new ConcurrentHashMap();
    }

    public CompletableFuture<Void> stopService() {
        CompletableFuture terminationFuture = super.stopService();
        terminationFuture.whenComplete((ignored, throwable) -> this.registeredConnections.clear());
        return terminationFuture;
    }

    public void registerGateway(String address, RpcGateway gateway) {
        Preconditions.checkNotNull((Object)address);
        Preconditions.checkNotNull((Object)gateway);
        if (this.registeredConnections.putIfAbsent(address, gateway) != null) {
            throw new IllegalStateException("a gateway is already registered under " + address);
        }
    }

    private <C extends RpcGateway> CompletableFuture<C> getRpcGatewayFuture(C gateway) {
        return this.rpcGatewayFutureFunction.apply(gateway);
    }

    public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) {
        RpcGateway gateway = this.registeredConnections.get(address);
        if (gateway != null) {
            if (clazz.isAssignableFrom(gateway.getClass())) {
                RpcGateway typedGateway = gateway;
                return this.getRpcGatewayFuture(typedGateway);
            }
            return FutureUtils.completedExceptionally((Throwable)new Exception("Gateway registered under " + address + " is not of type " + clazz));
        }
        return super.connect(address, clazz);
    }

    public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(String address, F fencingToken, Class<C> clazz) {
        RpcGateway gateway = this.registeredConnections.get(address);
        if (gateway != null) {
            if (clazz.isAssignableFrom(gateway.getClass())) {
                FencedRpcGateway typedGateway = (FencedRpcGateway)gateway;
                return this.getRpcGatewayFuture(typedGateway);
            }
            return FutureUtils.completedExceptionally((Throwable)new Exception("Gateway registered under " + address + " is not of type " + clazz));
        }
        return super.connect(address, fencingToken, clazz);
    }

    public void clearGateways() {
        this.registeredConnections.clear();
    }

    public void resetRpcGatewayFutureFunction() {
        this.rpcGatewayFutureFunction = DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION;
    }

    public void setRpcGatewayFutureFunction(Function<RpcGateway, CompletableFuture<RpcGateway>> rpcGatewayFutureFunction) {
        this.rpcGatewayFutureFunction = rpcGatewayFutureFunction;
    }
}

