/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
import org.apache.flink.runtime.io.network.api.writer.MultipleRecordWriters;
import org.apache.flink.runtime.io.network.api.writer.NonRecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.SingleRecordWriter;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.FatalExitExceptionHandler;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceFactory;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceImpl;
import org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl;
import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TimerException;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorFactory;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
extends AbstractInvokable
implements AsyncExceptionHandler {
    public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
    protected static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
    private final StreamTaskActionExecutor actionExecutor;
    @Nullable
    protected StreamInputProcessor inputProcessor;
    protected OP headOperator;
    protected OperatorChain<OUT, OP> operatorChain;
    protected final StreamConfig configuration;
    protected final StateBackend stateBackend;
    private final SubtaskCheckpointCoordinator subtaskCheckpointCoordinator;
    protected final TimerService timerService;
    private final CloseableRegistry cancelables = new CloseableRegistry();
    private final StreamTaskAsyncExceptionHandler asyncExceptionHandler;
    private volatile boolean isRunning;
    private volatile boolean canceled;
    private volatile boolean failing;
    private boolean disposedOperators;
    private final ExecutorService asyncOperationsThreadPool;
    private final RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriter;
    protected final MailboxProcessor mailboxProcessor;
    final MailboxExecutor mainMailboxExecutor;
    private final ExecutorService channelIOExecutor;
    private Long syncSavepointId = null;
    private long latestAsyncCheckpointStartDelayNanos;

    protected StreamTask(Environment env) throws Exception {
        this(env, null);
    }

    protected StreamTask(Environment env, @Nullable TimerService timerService) throws Exception {
        this(env, timerService, (Thread.UncaughtExceptionHandler)FatalExitExceptionHandler.INSTANCE);
    }

    protected StreamTask(Environment environment, @Nullable TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) throws Exception {
        this(environment, timerService, uncaughtExceptionHandler, StreamTaskActionExecutor.IMMEDIATE);
    }

    protected StreamTask(Environment environment, @Nullable TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler, StreamTaskActionExecutor actionExecutor) throws Exception {
        this(environment, timerService, uncaughtExceptionHandler, actionExecutor, new TaskMailboxImpl(Thread.currentThread()));
    }

    protected StreamTask(Environment environment, @Nullable TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler, StreamTaskActionExecutor actionExecutor, TaskMailbox mailbox) throws Exception {
        super(environment);
        this.configuration = new StreamConfig(this.getTaskConfiguration());
        this.recordWriter = StreamTask.createRecordWriterDelegate(this.configuration, environment);
        this.actionExecutor = (StreamTaskActionExecutor)Preconditions.checkNotNull((Object)actionExecutor);
        this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
        this.mailboxProcessor.initMetric(environment.getMetricGroup());
        this.mainMailboxExecutor = this.mailboxProcessor.getMainMailboxExecutor();
        this.asyncExceptionHandler = new StreamTaskAsyncExceptionHandler(environment);
        this.asyncOperationsThreadPool = Executors.newCachedThreadPool((ThreadFactory)new ExecutorThreadFactory("AsyncOperations", uncaughtExceptionHandler));
        this.stateBackend = this.createStateBackend();
        this.subtaskCheckpointCoordinator = new SubtaskCheckpointCoordinatorImpl((CheckpointStorageWorkerView)this.stateBackend.createCheckpointStorage(this.getEnvironment().getJobID()), this.getName(), actionExecutor, this.getCancelables(), this.getAsyncOperationsThreadPool(), this.getEnvironment(), this, this.configuration.isUnalignedCheckpointsEnabled(), (BiFunctionWithException<ChannelStateWriter, Long, CompletableFuture<Void>, IOException>)((BiFunctionWithException)this::prepareInputSnapshot));
        if (timerService == null) {
            DispatcherThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + this.getName());
            this.timerService = new SystemProcessingTimeService(this::handleTimerException, (ThreadFactory)timerThreadFactory);
        } else {
            this.timerService = timerService;
        }
        this.channelIOExecutor = Executors.newSingleThreadExecutor((ThreadFactory)new ExecutorThreadFactory("channel-state-unspilling"));
    }

    private CompletableFuture<Void> prepareInputSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws IOException {
        if (this.inputProcessor == null) {
            return FutureUtils.completedVoidFuture();
        }
        return this.inputProcessor.prepareSnapshot(channelStateWriter, checkpointId);
    }

    SubtaskCheckpointCoordinator getCheckpointCoordinator() {
        return this.subtaskCheckpointCoordinator;
    }

    protected abstract void init() throws Exception;

    protected void cancelTask() throws Exception {
    }

    protected void cleanup() throws Exception {
        if (this.inputProcessor != null) {
            this.inputProcessor.close();
        }
    }

    protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
        InputStatus status = this.inputProcessor.processInput();
        if (status == InputStatus.MORE_AVAILABLE && this.recordWriter.isAvailable()) {
            return;
        }
        if (status == InputStatus.END_OF_INPUT) {
            controller.allActionsCompleted();
            return;
        }
        CompletableFuture<?> jointFuture = this.getInputOutputJointFuture(status);
        MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();
        jointFuture.thenRun(suspendedDefaultAction::resume);
    }

    @VisibleForTesting
    CompletableFuture<?> getInputOutputJointFuture(InputStatus status) {
        if (status == InputStatus.NOTHING_AVAILABLE && !this.recordWriter.isAvailable()) {
            return CompletableFuture.allOf(this.inputProcessor.getAvailableFuture(), this.recordWriter.getAvailableFuture());
        }
        if (status == InputStatus.NOTHING_AVAILABLE) {
            return this.inputProcessor.getAvailableFuture();
        }
        return this.recordWriter.getAvailableFuture();
    }

    private void resetSynchronousSavepointId() {
        this.syncSavepointId = null;
    }

    private void setSynchronousSavepointId(long checkpointId) {
        Preconditions.checkState((this.syncSavepointId == null ? 1 : 0) != 0, (Object)"at most one stop-with-savepoint checkpoint at a time is allowed");
        this.syncSavepointId = checkpointId;
    }

    @VisibleForTesting
    OptionalLong getSynchronousSavepointId() {
        return this.syncSavepointId != null ? OptionalLong.of(this.syncSavepointId) : OptionalLong.empty();
    }

    private boolean isSynchronousSavepointId(long checkpointId) {
        return this.syncSavepointId != null && this.syncSavepointId == checkpointId;
    }

    private void runSynchronousSavepointMailboxLoop() throws Exception {
        assert (this.syncSavepointId != null);
        MailboxExecutor mailboxExecutor = this.mailboxProcessor.getMailboxExecutor(Integer.MAX_VALUE);
        while (!this.canceled && this.syncSavepointId != null) {
            mailboxExecutor.yield();
        }
    }

    protected void advanceToEndOfEventTime() throws Exception {
    }

    protected void finishTask() throws Exception {
    }

    public StreamTaskStateInitializer createStreamTaskStateInitializer() {
        return new StreamTaskStateInitializerImpl(this.getEnvironment(), this.stateBackend);
    }

    protected Counter setupNumRecordsInCounter(StreamOperator streamOperator) {
        try {
            return ((OperatorMetricGroup)streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
        }
        catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", (Throwable)e);
            return new SimpleCounter();
        }
    }

    protected void beforeInvoke() throws Exception {
        this.disposedOperators = false;
        LOG.debug("Initializing {}.", (Object)this.getName());
        this.operatorChain = new OperatorChain(this, this.recordWriter);
        this.headOperator = this.operatorChain.getHeadOperator();
        this.init();
        if (this.canceled) {
            throw new CancelTaskException();
        }
        LOG.debug("Invoking {}", (Object)this.getName());
        this.actionExecutor.runThrowing(() -> {
            this.operatorChain.initializeStateAndOpenOperators(this.createStreamTaskStateInitializer());
            this.readRecoveredChannelState();
        });
        this.isRunning = true;
    }

    private void readRecoveredChannelState() throws IOException, InterruptedException {
        IndexedInputGate[] inputGates;
        ChannelStateReader reader = this.getEnvironment().getTaskStateManager().getChannelStateReader();
        if (!reader.hasChannelStates()) {
            this.requestPartitions();
            return;
        }
        ResultPartitionWriter[] writers = this.getEnvironment().getAllWriters();
        if (writers != null) {
            for (ResultPartitionWriter writer : writers) {
                writer.readRecoveredState(reader);
            }
        }
        if ((inputGates = this.getEnvironment().getAllInputGates()) != null && inputGates.length > 0) {
            CompletableFuture[] futures = new CompletableFuture[inputGates.length];
            for (int i = 0; i < inputGates.length; ++i) {
                futures[i] = inputGates[i].readRecoveredState(this.channelIOExecutor, reader);
            }
            CompletableFuture.allOf(futures).thenRun(() -> this.mainMailboxExecutor.execute((ThrowingRunnable<? extends Exception>)((ThrowingRunnable)this::requestPartitions), "Input gates request partitions"));
        }
    }

    private void requestPartitions() throws IOException {
        IndexedInputGate[] inputGates = this.getEnvironment().getAllInputGates();
        if (inputGates != null) {
            for (IndexedInputGate inputGate : inputGates) {
                inputGate.requestPartitions();
            }
        }
    }

    public final void invoke() throws Exception {
        try {
            this.beforeInvoke();
            if (this.canceled) {
                throw new CancelTaskException();
            }
            this.runMailboxLoop();
            if (this.canceled) {
                throw new CancelTaskException();
            }
            this.afterInvoke();
        }
        catch (Throwable invokeException) {
            this.failing = !this.canceled;
            try {
                this.cleanUpInvoke();
            }
            catch (Throwable cleanUpException) {
                Throwable throwable = ExceptionUtils.firstOrSuppressed((Throwable)cleanUpException, (Throwable)invokeException);
                ExceptionUtils.rethrowException((Throwable)throwable);
            }
            ExceptionUtils.rethrowException((Throwable)invokeException);
        }
        this.cleanUpInvoke();
    }

    @VisibleForTesting
    public boolean runMailboxStep() throws Exception {
        return this.mailboxProcessor.runMailboxStep();
    }

    private void runMailboxLoop() throws Exception {
        this.mailboxProcessor.runMailboxLoop();
    }

    protected void afterInvoke() throws Exception {
        LOG.debug("Finished task {}", (Object)this.getName());
        ((CompletableFuture)this.getCompletionFuture().exceptionally(unused -> null)).join();
        CompletableFuture timersFinishedFuture = new CompletableFuture();
        this.operatorChain.closeOperators(this.actionExecutor);
        this.actionExecutor.runThrowing(() -> {
            FutureUtils.forward(this.timerService.quiesce(), (CompletableFuture)timersFinishedFuture);
            this.mailboxProcessor.prepareClose();
            this.isRunning = false;
        });
        this.mailboxProcessor.drain();
        timersFinishedFuture.get();
        LOG.debug("Closed operators for task {}", (Object)this.getName());
        this.operatorChain.flushOutputs();
        this.disposeAllOperators(false);
        this.disposedOperators = true;
    }

    protected void cleanUpInvoke() throws Exception {
        ((CompletableFuture)this.getCompletionFuture().exceptionally(unused -> null)).join();
        this.isRunning = false;
        this.setShouldInterruptOnCancel(false);
        Thread.interrupted();
        this.tryShutdownTimerService();
        try {
            this.cancelables.close();
            this.shutdownAsyncThreads();
        }
        catch (Throwable t) {
            LOG.error("Could not shut down async checkpoint threads", t);
        }
        try {
            this.cleanup();
        }
        catch (Throwable t) {
            LOG.error("Error during cleanup of stream task", t);
        }
        this.disposeAllOperators(true);
        if (this.operatorChain != null) {
            this.actionExecutor.run(() -> this.operatorChain.releaseOutputs());
        } else {
            this.recordWriter.close();
        }
        try {
            this.channelIOExecutor.shutdown();
        }
        catch (Throwable t) {
            LOG.error("Error during shutdown the channel state unspill executor", t);
        }
        this.mailboxProcessor.close();
    }

    protected CompletableFuture<Void> getCompletionFuture() {
        return FutureUtils.completedVoidFuture();
    }

    public final void cancel() throws Exception {
        this.isRunning = false;
        this.canceled = true;
        try {
            this.cancelTask();
        }
        finally {
            this.getCompletionFuture().whenComplete((unusedResult, unusedError) -> {
                this.mailboxProcessor.allActionsCompleted();
                try {
                    this.cancelables.close();
                }
                catch (IOException e) {
                    throw new CompletionException(e);
                }
            });
        }
    }

    public MailboxExecutorFactory getMailboxExecutorFactory() {
        return this.mailboxProcessor::getMailboxExecutor;
    }

    public final boolean isRunning() {
        return this.isRunning;
    }

    public final boolean isCanceled() {
        return this.canceled;
    }

    public final boolean isFailing() {
        return this.failing;
    }

    private void shutdownAsyncThreads() throws Exception {
        if (!this.asyncOperationsThreadPool.isShutdown()) {
            this.asyncOperationsThreadPool.shutdownNow();
        }
    }

    private void disposeAllOperators(boolean logOnlyErrors) throws Exception {
        if (this.operatorChain != null && !this.disposedOperators) {
            for (StreamOperatorWrapper<?, ?> operatorWrapper : this.operatorChain.getAllOperators(true)) {
                Object operator = operatorWrapper.getStreamOperator();
                if (!logOnlyErrors) {
                    operator.dispose();
                    continue;
                }
                try {
                    operator.dispose();
                }
                catch (Exception e) {
                    LOG.error("Error during disposal of stream operator.", (Throwable)e);
                }
            }
            this.disposedOperators = true;
        }
    }

    protected void finalize() throws Throwable {
        super.finalize();
        if (!this.timerService.isTerminated()) {
            LOG.info("Timer service is shutting down.");
            this.timerService.shutdownService();
        }
        this.cancelables.close();
    }

    boolean isSerializingTimestamps() {
        TimeCharacteristic tc = this.configuration.getTimeCharacteristic();
        return tc == TimeCharacteristic.EventTime | tc == TimeCharacteristic.IngestionTime;
    }

    public final String getName() {
        return this.getEnvironment().getTaskInfo().getTaskNameWithSubtasks();
    }

    String getTaskNameWithSubtaskAndId() {
        return this.getEnvironment().getTaskInfo().getTaskNameWithSubtasks() + " (" + this.getEnvironment().getExecutionId() + ')';
    }

    public CheckpointStorageWorkerView getCheckpointStorage() {
        return this.subtaskCheckpointCoordinator.getCheckpointStorage();
    }

    public StreamConfig getConfiguration() {
        return this.configuration;
    }

    public StreamStatusMaintainer getStreamStatusMaintainer() {
        return this.operatorChain;
    }

    RecordWriterOutput<?>[] getStreamOutputs() {
        return this.operatorChain.getStreamOutputs();
    }

    public Future<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
        CompletableFuture<Boolean> result = new CompletableFuture<Boolean>();
        this.mainMailboxExecutor.execute((ThrowingRunnable<? extends Exception>)((ThrowingRunnable)() -> {
            this.latestAsyncCheckpointStartDelayNanos = 1000000L * Math.max(0L, System.currentTimeMillis() - checkpointMetaData.getTimestamp());
            try {
                result.complete(this.triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime));
            }
            catch (Exception ex) {
                result.completeExceptionally(ex);
                throw ex;
            }
        }), "checkpoint %s with %s", checkpointMetaData, checkpointOptions);
        return result;
    }

    private boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) throws Exception {
        try {
            CheckpointMetrics checkpointMetrics = new CheckpointMetrics().setAlignmentDurationNanos(0L);
            this.subtaskCheckpointCoordinator.initCheckpoint(checkpointMetaData.getCheckpointId(), checkpointOptions);
            boolean success = this.performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, advanceToEndOfEventTime);
            if (!success) {
                this.declineCheckpoint(checkpointMetaData.getCheckpointId());
            }
            return success;
        }
        catch (Exception e) {
            if (this.isRunning) {
                throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + this.getName() + '.', e);
            }
            LOG.debug("Could not perform checkpoint {} for operator {} while the invokable was not in state running.", new Object[]{checkpointMetaData.getCheckpointId(), this.getName(), e});
            return false;
        }
    }

    public <E extends Exception> void executeInTaskThread(ThrowingRunnable<E> runnable, String descriptionFormat, Object ... descriptionArgs) throws E {
        if (this.mailboxProcessor.isMailboxThread()) {
            runnable.run();
        } else {
            this.mainMailboxExecutor.execute(runnable, descriptionFormat, descriptionArgs);
        }
    }

    public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws IOException {
        try {
            if (this.performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, false) && this.isSynchronousSavepointId(checkpointMetaData.getCheckpointId())) {
                this.runSynchronousSavepointMailboxLoop();
            }
        }
        catch (CancelTaskException e) {
            LOG.info("Operator {} was cancelled while performing checkpoint {}.", (Object)this.getName(), (Object)checkpointMetaData.getCheckpointId());
            throw e;
        }
        catch (Exception e) {
            throw new IOException("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + this.getName() + '.', e);
        }
    }

    public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws IOException {
        this.subtaskCheckpointCoordinator.abortCheckpointOnBarrier(checkpointId, cause, this.operatorChain);
    }

    private boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics, boolean advanceToEndOfTime) throws Exception {
        LOG.debug("Starting checkpoint ({}) {} on task {}", new Object[]{checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), this.getName()});
        if (this.isRunning) {
            this.actionExecutor.runThrowing(() -> {
                if (checkpointOptions.getCheckpointType().isSynchronous()) {
                    this.setSynchronousSavepointId(checkpointMetaData.getCheckpointId());
                    if (advanceToEndOfTime) {
                        this.advanceToEndOfEventTime();
                    }
                }
                this.subtaskCheckpointCoordinator.checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics, this.operatorChain, this::isCanceled);
            });
            return true;
        }
        this.actionExecutor.runThrowing(() -> {
            CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
            this.recordWriter.broadcastEvent((AbstractEvent)message);
        });
        return false;
    }

    protected void declineCheckpoint(long checkpointId) {
        this.getEnvironment().declineCheckpoint(checkpointId, (Throwable)new CheckpointException("Task Name" + this.getName(), CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY));
    }

    public final ExecutorService getAsyncOperationsThreadPool() {
        return this.asyncOperationsThreadPool;
    }

    public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
        return this.notifyCheckpointOperation(() -> this.notifyCheckpointComplete(checkpointId), String.format("checkpoint %d complete", checkpointId));
    }

    public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
        return this.notifyCheckpointOperation(() -> this.subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, this.operatorChain, this::isRunning), String.format("checkpoint %d aborted", checkpointId));
    }

    private Future<Void> notifyCheckpointOperation(RunnableWithException runnable, String description) {
        CompletableFuture<Void> result = new CompletableFuture<Void>();
        this.mailboxProcessor.getMailboxExecutor(Integer.MAX_VALUE).execute((ThrowingRunnable<? extends Exception>)((ThrowingRunnable)() -> {
            try {
                runnable.run();
            }
            catch (Exception ex) {
                result.completeExceptionally(ex);
                throw ex;
            }
            result.complete(null);
        }), description);
        return result;
    }

    private void notifyCheckpointComplete(long checkpointId) throws Exception {
        this.subtaskCheckpointCoordinator.notifyCheckpointComplete(checkpointId, this.operatorChain, this::isRunning);
        if (this.isRunning && this.isSynchronousSavepointId(checkpointId)) {
            this.finishTask();
            this.resetSynchronousSavepointId();
        }
    }

    private void tryShutdownTimerService() {
        if (!this.timerService.isTerminated()) {
            try {
                long timeoutMs = this.getEnvironment().getTaskManagerInfo().getConfiguration().getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
                if (!this.timerService.shutdownServiceUninterruptible(timeoutMs)) {
                    LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending timers. Will continue with shutdown procedure.", (Object)timeoutMs);
                }
            }
            catch (Throwable t) {
                LOG.error("Could not shut down timer service", t);
            }
        }
    }

    public void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event) throws FlinkException {
        try {
            this.mainMailboxExecutor.execute((ThrowingRunnable<? extends Exception>)((ThrowingRunnable)() -> {
                try {
                    this.operatorChain.dispatchOperatorEvent(operator, event);
                }
                catch (Throwable t) {
                    this.mailboxProcessor.reportThrowable(t);
                }
            }), "dispatch operator event");
        }
        catch (RejectedExecutionException rejectedExecutionException) {
            // empty catch block
        }
    }

    private StateBackend createStateBackend() throws Exception {
        StateBackend fromApplication = this.configuration.getStateBackend(this.getUserCodeClassLoader());
        return StateBackendLoader.fromApplicationOrConfigOrDefault((StateBackend)fromApplication, (Configuration)this.getEnvironment().getTaskManagerInfo().getConfiguration(), (ClassLoader)this.getUserCodeClassLoader(), (Logger)LOG);
    }

    @VisibleForTesting
    TimerService getTimerService() {
        return this.timerService;
    }

    @VisibleForTesting
    OP getHeadOperator() {
        return this.headOperator;
    }

    @VisibleForTesting
    StreamTaskActionExecutor getActionExecutor() {
        return this.actionExecutor;
    }

    public ProcessingTimeServiceFactory getProcessingTimeServiceFactory() {
        return mailboxExecutor -> new ProcessingTimeServiceImpl(this.timerService, callback -> this.deferCallbackToMailbox(mailboxExecutor, (ProcessingTimeCallback)callback));
    }

    @Override
    public void handleAsyncException(String message, Throwable exception) {
        if (this.isRunning) {
            this.asyncExceptionHandler.handleAsyncException(message, exception);
        }
    }

    public String toString() {
        return this.getName();
    }

    public final CloseableRegistry getCancelables() {
        return this.cancelables;
    }

    @VisibleForTesting
    public static <OUT> RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> createRecordWriterDelegate(StreamConfig configuration, Environment environment) {
        List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWrites = StreamTask.createRecordWriters(configuration, environment);
        if (recordWrites.size() == 1) {
            return new SingleRecordWriter(recordWrites.get(0));
        }
        if (recordWrites.size() == 0) {
            return new NonRecordWriter();
        }
        return new MultipleRecordWriters(recordWrites);
    }

    private static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(StreamConfig configuration, Environment environment) {
        ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
        List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(environment.getUserClassLoader());
        for (int i = 0; i < outEdgesInOrder.size(); ++i) {
            StreamEdge edge = outEdgesInOrder.get(i);
            recordWriters.add(StreamTask.createRecordWriter(edge, i, environment, environment.getTaskInfo().getTaskName(), edge.getBufferTimeout()));
        }
        return recordWriters;
    }

    private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(StreamEdge edge, int outputIndex, Environment environment, String taskName, long bufferTimeout) {
        int numKeyGroups;
        StreamPartitioner outputPartitioner = null;
        try {
            outputPartitioner = (StreamPartitioner)InstantiationUtil.clone(edge.getPartitioner(), (ClassLoader)environment.getUserClassLoader());
        }
        catch (Exception e) {
            ExceptionUtils.rethrow((Throwable)e);
        }
        LOG.debug("Using partitioner {} for output {} of task {}", new Object[]{outputPartitioner, outputIndex, taskName});
        ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);
        if (outputPartitioner instanceof ConfigurableStreamPartitioner && 0 < (numKeyGroups = bufferWriter.getNumTargetKeyGroups())) {
            ((ConfigurableStreamPartitioner)((Object)outputPartitioner)).configure(numKeyGroups);
        }
        RecordWriter output = new RecordWriterBuilder().setChannelSelector((ChannelSelector)outputPartitioner).setTimeout(bufferTimeout).setTaskName(taskName).build(bufferWriter);
        output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
        return output;
    }

    private void handleTimerException(Exception ex) {
        this.handleAsyncException("Caught exception while processing timer.", new TimerException(ex));
    }

    @VisibleForTesting
    ProcessingTimeCallback deferCallbackToMailbox(MailboxExecutor mailboxExecutor, ProcessingTimeCallback callback) {
        return timestamp -> mailboxExecutor.execute((ThrowingRunnable<? extends Exception>)((ThrowingRunnable)() -> this.invokeProcessingTimeCallback(callback, timestamp)), "Timer callback for %s @ %d", callback, timestamp);
    }

    private void invokeProcessingTimeCallback(ProcessingTimeCallback callback, long timestamp) {
        try {
            callback.onProcessingTime(timestamp);
        }
        catch (Throwable t) {
            this.handleAsyncException("Caught exception while processing timer.", new TimerException(t));
        }
    }

    protected long getAsyncCheckpointStartDelayNanos() {
        return this.latestAsyncCheckpointStartDelayNanos;
    }

    static class StreamTaskAsyncExceptionHandler {
        private final Environment environment;

        StreamTaskAsyncExceptionHandler(Environment environment) {
            this.environment = environment;
        }

        void handleAsyncException(String message, Throwable exception) {
            this.environment.failExternally((Throwable)new AsynchronousException(message, exception));
        }
    }
}

