/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.legacy;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class DefaultExecutionGraphCacheTest
extends TestLogger {
    private static ArchivedExecutionGraph expectedExecutionGraph;
    private static final JobID expectedJobId;

    /**
     * Builds the {@link ArchivedExecutionGraph} fixture once before any test runs and stores it
     * in {@code expectedExecutionGraph}, which the tests compare cache results against.
     */
    @BeforeClass
    public static void setup() {
        final ArchivedExecutionGraphBuilder graphBuilder = new ArchivedExecutionGraphBuilder();
        expectedExecutionGraph = graphBuilder.build();
    }

    /**
     * Looks up the same job twice in a cache configured with a one-hour time-to-live and
     * verifies that both lookups yield the expected graph while the gateway received exactly
     * one job request.
     */
    @Test
    public void testExecutionGraphCaching() throws Exception {
        final Time timeout = Time.milliseconds(100L);
        final Time timeToLive = Time.hours(1L);

        final CompletableFuture<ArchivedExecutionGraph> gatewayResponse =
            CompletableFuture.completedFuture(expectedExecutionGraph);
        final CountingRestfulGateway restfulGateway =
            createCountingRestfulGateway(expectedJobId, gatewayResponse);

        try (DefaultExecutionGraphCache executionGraphCache = new DefaultExecutionGraphCache(timeout, timeToLive)) {
            final CompletableFuture<AccessExecutionGraph> firstLookup =
                executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
            Assert.assertEquals(expectedExecutionGraph, firstLookup.get());

            final CompletableFuture<AccessExecutionGraph> secondLookup =
                executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
            Assert.assertEquals(expectedExecutionGraph, secondLookup.get());

            // Two lookups, but only a single request may have reached the gateway.
            Assert.assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(1));
        }
    }

    /**
     * Uses a one-millisecond time-to-live, waits five times that long between two lookups of
     * the same job, and verifies that both lookups succeed and that the gateway received two
     * job requests.
     */
    @Test
    public void testExecutionGraphEntryInvalidation() throws Exception {
        final Time timeout = Time.milliseconds(100L);
        final Time timeToLive = Time.milliseconds(1L);

        // One prepared response per expected gateway request.
        final CompletableFuture<ArchivedExecutionGraph> firstResponse =
            CompletableFuture.completedFuture(expectedExecutionGraph);
        final CompletableFuture<ArchivedExecutionGraph> secondResponse =
            CompletableFuture.completedFuture(expectedExecutionGraph);
        final CountingRestfulGateway restfulGateway =
            createCountingRestfulGateway(expectedJobId, firstResponse, secondResponse);

        try (DefaultExecutionGraphCache executionGraphCache = new DefaultExecutionGraphCache(timeout, timeToLive)) {
            final CompletableFuture<AccessExecutionGraph> beforeExpiry =
                executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
            Assert.assertEquals(expectedExecutionGraph, beforeExpiry.get());

            // Wait well past the configured time-to-live before asking again.
            final long waitMillis = timeToLive.toMilliseconds() * 5L;
            Thread.sleep(waitMillis);

            final CompletableFuture<AccessExecutionGraph> afterExpiry =
                executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
            Assert.assertEquals(expectedExecutionGraph, afterExpiry.get());

            Assert.assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(2));
        }
    }

    /**
     * Lets the gateway answer the first job request with a {@link FlinkJobNotFoundException}
     * and the second with the expected graph. Verifies that the first lookup fails with a
     * {@link FlinkException} cause and that a second lookup, despite the one-hour time-to-live,
     * returns the expected graph.
     */
    @Test
    public void testImmediateCacheInvalidationAfterFailure() throws Exception {
        final Time timeout = Time.milliseconds(100L);
        final Time timeToLive = Time.hours(1L);

        final CompletableFuture<ArchivedExecutionGraph> failingResponse =
            FutureUtils.completedExceptionally(new FlinkJobNotFoundException(expectedJobId));
        final CompletableFuture<ArchivedExecutionGraph> successfulResponse =
            CompletableFuture.completedFuture(expectedExecutionGraph);
        final CountingRestfulGateway restfulGateway =
            createCountingRestfulGateway(expectedJobId, failingResponse, successfulResponse);

        try (DefaultExecutionGraphCache executionGraphCache = new DefaultExecutionGraphCache(timeout, timeToLive)) {
            final CompletableFuture<AccessExecutionGraph> failedLookup =
                executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);

            try {
                failedLookup.get();
                Assert.fail("The execution graph future should have been completed exceptionally.");
            } catch (ExecutionException expected) {
                final Throwable cause = expected.getCause();
                Assert.assertTrue(cause instanceof FlinkException);
            }

            // The next lookup must not be served the failed result.
            final CompletableFuture<AccessExecutionGraph> retriedLookup =
                executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
            Assert.assertEquals(expectedExecutionGraph, retriedLookup.get());
        }
    }

    /**
     * Fills the cache with entries for two different jobs using a one-millisecond time-to-live,
     * sleeps for that duration, calls {@code cleanup()} and verifies that the cache reports a
     * size of zero afterwards.
     */
    @Test
    public void testCacheEntryCleanup() throws Exception {
        final Time timeout = Time.milliseconds(100L);
        final Time timeToLive = Time.milliseconds(1L);
        final JobID expectedJobId2 = new JobID();
        final ArchivedExecutionGraph expectedExecutionGraph2 = new ArchivedExecutionGraphBuilder().build();

        final AtomicInteger requestJobCalls = new AtomicInteger(0);

        // Gateway that serves exactly the two known job ids and counts every request.
        final TestingRestfulGateway restfulGateway = ((TestingRestfulGateway.Builder) new TestingRestfulGateway.Builder()
            .setRequestJobFunction(requestedJobId -> {
                requestJobCalls.incrementAndGet();

                final boolean isFirstJob = requestedJobId.equals(expectedJobId);
                if (!isFirstJob && !requestedJobId.equals(expectedJobId2)) {
                    throw new AssertionError("Invalid job id received.");
                }
                return isFirstJob
                    ? CompletableFuture.completedFuture(expectedExecutionGraph)
                    : CompletableFuture.completedFuture(expectedExecutionGraph2);
            }))
            .build();

        try (DefaultExecutionGraphCache executionGraphCache = new DefaultExecutionGraphCache(timeout, timeToLive)) {
            final CompletableFuture<AccessExecutionGraph> firstJobLookup =
                executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
            final CompletableFuture<AccessExecutionGraph> secondJobLookup =
                executionGraphCache.getExecutionGraph(expectedJobId2, restfulGateway);

            Assert.assertEquals(expectedExecutionGraph, firstJobLookup.get());
            Assert.assertEquals(expectedExecutionGraph2, secondJobLookup.get());
            Assert.assertThat(requestJobCalls.get(), Matchers.equalTo(2));

            Thread.sleep(timeToLive.toMilliseconds());
            executionGraphCache.cleanup();

            Assert.assertTrue(executionGraphCache.size() == 0);
        }
    }

    /**
     * Submits {@code numConcurrentAccesses} lookups of the same job to a fixed thread pool and
     * verifies that every lookup yields the expected graph while the gateway received exactly
     * one job request.
     *
     * <p>The executor is shut down in a {@code finally} block so that it is released on both
     * the success and the failure path.
     */
    @Test
    public void testConcurrentAccess() throws Exception {
        final Time timeout = Time.milliseconds(100L);
        final Time timeToLive = Time.hours(1L);
        final CountingRestfulGateway restfulGateway =
            createCountingRestfulGateway(expectedJobId, CompletableFuture.completedFuture(expectedExecutionGraph));

        final int numConcurrentAccesses = 10;
        final Collection<CompletableFuture<AccessExecutionGraph>> executionGraphFutures =
            new ArrayList<>(numConcurrentAccesses);
        final ExecutorService executor = Executors.newFixedThreadPool(numConcurrentAccesses);

        try (DefaultExecutionGraphCache executionGraphCache = new DefaultExecutionGraphCache(timeout, timeToLive)) {
            for (int i = 0; i < numConcurrentAccesses; i++) {
                // supplyAsync yields a future of a future; thenCompose(identity) flattens it.
                final CompletableFuture<AccessExecutionGraph> executionGraphFuture = CompletableFuture
                    .supplyAsync(
                        () -> executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway),
                        executor)
                    .thenCompose(Function.identity());

                executionGraphFutures.add(executionGraphFuture);
            }

            final FutureUtils.ConjunctFuture<Collection<AccessExecutionGraph>> allExecutionGraphFutures =
                FutureUtils.combineAll(executionGraphFutures);
            final Collection<AccessExecutionGraph> allExecutionGraphs = allExecutionGraphFutures.get();

            for (AccessExecutionGraph executionGraph : allExecutionGraphs) {
                Assert.assertEquals(expectedExecutionGraph, executionGraph);
            }

            Assert.assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(1));
        } finally {
            ExecutorUtils.gracefulShutdown(5000L, TimeUnit.MILLISECONDS, executor);
        }
    }

    /**
     * Creates a {@link CountingRestfulGateway} for the given job id whose request function hands
     * out the supplied futures one per invocation, in the order given.
     *
     * <p>Once all supplied futures have been handed out the function returns {@code null},
     * because {@link ConcurrentLinkedQueue#poll()} returns {@code null} on an empty queue.
     *
     * @param jobId job id the gateway expects in every request
     * @param accessExecutionGraphs responses to hand out, first to last
     * @return gateway backed by the queued responses
     */
    private CountingRestfulGateway createCountingRestfulGateway(JobID jobId, CompletableFuture<ArchivedExecutionGraph> ... accessExecutionGraphs) {
        final ConcurrentLinkedQueue<CompletableFuture<ArchivedExecutionGraph>> pendingResponses =
            new ConcurrentLinkedQueue<>(Arrays.asList(accessExecutionGraphs));
        final Function<JobID, CompletableFuture<ArchivedExecutionGraph>> nextResponse =
            ignored -> pendingResponses.poll();

        return new CountingRestfulGateway(jobId, nextResponse);
    }

    /**
     * Requests the execution graph of {@code expectedJobId} from the given cache through the
     * given gateway and returns the resulting future.
     *
     * <p>NOTE(review): the name matches the compiler's naming scheme for synthetic lambda
     * methods, so this is presumably a decompiled lambda body rather than hand-written code;
     * confirm before relying on or removing it.
     */
    private static /* synthetic */ CompletableFuture lambda$testConcurrentAccess$1(ExecutionGraphCache executionGraphCache, CountingRestfulGateway restfulGateway) {
        final RestfulGateway gateway = restfulGateway;
        return executionGraphCache.getExecutionGraph(expectedJobId, gateway);
    }

    // Assigns the final static job id when the class is initialized; the tests in this class
    // use it as the id they query the cache with.
    static {
        expectedJobId = new JobID();
    }

    private static final class SuspendableAccessExecutionGraph
    extends ArchivedExecutionGraph {
        private static final long serialVersionUID = -6796543726305778101L;
        private JobStatus jobStatus = super.getState();

        public SuspendableAccessExecutionGraph(JobID jobId) {
            super(jobId, "DefaultExecutionGraphCacheTest", Collections.emptyMap(), Collections.emptyList(), new long[0], JobStatus.RUNNING, new ErrorInfo((Throwable)new FlinkException("Test"), 42L), "", new StringifiedAccumulatorResult[0], Collections.emptyMap(), new ArchivedExecutionConfig(new ExecutionConfig()), false, null, null, "stateBackendName");
        }

        public JobStatus getState() {
            return this.jobStatus;
        }

        public void setJobStatus(JobStatus jobStatus) {
            this.jobStatus = jobStatus;
        }
    }

    /**
     * {@link TestingRestfulGateway} that checks the job id of every {@code requestJob} call
     * against an expected id and counts the calls that pass this check.
     */
    private static class CountingRestfulGateway
    extends TestingRestfulGateway {
        /** Job id every request is asserted to carry. */
        private final JobID expectedJobId;

        /** Number of {@code requestJob} calls that passed the job id assertion. */
        private final AtomicInteger numRequestJobCalls = new AtomicInteger(0);

        /**
         * @param expectedJobId job id every request must carry; must not be {@code null}
         * @param requestJobFunction function stored in the inherited {@code requestJobFunction}
         *     field; must not be {@code null}
         */
        private CountingRestfulGateway(JobID expectedJobId, Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction) {
            this.expectedJobId = Preconditions.checkNotNull(expectedJobId);
            this.requestJobFunction = Preconditions.checkNotNull(requestJobFunction);
        }

        /**
         * Asserts that {@code jobId} equals the expected job id, increments the call counter and
         * then delegates to the superclass implementation. A request with an unexpected id
         * fails the assertion before it is counted.
         */
        @Override
        public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Time timeout) {
            Assert.assertThat(jobId, Matchers.equalTo(this.expectedJobId));
            this.numRequestJobCalls.incrementAndGet();
            return super.requestJob(jobId, timeout);
        }

        /** Returns the number of job requests counted so far. */
        public int getNumRequestJobCalls() {
            return this.numRequestJobCalls.get();
        }
    }
}

