/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.highavailability;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesTooOldResourceVersionException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.leaderelection.LeaderElectionDriver;
import org.apache.flink.runtime.leaderelection.LeaderElectionEventHandler;
import org.apache.flink.runtime.leaderelection.LeaderElectionException;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link LeaderElectionDriver} backed by a Kubernetes ConfigMap.
 *
 * <p>The driver contends for leadership through a {@link KubernetesLeaderElector} and keeps a
 * watch on the ConfigMap named by the supplied configuration. Leader information is written into
 * that ConfigMap, and changes observed by the watch are forwarded to the
 * {@link LeaderElectionEventHandler}. Unrecoverable problems are reported to the
 * {@link FatalErrorHandler}.
 */
public class KubernetesLeaderElectionDriver implements LeaderElectionDriver {
    private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaderElectionDriver.class);

    /** ConfigMap data key under which the confirmed leader address is stored. */
    private static final String LEADER_ADDRESS_KEY = "address";

    /** ConfigMap data key under which the confirmed leader session id is stored. */
    private static final String LEADER_SESSION_ID_KEY = "sessionId";

    /** Type value passed to {@link KubernetesUtils#getConfigMapLabels} for this driver's labels. */
    private static final String LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY = "high-availability";

    private final Object watchLock = new Object();
    private final FlinkKubeClient kubeClient;
    private final String configMapName;
    private final String lockIdentity;
    private final KubernetesLeaderElector leaderElector;
    private final Map<String, String> configMapLabels;
    private final LeaderElectionEventHandler leaderElectionEventHandler;
    private final FatalErrorHandler fatalErrorHandler;

    /** Released by the watch's {@code onAdded} callback; the constructor blocks on it. */
    private final CountDownLatch configMapLatch = new CountDownLatch(1);

    private volatile boolean running;

    @GuardedBy(value="watchLock")
    private KubernetesWatch kubernetesWatch;

    /**
     * Creates the driver, starts the leader election, registers a watch on the leader ConfigMap
     * and then blocks until the watch reports an {@code onAdded} event.
     *
     * @param kubeClient client used to create the elector, watch and update the ConfigMap
     * @param leaderConfig supplies the ConfigMap name, lock identity and cluster id
     * @param leaderElectionEventHandler receives grant/revoke/leader-information-change events
     * @param fatalErrorHandler receives unrecoverable errors
     * @throws FlinkRuntimeException if the calling thread is interrupted while waiting; the
     *     elector and the watch are stopped and the interrupt flag is restored before throwing
     */
    public KubernetesLeaderElectionDriver(FlinkKubeClient kubeClient, KubernetesLeaderElectionConfiguration leaderConfig, LeaderElectionEventHandler leaderElectionEventHandler, FatalErrorHandler fatalErrorHandler) {
        this.kubeClient = Preconditions.checkNotNull(kubeClient, "Kubernetes client");
        Preconditions.checkNotNull(leaderConfig, "Leader election configuration");
        this.leaderElectionEventHandler = Preconditions.checkNotNull(leaderElectionEventHandler, "LeaderElectionEventHandler");
        this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
        this.configMapName = leaderConfig.getConfigMapName();
        this.lockIdentity = leaderConfig.getLockIdentity();
        this.leaderElector = kubeClient.createLeaderElector(leaderConfig, new LeaderCallbackHandlerImpl());
        this.configMapLabels = KubernetesUtils.getConfigMapLabels(leaderConfig.getClusterId(), LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
        this.running = true;
        this.leaderElector.run();
        // The callback handler is live as soon as the watch is registered, and its handleError
        // replaces kubernetesWatch under watchLock. Take the same lock here so the initial
        // assignment can neither race with nor overwrite such a replacement.
        synchronized (watchLock) {
            this.kubernetesWatch = kubeClient.watchConfigMaps(this.configMapName, new ConfigMapCallbackHandlerImpl());
        }
        try {
            // Must stay outside watchLock so that watch callbacks are never blocked by the wait.
            this.configMapLatch.await();
        } catch (InterruptedException e) {
            FlinkRuntimeException failure = new FlinkRuntimeException("Interrupted while waiting for leader ConfigMap to be created.", e);
            // The caller never obtains a reference to this half-built driver, so nobody else
            // can stop the elector and the watch that were started above.
            this.running = false;
            try {
                stopElectorAndCloseWatch();
            } catch (RuntimeException cleanupFailure) {
                failure.addSuppressed(cleanupFailure);
            }
            // Restore the flag only after the cleanup so the cleanup itself is not disturbed.
            Thread.currentThread().interrupt();
            throw failure;
        }
    }

    /**
     * Stops the leader elector and closes the ConfigMap watch. Calls made after the driver has
     * already been marked as not running return immediately.
     */
    public void close() {
        if (!running) {
            return;
        }
        running = false;
        LOG.info("Closing {}.", this);
        stopElectorAndCloseWatch();
    }

    /** Stops the elector and closes the current watch, if any, while holding {@code watchLock}. */
    private void stopElectorAndCloseWatch() {
        leaderElector.stop();
        synchronized (watchLock) {
            if (kubernetesWatch != null) {
                kubernetesWatch.close();
            }
        }
    }

    /**
     * Writes the given leader information into the ConfigMap and blocks until the update has
     * completed.
     *
     * <p>The ConfigMap is only modified when {@link KubernetesLeaderElector#hasLeadership}
     * reports leadership for this driver's lock identity; otherwise the update function returns
     * an empty {@link Optional}. A {@code null} address or session id removes the corresponding
     * entry. Any exception is forwarded to the fatal error handler.
     *
     * @param leaderInformation leader session id and address to publish
     */
    public void writeLeaderInformation(LeaderInformation leaderInformation) {
        assert running;
        final UUID confirmedLeaderSessionID = leaderInformation.getLeaderSessionID();
        final String confirmedLeaderAddress = leaderInformation.getLeaderAddress();
        try {
            kubeClient.checkAndUpdateConfigMap(configMapName, configMap -> {
                if (!KubernetesLeaderElector.hasLeadership(configMap, lockIdentity)) {
                    return Optional.empty();
                }
                if (confirmedLeaderAddress == null) {
                    configMap.getData().remove(LEADER_ADDRESS_KEY);
                } else {
                    configMap.getData().put(LEADER_ADDRESS_KEY, confirmedLeaderAddress);
                }
                if (confirmedLeaderSessionID == null) {
                    configMap.getData().remove(LEADER_SESSION_ID_KEY);
                } else {
                    configMap.getData().put(LEADER_SESSION_ID_KEY, confirmedLeaderSessionID.toString());
                }
                configMap.getLabels().putAll(configMapLabels);
                return Optional.of(configMap);
            }).get();
            LOG.debug("Successfully wrote leader information: Leader={}, session ID={}.", confirmedLeaderAddress, confirmedLeaderSessionID);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // The broad catch would otherwise silently clear the thread's interrupt status.
                Thread.currentThread().interrupt();
            }
            fatalErrorHandler.onFatalError(new KubernetesException("Could not write leader information since ConfigMap " + configMapName + " does not exist.", e));
        }
    }

    /**
     * Fetches the ConfigMap and checks whether it records this driver's lock identity as leader.
     *
     * @return the result of {@link KubernetesLeaderElector#hasLeadership} for the fetched
     *     ConfigMap; {@code false} after reporting a fatal error when the ConfigMap is absent
     */
    public boolean hasLeadership() {
        assert running;
        Optional<KubernetesConfigMap> configMapOpt = kubeClient.getConfigMap(configMapName);
        if (configMapOpt.isPresent()) {
            return KubernetesLeaderElector.hasLeadership(configMapOpt.get(), lockIdentity);
        }
        fatalErrorHandler.onFatalError(new KubernetesException("ConfigMap " + configMapName + " does not exist.", null));
        return false;
    }

    @Override
    public String toString() {
        return "KubernetesLeaderElectionDriver{configMapName='" + configMapName + "'}";
    }

    /** Watch callbacks for the leader ConfigMap. */
    private class ConfigMapCallbackHandlerImpl
    implements FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap> {

        /** Releases the constructor, which waits for the first {@code onAdded} event. */
        @Override
        public void onAdded(List<KubernetesConfigMap> configMaps) {
            configMapLatch.countDown();
        }

        /** Forwards the leader information of the modified ConfigMap while this driver leads. */
        @Override
        public void onModified(List<KubernetesConfigMap> configMaps) {
            KubernetesConfigMap configMap = KubernetesUtils.checkConfigMaps(configMaps, configMapName);
            if (KubernetesLeaderElector.hasLeadership(configMap, lockIdentity)) {
                leaderElectionEventHandler.onLeaderInformationChange(KubernetesUtils.getLeaderInformationFromConfigMap(configMap));
            }
        }

        /** Reports a fatal error when the ConfigMap is deleted while this driver leads. */
        @Override
        public void onDeleted(List<KubernetesConfigMap> configMaps) {
            KubernetesConfigMap configMap = KubernetesUtils.checkConfigMaps(configMaps, configMapName);
            if (KubernetesLeaderElector.hasLeadership(configMap, lockIdentity)) {
                fatalErrorHandler.onFatalError(new LeaderElectionException("ConfigMap " + configMapName + " is deleted externally"));
            }
        }

        /** Reports a fatal error when the watch signals an error while this driver leads. */
        @Override
        public void onError(List<KubernetesConfigMap> configMaps) {
            KubernetesConfigMap configMap = KubernetesUtils.checkConfigMaps(configMaps, configMapName);
            if (KubernetesLeaderElector.hasLeadership(configMap, lockIdentity)) {
                fatalErrorHandler.onFatalError(new LeaderElectionException("Error while watching the ConfigMap " + configMapName));
            }
        }

        /**
         * Replaces the watch with a fresh one on a
         * {@link KubernetesTooOldResourceVersionException} as long as the driver is running;
         * every other throwable is reported as a fatal error.
         */
        @Override
        public void handleError(Throwable throwable) {
            if (!(throwable instanceof KubernetesTooOldResourceVersionException)) {
                fatalErrorHandler.onFatalError(new LeaderElectionException("Error while watching the ConfigMap " + configMapName, throwable));
                return;
            }
            synchronized (watchLock) {
                // Checked under the lock so that no new watch is created once close() has
                // taken the lock and closed the current one.
                if (running) {
                    if (kubernetesWatch != null) {
                        kubernetesWatch.close();
                    }
                    LOG.info("Creating a new watch on ConfigMap {}.", configMapName);
                    kubernetesWatch = kubeClient.watchConfigMaps(configMapName, new ConfigMapCallbackHandlerImpl());
                }
            }
        }
    }

    /** Translates elector callbacks into {@link LeaderElectionEventHandler} events. */
    private class LeaderCallbackHandlerImpl
    extends KubernetesLeaderElector.LeaderCallbackHandler {

        @Override
        public void isLeader() {
            leaderElectionEventHandler.onGrantLeadership();
        }

        @Override
        public void notLeader() {
            leaderElectionEventHandler.onRevokeLeadership();
            // Start the elector again so this driver keeps contending for leadership.
            leaderElector.run();
        }
    }
}

