/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.kubeclient;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.LoadBalancerStatus;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServicePort;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.dsl.EditReplacePatchDeletable;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.Gettable;
import io.fabric8.kubernetes.client.dsl.ListVisitFromServerGetDeleteRecreateWaitApplicable;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
import io.fabric8.kubernetes.client.dsl.ServiceResource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.Endpoint;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification;
import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPodsWatcher;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Fabric8FlinkKubeClient
implements FlinkKubeClient {
    private static final Logger LOG = LoggerFactory.getLogger(Fabric8FlinkKubeClient.class);
    private final KubernetesClient internalClient;
    private final String clusterId;
    private final String namespace;
    private final ExecutorService kubeClientExecutorService;

    public Fabric8FlinkKubeClient(Configuration flinkConfig, KubernetesClient client, Supplier<ExecutorService> asyncExecutorFactory) {
        this.internalClient = (KubernetesClient)Preconditions.checkNotNull((Object)client);
        this.clusterId = (String)Preconditions.checkNotNull((Object)flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID));
        this.namespace = flinkConfig.getString(KubernetesConfigOptions.NAMESPACE);
        this.kubeClientExecutorService = asyncExecutorFactory.get();
    }

    @Override
    public void createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJMSpec) {
        Deployment deployment = kubernetesJMSpec.getDeployment();
        List<HasMetadata> accompanyingResources = kubernetesJMSpec.getAccompanyingResources();
        LOG.debug("Start to create deployment with spec {}", (Object)deployment.getSpec().toString());
        Deployment createdDeployment = (Deployment)((NonNamespaceOperation)this.internalClient.apps().deployments().inNamespace(this.namespace)).create(deployment);
        this.setOwnerReference(createdDeployment, accompanyingResources);
        ((ListVisitFromServerGetDeleteRecreateWaitApplicable)this.internalClient.resourceList(accompanyingResources).inNamespace(this.namespace)).createOrReplace();
    }

    @Override
    public CompletableFuture<Void> createTaskManagerPod(KubernetesPod kubernetesPod) {
        return CompletableFuture.runAsync(() -> {
            Deployment masterDeployment = (Deployment)((RollableScalableResource)((NonNamespaceOperation)this.internalClient.apps().deployments().inNamespace(this.namespace)).withName(KubernetesUtils.getDeploymentName(this.clusterId))).get();
            if (masterDeployment == null) {
                throw new RuntimeException("Failed to find Deployment named " + this.clusterId + " in namespace " + this.namespace);
            }
            this.setOwnerReference(masterDeployment, Collections.singletonList(kubernetesPod.getInternalResource()));
            LOG.debug("Start to create pod with metadata {}, spec {}", (Object)((Pod)kubernetesPod.getInternalResource()).getMetadata(), (Object)((Pod)kubernetesPod.getInternalResource()).getSpec());
            ((NonNamespaceOperation)this.internalClient.pods().inNamespace(this.namespace)).create(kubernetesPod.getInternalResource());
        }, this.kubeClientExecutorService);
    }

    @Override
    public CompletableFuture<Void> stopPod(String podName) {
        return CompletableFuture.runAsync(() -> {
            Boolean cfr_ignored_0 = (Boolean)((PodResource)this.internalClient.pods().withName(podName)).delete();
        }, this.kubeClientExecutorService);
    }

    @Override
    public Optional<Endpoint> getRestEndpoint(String clusterId) {
        Optional<KubernetesService> restService = this.getRestService(clusterId);
        if (!restService.isPresent()) {
            return Optional.empty();
        }
        Service service = (Service)restService.get().getInternalResource();
        int restPort = this.getRestPortFromExternalService(service);
        KubernetesConfigOptions.ServiceExposedType serviceExposedType = KubernetesConfigOptions.ServiceExposedType.valueOf(service.getSpec().getType());
        if (serviceExposedType == KubernetesConfigOptions.ServiceExposedType.ClusterIP) {
            return Optional.of(new Endpoint(ExternalServiceDecorator.getNamespacedExternalServiceName(clusterId, this.namespace), restPort));
        }
        return this.getRestEndPointFromService(service, restPort);
    }

    @Override
    public List<KubernetesPod> getPodsWithLabels(Map<String, String> labels) {
        List<Pod> podList = ((PodList)((FilterWatchListDeletable)this.internalClient.pods().withLabels(labels)).list()).getItems();
        if (podList == null || podList.isEmpty()) {
            return new ArrayList<KubernetesPod>();
        }
        return podList.stream().map(KubernetesPod::new).collect(Collectors.toList());
    }

    @Override
    public void stopAndCleanupCluster(String clusterId) {
        ((EditReplacePatchDeletable)((RollableScalableResource)((NonNamespaceOperation)this.internalClient.apps().deployments().inNamespace(this.namespace)).withName(KubernetesUtils.getDeploymentName(clusterId))).cascading(true)).delete();
    }

    @Override
    public void handleException(Exception e) {
        LOG.error("A Kubernetes exception occurred.", (Throwable)e);
    }

    @Override
    public Optional<KubernetesService> getRestService(String clusterId) {
        String serviceName = ExternalServiceDecorator.getExternalServiceName(clusterId);
        Service service = (Service)((Gettable)((ServiceResource)((NonNamespaceOperation)this.internalClient.services().inNamespace(this.namespace)).withName(serviceName)).fromServer()).get();
        if (service == null) {
            LOG.debug("Service {} does not exist", (Object)serviceName);
            return Optional.empty();
        }
        return Optional.of(new KubernetesService(service));
    }

    @Override
    public KubernetesWatch watchPodsAndDoCallback(Map<String, String> labels, FlinkKubeClient.PodCallbackHandler podCallbackHandler) {
        return new KubernetesWatch((Watch)((FilterWatchListDeletable)this.internalClient.pods().withLabels(labels)).watch(new KubernetesPodsWatcher(podCallbackHandler)));
    }

    @Override
    public void close() {
        this.internalClient.close();
        ExecutorUtils.gracefulShutdown((long)5L, (TimeUnit)TimeUnit.SECONDS, (ExecutorService[])new ExecutorService[]{this.kubeClientExecutorService});
    }

    private void setOwnerReference(Deployment deployment, List<HasMetadata> resources) {
        OwnerReference deploymentOwnerReference = ((OwnerReferenceBuilder)((OwnerReferenceBuilder)((OwnerReferenceBuilder)((OwnerReferenceBuilder)((OwnerReferenceBuilder)((OwnerReferenceBuilder)new OwnerReferenceBuilder().withName(deployment.getMetadata().getName())).withApiVersion(deployment.getApiVersion())).withUid(deployment.getMetadata().getUid())).withKind(deployment.getKind())).withController(true)).withBlockOwnerDeletion(true)).build();
        resources.forEach(resource -> resource.getMetadata().setOwnerReferences(Collections.singletonList(deploymentOwnerReference)));
    }

    private int getRestPortFromExternalService(Service externalService) {
        List servicePortCandidates = externalService.getSpec().getPorts().stream().filter(x -> x.getName().equals("rest")).collect(Collectors.toList());
        if (servicePortCandidates.isEmpty()) {
            throw new RuntimeException("Failed to find port \"rest\" in Service \"" + ExternalServiceDecorator.getExternalServiceName(this.clusterId) + "\"");
        }
        ServicePort externalServicePort = (ServicePort)servicePortCandidates.get(0);
        KubernetesConfigOptions.ServiceExposedType externalServiceType = KubernetesConfigOptions.ServiceExposedType.valueOf(externalService.getSpec().getType());
        switch (externalServiceType) {
            case ClusterIP: 
            case LoadBalancer: {
                return externalServicePort.getPort();
            }
            case NodePort: {
                return externalServicePort.getNodePort();
            }
        }
        throw new RuntimeException("Unrecognized Service type: " + (Object)((Object)externalServiceType));
    }

    private Optional<Endpoint> getRestEndPointFromService(Service service, int restPort) {
        String address;
        boolean hasExternalIP;
        if (service.getStatus() == null) {
            return Optional.empty();
        }
        LoadBalancerStatus loadBalancer = service.getStatus().getLoadBalancer();
        boolean bl = hasExternalIP = service.getSpec() != null && service.getSpec().getExternalIPs() != null && !service.getSpec().getExternalIPs().isEmpty();
        if (loadBalancer != null) {
            return this.getLoadBalancerRestEndpoint(loadBalancer, restPort);
        }
        if (hasExternalIP && (address = service.getSpec().getExternalIPs().get(0)) != null && !address.isEmpty()) {
            return Optional.of(new Endpoint(address, restPort));
        }
        return Optional.empty();
    }

    private Optional<Endpoint> getLoadBalancerRestEndpoint(LoadBalancerStatus loadBalancer, int restPort) {
        String address;
        boolean hasIngress;
        boolean bl = hasIngress = loadBalancer.getIngress() != null && !loadBalancer.getIngress().isEmpty();
        if (hasIngress) {
            address = loadBalancer.getIngress().get(0).getIp();
            if (address == null || address.isEmpty()) {
                address = loadBalancer.getIngress().get(0).getHostname();
            }
        } else {
            address = this.internalClient.getMasterUrl().getHost();
        }
        boolean noAddress = address == null || address.isEmpty();
        return noAddress ? Optional.empty() : Optional.of(new Endpoint(address, restPort));
    }
}

