/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.kubernetes.KubernetesWorkerNode;
import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerConfiguration;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ActiveResourceManager;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kubernetes-specific {@link ActiveResourceManager}.
 *
 * <p>TaskManagers are requested as Kubernetes pods through the {@link FlinkKubeClient}. Pod
 * lifecycle events arrive through the {@link FlinkKubeClient.PodCallbackHandler} callbacks; every
 * callback hands its work to the main thread via {@code runAsync}, so the bookkeeping maps below
 * are only mutated from there (callbacks of client futures are also routed to the main thread
 * where they touch state).
 */
public class KubernetesResourceManager extends ActiveResourceManager<KubernetesWorkerNode>
        implements FlinkKubeClient.PodCallbackHandler {
    private static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class);

    /** Pod name pattern: {@code <clusterId>-taskmanager-<attemptId>-<podId>}. */
    private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d";

    /** Workers known to this resource manager, keyed by the resource id built from the pod name. */
    private final Map<ResourceID, KubernetesWorkerNode> workerNodes = new HashMap<>();

    /** Attempt id embedded in the names of pods requested by this resource manager. */
    private long currentMaxAttemptId = 0L;

    /** Last pod id embedded in the name of a requested pod; incremented per request. */
    private long currentMaxPodId = 0L;

    private final String clusterId;
    private final FlinkKubeClient kubeClient;
    private final KubernetesResourceManagerConfiguration configuration;

    /** Resource specs of pods requested by this attempt, keyed by pod name. */
    private final Map<String, WorkerResourceSpec> podWorkerResources;

    /** Watch on TaskManager pods; {@code null} until {@link #initialize()} has completed. */
    @Nullable
    private KubernetesWatch podsWatch;

    public KubernetesResourceManager(
            RpcService rpcService,
            ResourceID resourceId,
            Configuration flinkConfig,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            SlotManager slotManager,
            ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory,
            JobLeaderIdService jobLeaderIdService,
            ClusterInformation clusterInformation,
            FatalErrorHandler fatalErrorHandler,
            ResourceManagerMetricGroup resourceManagerMetricGroup,
            FlinkKubeClient kubeClient,
            KubernetesResourceManagerConfiguration configuration) {
        super(
                flinkConfig,
                System.getenv(),
                rpcService,
                resourceId,
                highAvailabilityServices,
                heartbeatServices,
                slotManager,
                clusterPartitionTrackerFactory,
                jobLeaderIdService,
                clusterInformation,
                fatalErrorHandler,
                resourceManagerMetricGroup);
        this.clusterId = configuration.getClusterId();
        this.kubeClient = kubeClient;
        this.configuration = configuration;
        this.podWorkerResources = new HashMap<>();
    }

    /** Loads the client-side configuration via {@link GlobalConfiguration#loadConfiguration()}. */
    protected Configuration loadClientConfiguration() {
        return GlobalConfiguration.loadConfiguration();
    }

    /**
     * Recovers TaskManager pods left by previous attempts, then starts watching TaskManager pods
     * of this cluster with this instance as the callback handler.
     *
     * @throws ResourceManagerException if recovering the existing pods fails
     */
    protected void initialize() throws ResourceManagerException {
        recoverWorkerNodesFromPreviousAttempts();
        podsWatch = kubeClient.watchPodsAndDoCallback(
                KubernetesUtils.getTaskManagerLabels(clusterId), this);
    }

    /**
     * Closes the pod watch and the Kubernetes client. A failure in either step does not prevent
     * the other; the first failure is reported and any later one is added as suppressed.
     */
    public CompletableFuture<Void> onStop() {
        Throwable throwable = null;

        // podsWatch is assigned only at the end of initialize(). If initialization failed or never
        // ran it is still null; skip it rather than failing the stop future with an NPE.
        if (podsWatch != null) {
            try {
                podsWatch.close();
            } catch (Throwable t) {
                throwable = t;
            }
        }

        try {
            kubeClient.close();
        } catch (Throwable t) {
            throwable = ExceptionUtils.firstOrSuppressed(t, throwable);
        }

        return getStopTerminationFutureOrCompletedExceptionally(throwable);
    }

    /** Logs the shutdown and asks the Kubernetes client to stop and clean up the whole cluster. */
    protected void internalDeregisterApplication(
            ApplicationStatus finalStatus, @Nullable String diagnostics) {
        LOG.info(
                "Stopping kubernetes cluster, clusterId: {}, diagnostics: {}",
                clusterId,
                diagnostics == null ? "" : diagnostics);
        kubeClient.stopAndCleanupCluster(clusterId);
    }

    /**
     * Requests a new TaskManager pod for the given spec.
     *
     * @return always {@code true}; pod creation failures are handled asynchronously by retrying
     */
    public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
        LOG.info("Starting new worker with worker resource spec, {}", workerResourceSpec);
        requestKubernetesPod(workerResourceSpec);
        return true;
    }

    /** Returns the known worker for the given resource id, or {@code null} if it is unknown. */
    protected KubernetesWorkerNode workerStarted(ResourceID resourceID) {
        return workerNodes.get(resourceID);
    }

    /**
     * Stops the pod backing the given worker and updates the bookkeeping.
     *
     * @return always {@code true}
     */
    public boolean stopWorker(KubernetesWorkerNode worker) {
        final ResourceID resourceId = worker.getResourceID();
        LOG.info("Stopping Worker {}.", resourceId);
        internalStopPod(resourceId.toString());
        return true;
    }

    /**
     * Registers newly added pods as workers on the main thread. Pods that are already known are
     * ignored; every other pod must have been requested by this attempt and must match an
     * outstanding request for its resource spec.
     */
    @Override
    public void onAdded(List<KubernetesPod> pods) {
        runAsync(() -> {
            int duplicatePodNum = 0;
            for (KubernetesPod pod : pods) {
                final String podName = pod.getName();
                final ResourceID resourceID = new ResourceID(podName);

                if (workerNodes.containsKey(resourceID)) {
                    log.debug("Ignore TaskManager pod that is already added: {}", podName);
                    ++duplicatePodNum;
                    continue;
                }

                // Flink's Preconditions substitutes "%s" placeholders (not SLF4J-style "{}").
                final WorkerResourceSpec workerResourceSpec = Preconditions.checkNotNull(
                        podWorkerResources.get(podName),
                        "Unrecognized pod %s. Pods from previous attempt should have already been added.",
                        podName);

                final int pendingNum = getNumRequestedNotAllocatedWorkersFor(workerResourceSpec);
                Preconditions.checkState(
                        pendingNum > 0, "Should not receive more workers than requested.");

                notifyNewWorkerAllocated(workerResourceSpec, resourceID);
                workerNodes.put(resourceID, new KubernetesWorkerNode(resourceID));
                log.info("Received new TaskManager pod: {}", podName);
            }
            log.info(
                    "Received {} new TaskManager pods. Remaining pending pod requests: {}",
                    pods.size() - duplicatePodNum,
                    getNumRequestedNotAllocatedWorkers());
        });
    }

    @Override
    public void onModified(List<KubernetesPod> pods) {
        handlePodEventsInMainThread(pods);
    }

    @Override
    public void onDeleted(List<KubernetesPod> pods) {
        handlePodEventsInMainThread(pods);
    }

    @Override
    public void onError(List<KubernetesPod> pods) {
        handlePodEventsInMainThread(pods);
    }

    @Override
    public void handleFatalError(Throwable throwable) {
        onFatalError(throwable);
    }

    @VisibleForTesting
    Map<ResourceID, KubernetesWorkerNode> getWorkerNodes() {
        return workerNodes;
    }

    /** On the main thread, removes each terminated pod and requests replacements if needed. */
    private void handlePodEventsInMainThread(List<KubernetesPod> pods) {
        runAsync(() -> pods.forEach(this::removePodAndTryRestartIfRequired));
    }

    /**
     * Registers all existing TaskManager pods of this cluster as workers. Also sets the attempt id
     * for this run to one more than the largest attempt id found among those pods.
     */
    private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerException {
        final List<KubernetesPod> podList =
                kubeClient.getPodsWithLabels(KubernetesUtils.getTaskManagerLabels(clusterId));
        for (KubernetesPod pod : podList) {
            final KubernetesWorkerNode worker =
                    new KubernetesWorkerNode(new ResourceID(pod.getName()));
            workerNodes.put(worker.getResourceID(), worker);
            currentMaxAttemptId = Math.max(currentMaxAttemptId, worker.getAttempt());
        }
        ++currentMaxAttemptId;
        log.info(
                "Recovered {} pods from previous attempts, current attempt id is {}.",
                workerNodes.size(),
                currentMaxAttemptId);
    }

    /**
     * Asks the Kubernetes client to create a TaskManager pod for the given spec. If creation
     * fails, the request is withdrawn. Required pods are then re-requested after the configured
     * retry interval.
     */
    private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) {
        final KubernetesTaskManagerParameters parameters =
                createKubernetesTaskManagerParameters(workerResourceSpec);

        podWorkerResources.put(parameters.getPodName(), workerResourceSpec);
        final int pendingWorkerNum =
                notifyNewWorkerRequested(workerResourceSpec).getNumNotAllocated();

        log.info(
                "Requesting new TaskManager pod with <{},{}>. Number pending requests {}.",
                parameters.getTaskManagerMemoryMB(),
                parameters.getTaskManagerCPU(),
                pendingWorkerNum);

        final KubernetesPod taskManagerPod =
                KubernetesTaskManagerFactory.buildTaskManagerKubernetesPod(parameters);
        kubeClient.createTaskManagerPod(taskManagerPod).whenCompleteAsync(
                (ignore, throwable) -> {
                    if (throwable != null) {
                        final Time retryInterval = configuration.getPodCreationRetryInterval();
                        log.warn(
                                "Could not start TaskManager in pod {}, retry in {}. ",
                                taskManagerPod.getName(),
                                retryInterval,
                                throwable);
                        podWorkerResources.remove(parameters.getPodName());
                        notifyNewWorkerAllocationFailed(workerResourceSpec);
                        scheduleRunAsync(this::requestKubernetesPodIfRequired, retryInterval);
                    } else {
                        log.info(
                                "TaskManager {} will be started with {}.",
                                parameters.getPodName(),
                                workerResourceSpec);
                    }
                },
                getMainThreadExecutor());
    }

    /** Builds the pod parameters for a new TaskManager; consumes the next pod id. */
    private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(
            WorkerResourceSpec workerResourceSpec) {
        final TaskExecutorProcessSpec taskExecutorProcessSpec =
                TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(
                        flinkConfig, workerResourceSpec);

        final String podName = String.format(
                TASK_MANAGER_POD_FORMAT, clusterId, currentMaxAttemptId, ++currentMaxPodId);

        final ContaineredTaskManagerParameters taskManagerParameters =
                ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec);

        final String dynamicProperties =
                BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig);

        return new KubernetesTaskManagerParameters(
                flinkConfig,
                podName,
                dynamicProperties,
                taskManagerParameters,
                ExternalResourceUtils.getExternalResources(flinkConfig, "kubernetes.config-key"));
    }

    /**
     * Requests pods until every required resource spec has at least as many requested but not yet
     * registered workers as required.
     */
    private void requestKubernetesPodIfRequired() {
        for (Map.Entry<WorkerResourceSpec, Integer> entry : getRequiredResources().entrySet()) {
            final WorkerResourceSpec workerResourceSpec = entry.getKey();
            final int requiredTaskManagers = entry.getValue();
            while (requiredTaskManagers
                    > getNumRequestedNotRegisteredWorkersFor(workerResourceSpec)) {
                requestKubernetesPod(workerResourceSpec);
            }
        }
    }

    /** Stops the pod and requests replacements if the pod has terminated; otherwise does nothing. */
    private void removePodAndTryRestartIfRequired(KubernetesPod pod) {
        if (pod.isTerminated()) {
            internalStopPod(pod.getName());
            requestKubernetesPodIfRequired();
        }
    }

    /**
     * Asks the client to stop the pod, where failures are only logged. Then drops the pod from
     * the bookkeeping maps and notifies the parent class. If the pod was a pending request of
     * this attempt, an allocation failure is reported. Otherwise a stopped allocated worker is
     * reported.
     */
    private void internalStopPod(String podName) {
        final ResourceID resourceId = new ResourceID(podName);
        // Must be evaluated before the maps below are modified.
        final boolean isPendingWorkerOfCurrentAttempt = isPendingWorkerOfCurrentAttempt(podName);

        kubeClient.stopPod(podName).whenComplete((ignore, throwable) -> {
            if (throwable != null) {
                log.warn("Could not stop TaskManager in pod {}.", podName, throwable);
            }
        });

        final WorkerResourceSpec workerResourceSpec = podWorkerResources.remove(podName);
        workerNodes.remove(resourceId);

        if (isPendingWorkerOfCurrentAttempt) {
            notifyNewWorkerAllocationFailed(Preconditions.checkNotNull(
                    workerResourceSpec,
                    "Worker resource spec of current attempt pending worker should be known."));
        } else {
            notifyAllocatedWorkerStopped(resourceId);
        }
    }

    /** A pod is pending if this attempt requested it but it has not been added as a worker yet. */
    private boolean isPendingWorkerOfCurrentAttempt(String podName) {
        return podWorkerResources.containsKey(podName)
                && !workerNodes.containsKey(new ResourceID(podName));
    }
}

