/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.legacy.backpressure;

import java.util.HashMap;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertexTest;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureRequestCoordinator;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStats;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTrackerImpl;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

/**
 * Tests for {@link BackPressureStatsTrackerImpl}.
 *
 * <p>The tracker is driven by a {@link TestingBackPressureRequestCoordinator} that answers
 * every request immediately with pre-built {@link BackPressureStats}, and by a same-thread
 * executor ({@code Runnable::run}), so no test depends on asynchronous completion.
 */
public class BackPressureStatsTrackerImplTest extends TestLogger {

    /** Request id carried by the default stats. */
    private static final int requestId = 0;

    /** Back pressure ratio reported for every subtask in the default stats. */
    private static final double backPressureRatio = 0.1;

    private static final ExecutionJobVertex executionJobVertex = createExecutionJobVertex();

    // Must be initialized before backPressureStats: createBackPressureStats iterates over it.
    private static final ExecutionVertex[] taskVertices = executionJobVertex.getTaskVertices();

    /** Stats returned by the first request of every tracker built in this class. */
    private static final BackPressureStats backPressureStats =
            createBackPressureStats(requestId, 1L, backPressureRatio);

    /** Default cache clean-up interval; large enough to never elapse within the case timeout. */
    private static final int cleanUpInterval = 60000;

    /** Default stats refresh interval; large enough to never elapse within the case timeout. */
    private static final int backPressureStatsRefreshInterval = 60000;

    /** Distance between start and end time used for the second set of stats. */
    private static final long timeGap = 60000L;

    /** Timeout passed to the request coordinator. */
    private static final long requestTimeout = 10000L;

    @Rule
    public Timeout caseTimeout = new Timeout(10L, TimeUnit.SECONDS);

    /** A fresh tracker must deliver the stats of the first request. */
    @Test
    public void testGetOperatorBackPressureStats() throws Exception {
        doInitialRequestAndVerifyResult(createBackPressureTracker());
    }

    /**
     * With a long refresh interval a further lookup must still return the first stats, even
     * though the coordinator would hand out different stats on a second request.
     */
    @Test
    public void testCachedStatsNotUpdatedWithinRefreshInterval() throws Exception {
        final double backPressureRatio2 = 0.2;
        final int requestId2 = 1;
        final BackPressureStats backPressureStats2 =
                createBackPressureStats(requestId2, timeGap, backPressureRatio2);

        final BackPressureStatsTracker tracker = createBackPressureTracker(
                cleanUpInterval, backPressureStatsRefreshInterval, backPressureStats, backPressureStats2);
        doInitialRequestAndVerifyResult(tracker);

        // The cached (first) stats are still expected here.
        checkOperatorBackPressureStats(tracker.getOperatorBackPressureStats(executionJobVertex));
    }

    /**
     * With a short refresh interval, waiting longer than the interval and looking up again
     * must eventually yield the second stats.
     */
    @Test
    public void testCachedStatsUpdatedAfterRefreshInterval() throws Exception {
        final int backPressureStatsRefreshInterval2 = 10;
        final long waitingTime = 20L;
        final double backPressureRatio2 = 0.2;
        final int requestId2 = 1;
        final BackPressureStats backPressureStats2 =
                createBackPressureStats(requestId2, timeGap, backPressureRatio2);

        final BackPressureStatsTracker tracker = createBackPressureTracker(
                cleanUpInterval, backPressureStatsRefreshInterval2, backPressureStats, backPressureStats2);
        doInitialRequestAndVerifyResult(tracker);

        // Sleep past the refresh interval so the next lookup is allowed to refresh.
        Thread.sleep(waitingTime);

        // This lookup still returns a value; the lookup after it must expose the second stats.
        Assert.assertTrue(tracker.getOperatorBackPressureStats(executionJobVertex).isPresent());
        checkOperatorBackPressureStats(
                backPressureRatio2,
                backPressureStats2,
                tracker.getOperatorBackPressureStats(executionJobVertex));
    }

    /** After {@code shutDown()} no stats may be returned any more. */
    @Test
    public void testShutDown() throws Exception {
        final BackPressureStatsTracker tracker = createBackPressureTracker();
        doInitialRequestAndVerifyResult(tracker);

        tracker.shutDown();

        // Checked twice on purpose: neither the first lookup after shutdown nor a
        // subsequent one may return a result.
        Assert.assertFalse(tracker.getOperatorBackPressureStats(executionJobVertex).isPresent());
        Assert.assertFalse(tracker.getOperatorBackPressureStats(executionJobVertex).isPresent());
    }

    /** A clean-up run before the clean-up interval has elapsed must keep the cached stats. */
    @Test
    public void testCachedStatsNotCleanedWithinCleanupInterval() throws Exception {
        final BackPressureStatsTracker tracker = createBackPressureTracker();
        doInitialRequestAndVerifyResult(tracker);

        tracker.cleanUpOperatorStatsCache();
        checkOperatorBackPressureStats(tracker.getOperatorBackPressureStats(executionJobVertex));
    }

    /** A clean-up run after the clean-up interval has elapsed must drop the cached stats. */
    @Test
    public void testCachedStatsCleanedAfterCleanupInterval() throws Exception {
        final int cleanUpInterval2 = 10;
        final long waitingTime = 20L;

        final BackPressureStatsTracker tracker = createBackPressureTracker(
                cleanUpInterval2, backPressureStatsRefreshInterval, backPressureStats);
        doInitialRequestAndVerifyResult(tracker);

        // Sleep past the clean-up interval so the cached entry counts as expired.
        Thread.sleep(waitingTime);

        tracker.cleanUpOperatorStatsCache();
        Assert.assertFalse(tracker.getOperatorBackPressureStats(executionJobVertex).isPresent());
    }

    /**
     * Performs the first two lookups on a fresh tracker: the first one is expected to return
     * empty (presumably it only triggers the request), the second one must return the
     * default stats.
     *
     * @param tracker freshly created tracker under test
     */
    private void doInitialRequestAndVerifyResult(BackPressureStatsTracker tracker) {
        Assert.assertFalse(tracker.getOperatorBackPressureStats(executionJobVertex).isPresent());
        checkOperatorBackPressureStats(tracker.getOperatorBackPressureStats(executionJobVertex));
    }

    /**
     * Verifies that the given result matches the default ratio and the default stats.
     *
     * @param optionalStats lookup result to verify
     */
    private void checkOperatorBackPressureStats(Optional<OperatorBackPressureStats> optionalStats) {
        checkOperatorBackPressureStats(backPressureRatio, backPressureStats, optionalStats);
    }

    /**
     * Verifies that the lookup result is present and agrees with the expected stats in request
     * id, end time, number of subtasks and the back pressure ratio of every subtask.
     *
     * @param expectedRatio ratio every subtask must report
     * @param expectedStats stats the result must have been derived from
     * @param optionalStats lookup result to verify
     */
    private void checkOperatorBackPressureStats(
            double expectedRatio,
            BackPressureStats expectedStats,
            Optional<OperatorBackPressureStats> optionalStats) {
        Assert.assertTrue(optionalStats.isPresent());
        final OperatorBackPressureStats stats = optionalStats.get();

        Assert.assertEquals(expectedStats.getRequestId(), stats.getRequestId());
        Assert.assertEquals(expectedStats.getEndTime(), stats.getEndTimestamp());
        Assert.assertEquals(taskVertices.length, stats.getNumberOfSubTasks());

        for (int i = 0; i < stats.getNumberOfSubTasks(); ++i) {
            // Delta 0.0: the ratio is passed through unchanged, so exact equality is expected.
            Assert.assertEquals(expectedRatio, stats.getBackPressureRatio(i), 0.0);
        }
    }

    /** Creates a tracker with the default intervals that serves only the default stats. */
    private BackPressureStatsTracker createBackPressureTracker() {
        return createBackPressureTracker(cleanUpInterval, backPressureStatsRefreshInterval, backPressureStats);
    }

    /**
     * Creates a tracker backed by a {@link TestingBackPressureRequestCoordinator}.
     *
     * @param cleanUpInterval clean-up interval handed to the tracker
     * @param backPressureStatsRefreshInterval refresh interval handed to the tracker
     * @param stats stats the coordinator returns, one per request, in order (cycling)
     * @return the tracker under test
     */
    private BackPressureStatsTracker createBackPressureTracker(
            int cleanUpInterval,
            int backPressureStatsRefreshInterval,
            BackPressureStats... stats) {
        // Runnable::run executes submitted work on the calling thread.
        final TestingBackPressureRequestCoordinator coordinator =
                new TestingBackPressureRequestCoordinator(Runnable::run, requestTimeout, stats);
        return new BackPressureStatsTrackerImpl(coordinator, cleanUpInterval, backPressureStatsRefreshInterval);
    }

    /**
     * Builds stats that report the same ratio for the current execution attempt of every
     * task vertex.
     *
     * @param requestId id stored in the stats
     * @param timeGap offset added to the current time to obtain the end time
     * @param backPressureRatio ratio stored for every task
     * @return the assembled stats
     */
    private static BackPressureStats createBackPressureStats(
            int requestId, long timeGap, double backPressureRatio) {
        final long startTime = System.currentTimeMillis();
        final long endTime = startTime + timeGap;

        final HashMap<ExecutionAttemptID, Double> backPressureRatiosByTask = new HashMap<>();
        for (ExecutionVertex vertex : taskVertices) {
            backPressureRatiosByTask.put(
                    vertex.getCurrentExecutionAttempt().getAttemptId(), backPressureRatio);
        }

        return new BackPressureStats(requestId, startTime, endTime, backPressureRatiosByTask);
    }

    /**
     * Creates the job vertex shared by all tests.
     *
     * @throws RuntimeException wrapping the original failure if the vertex cannot be created
     */
    private static ExecutionJobVertex createExecutionJobVertex() {
        try {
            return ExecutionJobVertexTest.createExecutionJobVertex(4, 4);
        } catch (Exception e) {
            // Keep the cause: this runs in a static initializer, where a lost cause leaves
            // only an opaque ExceptionInInitializerError.
            throw new RuntimeException("Failed to create ExecutionJobVertex.", e);
        }
    }

    /**
     * Coordinator stub that completes every request immediately with the next entry of a
     * fixed array of stats, starting over once the array is exhausted.
     */
    private static class TestingBackPressureRequestCoordinator extends BackPressureRequestCoordinator {

        private final BackPressureStats[] backPressureStats;

        /** Number of requests served so far; selects the next stats to return. */
        private int counter = 0;

        TestingBackPressureRequestCoordinator(
                Executor executor, long requestTimeout, BackPressureStats... backPressureStats) {
            super(executor, requestTimeout);
            this.backPressureStats = backPressureStats;
        }

        @Override
        CompletableFuture<BackPressureStats> triggerBackPressureRequest(ExecutionVertex[] tasks) {
            final BackPressureStats next = backPressureStats[counter++ % backPressureStats.length];
            return CompletableFuture.completedFuture(next);
        }
    }
}

