/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.legacy.backpressure;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.messages.TaskBackPressureResponse;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStats;
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Coordinates back pressure requests sent to running tasks.
 *
 * <p>A request is triggered for a set of {@link ExecutionVertex} instances. The coordinator asks
 * every corresponding {@link Execution} for its back pressure ratio, collects the individual
 * {@link TaskBackPressureResponse}s and completes the request's future with a
 * {@link BackPressureStats} once all tasks have answered. If a single task response fails, the
 * whole request is discarded and its future is completed exceptionally.
 *
 * <p>All mutable state is guarded by {@link #lock}.
 */
public class BackPressureRequestCoordinator {

    private static final Logger LOG = LoggerFactory.getLogger(BackPressureRequestCoordinator.class);

    /**
     * Number of recently finished (completed or cancelled) request IDs that are remembered so
     * that late responses can be told apart from responses with an unknown request ID.
     */
    private static final int NUM_GHOST_REQUEST_IDS = 10;

    private final Object lock = new Object();

    /** Executor on which the task response callbacks are run. */
    private final Executor executor;

    /** Timeout handed to every {@code Execution#requestBackPressure} call. */
    private final Time requestTimeout;

    /** In-progress requests, keyed by request ID. */
    @GuardedBy("lock")
    private final Map<Integer, PendingBackPressureRequest> pendingRequests = new HashMap<>();

    /** Bounded history of the most recently finished request IDs (oldest first). */
    @GuardedBy("lock")
    private final ArrayDeque<Integer> recentPendingRequests = new ArrayDeque<>(NUM_GHOST_REQUEST_IDS);

    /** Source of request IDs; incremented for every triggered request. */
    @GuardedBy("lock")
    private int requestIdCounter;

    /** Set once by {@link #shutDown()}; afterwards no requests are accepted or processed. */
    @GuardedBy("lock")
    private boolean isShutDown;

    /**
     * Creates a new coordinator.
     *
     * @param executor executor used to run the callbacks that handle task responses; must not be
     *     null
     * @param requestTimeout request timeout in milliseconds; must be non-negative
     */
    public BackPressureRequestCoordinator(Executor executor, long requestTimeout) {
        Preconditions.checkArgument(requestTimeout >= 0L, "The request timeout must be non-negative.");
        this.executor = Preconditions.checkNotNull(executor);
        this.requestTimeout = Time.milliseconds(requestTimeout);
    }

    /**
     * Triggers a back pressure request for the given tasks.
     *
     * <p>The returned future is completed exceptionally with an {@link IllegalStateException}
     * right away if any task has no current execution attempt, if any attempt is not in state
     * {@link ExecutionState#RUNNING}, or if this coordinator has been shut down.
     *
     * @param tasks tasks to request back pressure stats from; must be non-null and non-empty
     *     (validated through {@link Preconditions})
     * @return future that is completed with the collected stats once every task has responded
     */
    CompletableFuture<BackPressureStats> triggerBackPressureRequest(ExecutionVertex[] tasks) {
        Preconditions.checkNotNull(tasks, "Tasks to request must not be null.");
        Preconditions.checkArgument(tasks.length >= 1, "No tasks to request.");

        // Resolve all executions up front so that nothing is registered or sent
        // when at least one of the tasks is not running.
        ExecutionAttemptID[] triggerIds = new ExecutionAttemptID[tasks.length];
        Execution[] executions = new Execution[tasks.length];
        for (int i = 0; i < triggerIds.length; ++i) {
            Execution execution = tasks[i].getCurrentExecutionAttempt();
            if (execution == null || execution.getState() != ExecutionState.RUNNING) {
                return FutureUtils.completedExceptionally(
                    new IllegalStateException(
                        "Task " + tasks[i].getTaskNameWithSubtaskIndex() + " is not running."));
            }
            executions[i] = execution;
            triggerIds[i] = execution.getAttemptId();
        }

        synchronized (lock) {
            if (isShutDown) {
                return FutureUtils.completedExceptionally(new IllegalStateException("Shut down."));
            }

            int requestId = requestIdCounter++;
            LOG.debug("Triggering task back pressure request {}.", requestId);

            // Register the request before contacting the tasks so that a response
            // always finds its pending entry.
            PendingBackPressureRequest pending = new PendingBackPressureRequest(requestId, triggerIds);
            pendingRequests.put(requestId, pending);

            requestBackPressure(executions, requestId);

            return pending.getBackPressureStatsFuture();
        }
    }

    /**
     * Sends the back pressure request to every given execution and registers a callback, run on
     * {@link #executor}, that dispatches the outcome to the success or failure handler.
     *
     * <p>Must be called while holding {@link #lock}.
     *
     * @param executions executions to contact
     * @param requestId ID of the pending request the responses belong to
     */
    private void requestBackPressure(Execution[] executions, int requestId) {
        assert Thread.holdsLock(lock);

        for (Execution execution : executions) {
            CompletableFuture<TaskBackPressureResponse> taskBackPressureFuture =
                execution.requestBackPressure(requestId, requestTimeout);

            taskBackPressureFuture.handleAsync(
                (TaskBackPressureResponse taskBackPressureResponse, Throwable throwable) -> {
                    // A null response is treated as a failure, even if no throwable is given.
                    if (taskBackPressureResponse != null) {
                        handleSuccessfulTaskBackPressureResponse(taskBackPressureResponse);
                    } else {
                        handleFailedTaskBackPressureResponse(requestId, throwable);
                    }
                    return null;
                },
                executor);
        }
    }

    /**
     * Handles a failed task response by cancelling the whole request it belongs to.
     *
     * <p>Does nothing after shut down or when the request is no longer pending.
     *
     * @param requestId ID of the request to cancel
     * @param cause failure reported for the task response, may be null
     */
    private void handleFailedTaskBackPressureResponse(int requestId, @Nullable Throwable cause) {
        synchronized (lock) {
            if (isShutDown) {
                return;
            }

            PendingBackPressureRequest pendingRequest = pendingRequests.remove(requestId);
            if (pendingRequest != null) {
                if (cause != null) {
                    LOG.info(String.format("Cancelling back pressure request %d.", requestId), cause);
                } else {
                    LOG.info("Cancelling back pressure request {}.", requestId);
                }

                pendingRequest.discard(cause);
                rememberRecentRequestId(requestId);
            }
        }
    }

    /**
     * Shuts down the coordinator.
     *
     * <p>On the first call, all pending requests are discarded (their futures complete
     * exceptionally) and the internal state is cleared. Further calls have no effect.
     */
    public void shutDown() {
        synchronized (lock) {
            if (!isShutDown) {
                LOG.info("Shutting down back pressure request coordinator.");

                for (PendingBackPressureRequest pending : pendingRequests.values()) {
                    pending.discard(new RuntimeException("Shut down."));
                }

                pendingRequests.clear();
                recentPendingRequests.clear();

                isShutDown = true;
            }
        }
    }

    /**
     * Records a successful task response in its pending request and completes the request once
     * the last outstanding task has responded.
     *
     * <p>Responses for requests that are no longer pending are only logged at debug level.
     * Does nothing after shut down.
     *
     * @param taskBackPressureResponse response received from a task
     */
    private void handleSuccessfulTaskBackPressureResponse(TaskBackPressureResponse taskBackPressureResponse) {
        int requestId = taskBackPressureResponse.getRequestId();
        ExecutionAttemptID executionId = taskBackPressureResponse.getExecutionAttemptID();
        double taskBackPressureRatio = taskBackPressureResponse.getBackPressureRatio();

        synchronized (lock) {
            if (isShutDown) {
                return;
            }

            LOG.debug("Collecting back pressure response of request {} from task {}.", requestId, executionId);

            PendingBackPressureRequest pending = pendingRequests.get(requestId);
            if (pending != null) {
                pending.collectBackPressureStats(executionId, taskBackPressureRatio);

                if (pending.isComplete()) {
                    pendingRequests.remove(requestId);
                    rememberRecentRequestId(requestId);
                    pending.completePromiseAndDiscard();
                }
            } else if (recentPendingRequests.contains(requestId)) {
                LOG.debug("Received late back pressure request {} result of task {}.", requestId, executionId);
            } else {
                LOG.debug("Unknown request ID {}.", requestId);
            }
        }
    }

    /**
     * Remembers the ID of a finished request, evicting the oldest remembered ID once
     * {@link #NUM_GHOST_REQUEST_IDS} entries are held.
     *
     * <p>Must be called while holding {@link #lock}.
     *
     * @param requestId ID of the request that was completed or cancelled
     */
    private void rememberRecentRequestId(int requestId) {
        assert Thread.holdsLock(lock);

        if (recentPendingRequests.size() >= NUM_GHOST_REQUEST_IDS) {
            recentPendingRequests.removeFirst();
        }
        recentPendingRequests.addLast(requestId);
    }

    /**
     * Returns the number of requests that are currently pending.
     *
     * @return size of the pending request map
     */
    @VisibleForTesting
    int getNumberOfPendingRequests() {
        synchronized (lock) {
            return pendingRequests.size();
        }
    }

    /**
     * Bookkeeping for a single in-progress back pressure request.
     *
     * <p>This class does no synchronization of its own; within this file its instances are only
     * accessed while holding the coordinator's lock.
     */
    private static class PendingBackPressureRequest {

        private final int requestId;

        /** Creation time of this request, in milliseconds as returned by the system clock. */
        private final long startTime;

        /** Execution attempts that have not responded yet. */
        private final Set<ExecutionAttemptID> pendingTasks;

        /** Back pressure ratios collected so far, keyed by execution attempt. */
        private final Map<ExecutionAttemptID, Double> backPressureRatios;

        private final CompletableFuture<BackPressureStats> backPressureStatsFuture;

        /** True once the request has been completed or discarded. */
        private boolean isDiscarded;

        /**
         * Creates a pending request that waits for a response from each given task.
         *
         * @param requestId ID of the request
         * @param tasksToCollect execution attempts whose responses are expected
         */
        PendingBackPressureRequest(int requestId, ExecutionAttemptID[] tasksToCollect) {
            this.requestId = requestId;
            this.startTime = System.currentTimeMillis();
            this.pendingTasks = new HashSet<>(Arrays.asList(tasksToCollect));
            this.backPressureRatios = Maps.newHashMapWithExpectedSize(tasksToCollect.length);
            this.backPressureStatsFuture = new CompletableFuture<>();
        }

        /**
         * Tells whether all expected tasks have responded.
         *
         * @return true if no task response is outstanding
         * @throws IllegalStateException if the request has already been discarded
         */
        private boolean isComplete() {
            checkDiscarded();
            return pendingTasks.isEmpty();
        }

        /**
         * Discards the request: drops all collected data and completes the future exceptionally
         * with a {@link RuntimeException} wrapping the given cause. Repeated calls are no-ops.
         *
         * @param cause reason for discarding, may be null
         */
        private void discard(Throwable cause) {
            if (!isDiscarded) {
                pendingTasks.clear();
                backPressureRatios.clear();

                backPressureStatsFuture.completeExceptionally(new RuntimeException("Discarded.", cause));

                isDiscarded = true;
            }
        }

        /**
         * Records the back pressure ratio reported by one task.
         *
         * @param executionId execution attempt that responded
         * @param backPressureRatio ratio reported by that attempt
         * @throws IllegalStateException if the request is already discarded or has no
         *     outstanding tasks
         * @throws IllegalArgumentException if no response is outstanding for the given attempt
         */
        private void collectBackPressureStats(ExecutionAttemptID executionId, double backPressureRatio) {
            checkDiscarded();
            checkCompleted();

            if (!pendingTasks.remove(executionId)) {
                throw new IllegalArgumentException(String.format("Unknown task %s.", executionId));
            }
            backPressureRatios.put(executionId, backPressureRatio);
        }

        /**
         * Marks the request as discarded and completes the future with a
         * {@link BackPressureStats} built from the collected ratios and the start/end times.
         */
        private void completePromiseAndDiscard() {
            // Flag first so that later calls to discard() cannot clear the collected ratios.
            isDiscarded = true;

            long endTime = System.currentTimeMillis();
            BackPressureStats backPressureStats =
                new BackPressureStats(requestId, startTime, endTime, backPressureRatios);

            backPressureStatsFuture.complete(backPressureStats);
        }

        /**
         * Returns the future that is completed when this request finishes or is discarded.
         *
         * @return future of the back pressure stats
         */
        private CompletableFuture<BackPressureStats> getBackPressureStatsFuture() {
            return backPressureStatsFuture;
        }

        /** Throws {@link IllegalStateException} if no task response is outstanding. */
        private void checkCompleted() {
            if (pendingTasks.isEmpty()) {
                throw new IllegalStateException("Completed.");
            }
        }

        /** Throws {@link IllegalStateException} if the request has been discarded. */
        private void checkDiscarded() {
            if (isDiscarded) {
                throw new IllegalStateException("Discarded.");
            }
        }
    }
}

