/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.client;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Identify;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.Collection;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.client.JobAttachmentClientActor;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobListeningContext;
import org.apache.flink.runtime.client.JobRetrievalException;
import org.apache.flink.runtime.client.JobSubmissionClientActor;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.client.JobTimeoutException;
import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

/**
 * Static helpers for talking to a JobManager from the client side: starting the client's
 * actor system, submitting a job (attached or detached), attaching to a job that is already
 * running, and waiting for a job's final result.
 */
public class JobClient {
    private static final Logger LOG = LoggerFactory.getLogger(JobClient.class);

    /**
     * Creates the actor system used by the job client and logs the address it was started at.
     *
     * @param config configuration handed to {@code AkkaUtils.createActorSystem}
     * @return the newly created actor system
     * @throws IOException if the host name reported by the actor system cannot be resolved
     */
    public static ActorSystem startJobClientActorSystem(Configuration config) throws IOException {
        LOG.info("Starting JobClient actor system");
        // Empty host name and port 0 are passed as the remoting address.
        // NOTE(review): presumably this lets the system pick the bind address/port itself - confirm.
        Option<Tuple2<String, Object>> remoting =
                new Some<Tuple2<String, Object>>(new Tuple2<String, Object>("", 0));
        ActorSystem system = AkkaUtils.createActorSystem(config, remoting);

        Address address = system.provider().getDefaultAddress();
        String hostAddress = address.host().isDefined()
                ? NetUtils.ipAddressToUrlString(InetAddress.getByName(address.host().get()))
                : "(unknown)";
        int port = address.port().isDefined() ? (Integer) address.port().get() : -1;
        LOG.info("Started JobClient actor system at {}:{}", hostAddress, port);
        return system;
    }

    /**
     * Starts a {@code JobSubmissionClientActor}, asks it to submit the given job and wait for
     * it, and returns a context through which the result can be awaited.
     *
     * @param actorSystem actor system in which the client actor is created; must not be null
     * @param config configuration passed on to the client actor
     * @param leaderRetrievalService leader retrieval service passed to the client actor; must not be null
     * @param jobGraph the job to submit; must not be null
     * @param timeout timeout passed to the client actor and stored in the returned context; must not be null
     * @param sysoutLogUpdates flag passed on to the client actor
     * @param classLoader class loader stored in the returned context
     * @return the listening context for the submitted job
     */
    public static JobListeningContext submitJob(ActorSystem actorSystem, Configuration config, LeaderRetrievalService leaderRetrievalService, JobGraph jobGraph, FiniteDuration timeout, boolean sysoutLogUpdates, ClassLoader classLoader) {
        Preconditions.checkNotNull(actorSystem, "The actorSystem must not be null.");
        Preconditions.checkNotNull(leaderRetrievalService, "The leaderRetrievalService must not be null.");
        Preconditions.checkNotNull(jobGraph, "The jobGraph must not be null.");
        Preconditions.checkNotNull(timeout, "The timeout must not be null.");

        Props jobClientActorProps =
                JobSubmissionClientActor.createActorProps(leaderRetrievalService, timeout, sysoutLogUpdates, config);
        ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);

        // The ask itself uses AkkaUtils.INF_TIMEOUT(); liveness of the actor is checked
        // separately in awaitJobResult.
        Future<Object> submissionFuture = Patterns.ask(
                jobClientActor,
                new JobClientMessages.SubmitJobAndWait(jobGraph),
                new Timeout(AkkaUtils.INF_TIMEOUT()));

        return new JobListeningContext(jobGraph.getJobID(), submissionFuture, jobClientActor, timeout, classLoader);
    }

    /**
     * Starts a {@code JobAttachmentClientActor}, asks it to attach to the job with the given
     * ID and wait for it, and returns a context through which the result can be awaited.
     *
     * @param jobID ID of the job to attach to; must not be null
     * @param jobManagerGateWay gateway to the JobManager; only null-checked here
     * @param configuration configuration stored in the returned context; must not be null
     * @param actorSystem actor system in which the client actor is created; must not be null
     * @param leaderRetrievalService leader retrieval service passed to the client actor; must not be null
     * @param timeout timeout passed to the client actor and stored in the returned context; must not be null
     * @param sysoutLogUpdates flag passed on to the client actor
     * @return the listening context for the job
     */
    public static JobListeningContext attachToRunningJob(JobID jobID, ActorGateway jobManagerGateWay, Configuration configuration, ActorSystem actorSystem, LeaderRetrievalService leaderRetrievalService, FiniteDuration timeout, boolean sysoutLogUpdates) {
        Preconditions.checkNotNull(jobID, "The jobID must not be null.");
        Preconditions.checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
        Preconditions.checkNotNull(configuration, "The configuration must not be null.");
        Preconditions.checkNotNull(actorSystem, "The actorSystem must not be null.");
        Preconditions.checkNotNull(leaderRetrievalService, "The leaderRetrievalService must not be null.");
        Preconditions.checkNotNull(timeout, "The timeout must not be null.");

        Props jobClientActorProps =
                JobAttachmentClientActor.createActorProps(leaderRetrievalService, timeout, sysoutLogUpdates);
        ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);

        Future<Object> attachmentFuture = Patterns.ask(
                jobClientActor,
                new JobClientMessages.AttachToJobAndWait(jobID),
                new Timeout(AkkaUtils.INF_TIMEOUT()));

        return new JobListeningContext(jobID, attachmentFuture, jobClientActor, timeout, actorSystem, configuration);
    }

    /**
     * Asks the JobManager for the class loading properties of a job and builds a user-code
     * class loader from the job's jar files (fetched through a {@code BlobCache}) and its
     * classpath URLs.
     *
     * @param jobID ID of the job whose class loader is requested
     * @param jobManager gateway to the JobManager
     * @param config configuration handed to the {@code BlobCache}
     * @return a class loader over the job's jar files and classpaths, with this class's loader as parent
     * @throws JobRetrievalException if the JobManager cannot be asked, the job is not found, the
     *         answer is of an unknown type, or a jar file cannot be fetched
     */
    public static ClassLoader retrieveClassLoader(JobID jobID, ActorGateway jobManager, Configuration config) throws JobRetrievalException {
        Object jmAnswer;
        try {
            jmAnswer = Await.result(
                    jobManager.ask(
                            new JobManagerMessages.RequestClassloadingProps(jobID),
                            AkkaUtils.getDefaultTimeoutAsFiniteDuration()),
                    AkkaUtils.getDefaultTimeoutAsFiniteDuration());
        } catch (Exception e) {
            throw new JobRetrievalException(jobID, "Couldn't retrieve class loading properties from JobManager.", e);
        }

        if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) {
            JobManagerMessages.ClassloadingProps props = (JobManagerMessages.ClassloadingProps) jmAnswer;

            // The blob server is addressed via the JobManager's host; fall back to localhost
            // when the actor path carries no host.
            Option<String> jmHost = jobManager.actor().path().address().host();
            String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
            InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, (int) props.blobManagerPort());

            // NOTE(review): the cache is only shut down on failure below; presumably the returned
            // class loader needs the fetched files to stay available - confirm before changing.
            BlobCache blobClient = new BlobCache(serverAddress, config);

            Collection<BlobKey> requiredJarFiles = props.requiredJarFiles();
            Collection<URL> requiredClasspaths = props.requiredClasspaths();
            URL[] allURLs = new URL[requiredJarFiles.size() + requiredClasspaths.size()];

            int pos = 0;
            for (BlobKey blobKey : requiredJarFiles) {
                try {
                    allURLs[pos++] = blobClient.getURL(blobKey);
                } catch (Exception e) {
                    blobClient.shutdown();
                    throw new JobRetrievalException(jobID, "Failed to download BlobKey " + blobKey, e);
                }
            }
            for (URL url : requiredClasspaths) {
                allURLs[pos++] = url;
            }

            return new FlinkUserCodeClassLoader(allURLs, JobClient.class.getClassLoader());
        }

        if (jmAnswer instanceof JobManagerMessages.JobNotFound) {
            throw new JobRetrievalException(jobID, "Couldn't retrieve class loader. Job " + jobID + " not found");
        }
        throw new JobRetrievalException(jobID, "Unknown response from JobManager: " + jmAnswer);
    }

    /**
     * Blocks until the result future of the given context completes, then converts the answer
     * into a {@link JobExecutionResult} or throws.
     *
     * <p>While waiting, every time the context's timeout elapses the client actor is probed
     * with an {@code Identify} message; if the probe fails and the result is still missing,
     * waiting is aborted. The client actor is sent a {@code PoisonPill} once the result has
     * been read from the future, whether or not that succeeded.
     *
     * @param listeningContext context returned by {@code submitJob} or {@code attachToRunningJob}
     * @return the deserialized result of a successful job
     * @throws JobExecutionException if waiting is interrupted, the client actor does not answer
     *         the probe, the job failed, the job was not found, or the answer cannot be interpreted
     */
    public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext) throws JobExecutionException {
        JobID jobID = listeningContext.getJobID();
        ActorRef jobClientActor = listeningContext.getJobClientActor();
        Future<Object> jobSubmissionFuture = listeningContext.getJobResultFuture();
        FiniteDuration askTimeout = listeningContext.getTimeout();
        ClassLoader classLoader = listeningContext.getClassLoader();

        while (!jobSubmissionFuture.isCompleted()) {
            try {
                Await.ready(jobSubmissionFuture, askTimeout);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can observe the interruption.
                Thread.currentThread().interrupt();
                throw new JobExecutionException(jobID, "Interrupted while waiting for job completion.", e);
            } catch (TimeoutException e) {
                try {
                    // Liveness probe; the content of the reply is irrelevant.
                    Await.result(
                            Patterns.ask(jobClientActor, new Identify(true), Timeout.durationToTimeout(askTimeout)),
                            askTimeout);
                } catch (Exception eInner) {
                    if (eInner instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    // The result may have arrived while the probe was failing; only give up
                    // if it is still missing.
                    if (!jobSubmissionFuture.isCompleted()) {
                        throw new JobExecutionException(jobID, "JobClientActor seems to have died before the JobExecutionResult could be retrieved.", eInner);
                    }
                }
            }
        }

        Object answer;
        try {
            // The future is already completed, so no additional waiting time is granted.
            answer = Await.result(jobSubmissionFuture, Duration.Zero());
        } catch (Throwable throwable) {
            throw new JobExecutionException(jobID, "Couldn't retrieve the JobExecutionResult from the JobManager.", throwable);
        } finally {
            jobClientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
        }

        if (answer instanceof JobManagerMessages.JobResultSuccess) {
            LOG.info("Job execution complete");
            SerializedJobExecutionResult result = ((JobManagerMessages.JobResultSuccess) answer).result();
            if (result == null) {
                throw new JobExecutionException(jobID, "Job was successfully executed but result contained a null JobExecutionResult.");
            }
            try {
                return result.toJobExecutionResult(classLoader);
            } catch (Throwable t) {
                // Keep the underlying failure attached so the reason is diagnosable.
                throw new JobExecutionException(jobID, "Job was successfully executed but JobExecutionResult could not be deserialized.", t);
            }
        }

        if (answer instanceof JobManagerMessages.JobResultFailure) {
            LOG.info("Job execution failed");
            SerializedThrowable serThrowable = ((JobManagerMessages.JobResultFailure) answer).cause();
            if (serThrowable == null) {
                throw new JobExecutionException(jobID, "Job execution failed with null as failure cause.");
            }
            Throwable cause = serThrowable.deserializeError(classLoader);
            if (cause instanceof JobExecutionException) {
                throw (JobExecutionException) cause;
            }
            throw new JobExecutionException(jobID, "Job execution failed", cause);
        }

        if (answer instanceof JobManagerMessages.JobNotFound) {
            throw new JobRetrievalException(((JobManagerMessages.JobNotFound) answer).jobID(), "Couldn't retrieve Job " + jobID + " because it was not running.");
        }

        throw new JobExecutionException(jobID, "Unknown answer from JobManager after submitting the job: " + answer);
    }

    /**
     * Submits a job via {@link #submitJob} and blocks in {@link #awaitJobResult} until its
     * result is available.
     *
     * @param actorSystem actor system in which the client actor is created
     * @param config configuration passed on to the client actor
     * @param leaderRetrievalService leader retrieval service passed to the client actor
     * @param jobGraph the job to submit
     * @param timeout timeout passed to the client actor and used as probe interval while waiting
     * @param sysoutLogUpdates flag passed on to the client actor
     * @param classLoader class loader used to deserialize the job's result or failure
     * @return the result of the successfully executed job
     * @throws JobExecutionException if submission, execution, or result retrieval fails
     */
    public static JobExecutionResult submitJobAndWait(ActorSystem actorSystem, Configuration config, LeaderRetrievalService leaderRetrievalService, JobGraph jobGraph, FiniteDuration timeout, boolean sysoutLogUpdates, ClassLoader classLoader) throws JobExecutionException {
        JobListeningContext jobListeningContext =
                JobClient.submitJob(actorSystem, config, leaderRetrievalService, jobGraph, timeout, sysoutLogUpdates, classLoader);
        return JobClient.awaitJobResult(jobListeningContext);
    }

    /**
     * Uploads the job's user jars and submits the job with {@code ListeningBehaviour.DETACHED},
     * waiting only for the JobManager's response to the submission.
     *
     * @param jobManagerGateway gateway to the JobManager; must not be null
     * @param config configuration passed to the jar upload
     * @param jobGraph the job to submit; must not be null
     * @param timeout timeout for the jar upload and for the submission response; must not be null
     * @param classLoader class loader used to deserialize a failure reported by the JobManager
     * @throws JobExecutionException if the upload fails, the JobManager does not respond in time,
     *         the submission is rejected, or the response is unexpected
     */
    public static void submitJobDetached(ActorGateway jobManagerGateway, Configuration config, JobGraph jobGraph, FiniteDuration timeout, ClassLoader classLoader) throws JobExecutionException {
        Preconditions.checkNotNull(jobManagerGateway, "The jobManagerGateway must not be null.");
        Preconditions.checkNotNull(jobGraph, "The jobGraph must not be null.");
        Preconditions.checkNotNull(timeout, "The timeout must not be null.");

        LOG.info("Checking and uploading JAR files");
        try {
            jobGraph.uploadUserJars(jobManagerGateway, timeout, config);
        } catch (IOException e) {
            throw new JobSubmissionException(jobGraph.getJobID(), "Could not upload the program's JAR files to the JobManager.", e);
        }

        Object result;
        try {
            Future<Object> future =
                    jobManagerGateway.ask(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED), timeout);
            result = Await.result(future, timeout);
        } catch (TimeoutException e) {
            throw new JobTimeoutException(jobGraph.getJobID(), "JobManager did not respond within " + timeout.toString(), e);
        } catch (Throwable t) {
            if (t instanceof InterruptedException) {
                // Restore the interrupt flag so callers further up can observe the interruption.
                Thread.currentThread().interrupt();
            }
            // Chain the caught throwable itself; its own cause may be null, which would
            // otherwise discard the stack trace.
            throw new JobSubmissionException(jobGraph.getJobID(), "Failed to send job to JobManager: " + t.getMessage(), t);
        }

        if (result instanceof JobManagerMessages.JobSubmitSuccess) {
            JobID respondedID = ((JobManagerMessages.JobSubmitSuccess) result).jobId();
            if (!respondedID.equals(jobGraph.getJobID())) {
                throw new JobExecutionException(jobGraph.getJobID(), "JobManager responded for wrong Job. This Job: " + jobGraph.getJobID() + ", response: " + respondedID);
            }
            return;
        }

        if (result instanceof JobManagerMessages.JobResultFailure) {
            try {
                SerializedThrowable t = ((JobManagerMessages.JobResultFailure) result).cause();
                // Throwing here routes the deserialized error through the catch clauses below.
                throw t.deserializeError(classLoader);
            } catch (JobExecutionException e) {
                throw e;
            } catch (Throwable t) {
                throw new JobExecutionException(jobGraph.getJobID(), "JobSubmission failed: " + t.getMessage(), t);
            }
        }

        throw new JobExecutionException(jobGraph.getJobID(), "Unexpected response from JobManager: " + result);
    }
}

