/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.highavailability;

import java.util.concurrent.Executor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;

public class HighAvailabilityServicesUtils {
    public static HighAvailabilityServices createAvailableOrEmbeddedServices(Configuration config, Executor executor) throws Exception {
        HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(config);
        switch (highAvailabilityMode) {
            case NONE: {
                return new EmbeddedHaServices(executor);
            }
            case ZOOKEEPER: {
                BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
                return new ZooKeeperHaServices(ZooKeeperUtils.startCuratorFramework(config), executor, config, blobStoreService);
            }
            case FACTORY_CLASS: {
                return HighAvailabilityServicesUtils.createCustomHAServices(config, executor);
            }
        }
        throw new Exception("High availability mode " + (Object)((Object)highAvailabilityMode) + " is not supported.");
    }

    public static HighAvailabilityServices createHighAvailabilityServices(Configuration configuration, Executor executor, AddressResolution addressResolution) throws Exception {
        HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);
        switch (highAvailabilityMode) {
            case NONE: {
                Tuple2<String, Integer> hostnamePort = HighAvailabilityServicesUtils.getJobManagerAddress(configuration);
                String jobManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl((String)hostnamePort.f0, (int)((Integer)hostnamePort.f1), "jobmanager", addressResolution, configuration);
                String resourceManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl((String)hostnamePort.f0, (int)((Integer)hostnamePort.f1), "resourcemanager", addressResolution, configuration);
                String dispatcherRpcUrl = AkkaRpcServiceUtils.getRpcUrl((String)hostnamePort.f0, (int)((Integer)hostnamePort.f1), "dispatcher", addressResolution, configuration);
                String address = (String)Preconditions.checkNotNull((Object)configuration.getString(RestOptions.ADDRESS), (String)"%s must be set", (Object[])new Object[]{RestOptions.ADDRESS.key()});
                int port = configuration.getInteger(RestOptions.PORT);
                boolean enableSSL = SSLUtils.isRestSSLEnabled(configuration);
                String protocol = enableSSL ? "https://" : "http://";
                return new StandaloneHaServices(resourceManagerRpcUrl, dispatcherRpcUrl, jobManagerRpcUrl, String.format("%s%s:%s", protocol, address, port));
            }
            case ZOOKEEPER: {
                BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);
                return new ZooKeeperHaServices(ZooKeeperUtils.startCuratorFramework(configuration), executor, configuration, blobStoreService);
            }
            case FACTORY_CLASS: {
                return HighAvailabilityServicesUtils.createCustomHAServices(configuration, executor);
            }
        }
        throw new Exception("Recovery mode " + (Object)((Object)highAvailabilityMode) + " is not supported.");
    }

    public static Tuple2<String, Integer> getJobManagerAddress(Configuration configuration) throws ConfigurationException {
        String hostname = configuration.getString(JobManagerOptions.ADDRESS);
        int port = configuration.getInteger(JobManagerOptions.PORT);
        if (hostname == null) {
            throw new ConfigurationException("Config parameter '" + JobManagerOptions.ADDRESS + "' is missing (hostname/address of JobManager to connect to).");
        }
        if (port <= 0 || port >= 65536) {
            throw new ConfigurationException("Invalid value for '" + JobManagerOptions.PORT + "' (port of the JobManager actor system) : " + port + ".  it must be greater than 0 and less than 65536.");
        }
        return Tuple2.of((Object)hostname, (Object)port);
    }

    private static HighAvailabilityServices createCustomHAServices(Configuration config, Executor executor) throws FlinkException {
        HighAvailabilityServicesFactory highAvailabilityServicesFactory;
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        String haServicesClassName = config.getString(HighAvailabilityOptions.HA_MODE);
        try {
            highAvailabilityServicesFactory = (HighAvailabilityServicesFactory)InstantiationUtil.instantiate((String)haServicesClassName, HighAvailabilityServicesFactory.class, (ClassLoader)classLoader);
        }
        catch (Exception e) {
            throw new FlinkException(String.format("Could not instantiate the HighAvailabilityServicesFactory '%s'. Please make sure that this class is on your class path.", haServicesClassName), (Throwable)e);
        }
        try {
            return highAvailabilityServicesFactory.createHAServices(config, executor);
        }
        catch (Exception e) {
            throw new FlinkException(String.format("Could not create the ha services from the instantiated HighAvailabilityServicesFactory %s.", haServicesClassName), (Throwable)e);
        }
    }

    public static enum AddressResolution {
        TRY_ADDRESS_RESOLUTION,
        NO_ADDRESS_RESOLUTION;

    }
}

