/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client.program;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
public class StreamContextEnvironment
extends StreamExecutionEnvironment {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);
    private final boolean suppressSysout;
    private final boolean enforceSingleJobExecution;
    private int jobCounter;

    public StreamContextEnvironment(PipelineExecutorServiceLoader executorServiceLoader, Configuration configuration, ClassLoader userCodeClassLoader, boolean enforceSingleJobExecution, boolean suppressSysout) {
        super(executorServiceLoader, configuration, userCodeClassLoader);
        this.suppressSysout = suppressSysout;
        this.enforceSingleJobExecution = enforceSingleJobExecution;
        this.jobCounter = 0;
    }

    public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
        JobClient jobClient = this.executeAsync(streamGraph);
        List jobListeners = this.getJobListeners();
        try {
            JobExecutionResult jobExecutionResult = this.getJobExecutionResult(jobClient);
            jobListeners.forEach(jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
            return jobExecutionResult;
        }
        catch (Throwable t) {
            jobListeners.forEach(jobListener -> jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException((Throwable)t)));
            ExceptionUtils.rethrowException((Throwable)t);
            return null;
        }
    }

    private JobExecutionResult getJobExecutionResult(JobClient jobClient) throws Exception {
        DetachedJobExecutionResult jobExecutionResult;
        Preconditions.checkNotNull((Object)jobClient);
        if (this.getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) {
            CompletableFuture jobExecutionResultFuture = jobClient.getJobExecutionResult(this.getUserClassloader());
            if (this.getConfiguration().getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
                Thread shutdownHook = ShutdownHookUtil.addShutdownHook(() -> jobClient.cancel().get(1L, TimeUnit.SECONDS), (String)StreamContextEnvironment.class.getSimpleName(), (Logger)LOG);
                jobExecutionResultFuture.whenComplete((ignored, throwable) -> ShutdownHookUtil.removeShutdownHook((Thread)shutdownHook, (String)StreamContextEnvironment.class.getSimpleName(), (Logger)LOG));
            }
            jobExecutionResult = (JobExecutionResult)jobExecutionResultFuture.get();
            System.out.println(jobExecutionResult);
        } else {
            jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
        }
        return jobExecutionResult;
    }

    public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
        this.validateAllowedExecution();
        JobClient jobClient = super.executeAsync(streamGraph);
        if (!this.suppressSysout) {
            System.out.println("Job has been submitted with JobID " + jobClient.getJobID());
        }
        return jobClient;
    }

    private void validateAllowedExecution() {
        if (this.enforceSingleJobExecution && this.jobCounter > 0) {
            throw new FlinkRuntimeException("Cannot have more than one execute() or executeAsync() call in a single environment.");
        }
        ++this.jobCounter;
    }

    public static void setAsContext(PipelineExecutorServiceLoader executorServiceLoader, Configuration configuration, ClassLoader userCodeClassLoader, boolean enforceSingleJobExecution, boolean suppressSysout) {
        StreamExecutionEnvironmentFactory factory = () -> new StreamContextEnvironment(executorServiceLoader, configuration, userCodeClassLoader, enforceSingleJobExecution, suppressSysout);
        StreamContextEnvironment.initializeContextEnvironment((StreamExecutionEnvironmentFactory)factory);
    }

    public static void unsetAsContext() {
        StreamContextEnvironment.resetContextEnvironment();
    }
}

