/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adapter;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import junit.framework.TestCase;
import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.TestRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology;
import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionVertex;
import org.apache.flink.runtime.scheduler.adapter.DefaultResultPartition;
import org.apache.flink.runtime.scheduler.adapter.DefaultSchedulingPipelinedRegion;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.ResultPartitionState;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link DefaultExecutionTopology}.
 *
 * <p>Each test runs against a topology built in {@link #setUp()} from an {@link ExecutionGraph}
 * consisting of two no-op job vertices connected all-to-all through a pipelined result.
 */
public class DefaultExecutionTopologyTest
extends TestLogger {
    private final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
    private final TestRestartStrategy triggeredRestartStrategy = TestRestartStrategy.manuallyTriggered();
    private ExecutionGraph executionGraph;
    private DefaultExecutionTopology adapter;

    /**
     * Builds the shared fixture: two no-op job vertices of parallelism 3, the second consuming the
     * first via an ALL_TO_ALL / PIPELINED connection, wrapped in a {@link DefaultExecutionTopology}.
     *
     * @throws Exception if the execution graph cannot be created
     */
    @Before
    public void setUp() throws Exception {
        JobVertex[] jobVertices = new JobVertex[2];
        int parallelism = 3;
        jobVertices[0] = ExecutionGraphTestUtils.createNoOpVertex(parallelism);
        jobVertices[1] = ExecutionGraphTestUtils.createNoOpVertex(parallelism);
        jobVertices[1].connectNewDataSetAsInput(jobVertices[0], DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        // Use two different constraints so that assertVertexEquals can detect a mixed-up mapping.
        jobVertices[0].setInputDependencyConstraint(InputDependencyConstraint.ALL);
        jobVertices[1].setInputDependencyConstraint(InputDependencyConstraint.ANY);
        this.executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(this.taskManagerGateway, (RestartStrategy)this.triggeredRestartStrategy, jobVertices);
        this.adapter = new DefaultExecutionTopology(this.executionGraph);
    }

    /** The topology built from the execution graph mirrors its vertices and partitions. */
    @Test
    public void testConstructor() {
        assertGraphEquals(this.executionGraph, this.adapter);
    }

    /** Every partition produced in the execution graph can be looked up in the topology by its ID. */
    @Test
    public void testGetResultPartition() {
        for (ExecutionVertex vertex : this.executionGraph.getAllExecutionVertices()) {
            for (Map.Entry<IntermediateResultPartitionID, IntermediateResultPartition> entry : vertex.getProducedPartitions().entrySet()) {
                IntermediateResultPartition partition = entry.getValue();
                DefaultResultPartition schedulingResultPartition = this.adapter.getResultPartition(entry.getKey());
                assertPartitionEquals(partition, schedulingResultPartition);
            }
        }
    }

    /**
     * The state reported by the adapted partition changes from CREATED to CONSUMABLE once the
     * underlying partition is marked as having produced data.
     */
    @Test
    public void testResultPartitionStateSupplier() {
        IntermediateResultPartition intermediateResultPartition = IterableUtils.toStream(this.executionGraph.getAllExecutionVertices())
                .flatMap(vertex -> vertex.getProducedPartitions().values().stream())
                .findAny()
                .orElseThrow(() -> new AssertionError("The execution graph does not contain any produced partition."));
        DefaultResultPartition schedulingResultPartition = this.adapter.getResultPartition(intermediateResultPartition.getPartitionId());
        Assert.assertEquals(ResultPartitionState.CREATED, schedulingResultPartition.getState());
        intermediateResultPartition.markDataProduced();
        Assert.assertEquals(ResultPartitionState.CONSUMABLE, schedulingResultPartition.getState());
    }

    /** Looking up a vertex ID that is not part of the topology raises {@link IllegalArgumentException}. */
    @Test
    public void testGetVertexOrThrow() {
        try {
            this.adapter.getVertex(new ExecutionVertexID(new JobVertexID(), 0));
            Assert.fail("get not exist vertex");
        }
        catch (IllegalArgumentException expected) {
            // Expected: the freshly generated vertex ID is unknown to the topology.
        }
    }

    /** Looking up a partition ID that is not part of the topology raises {@link IllegalArgumentException}. */
    @Test
    public void testResultPartitionOrThrow() {
        try {
            this.adapter.getResultPartition(new IntermediateResultPartitionID());
            Assert.fail("get not exist result partition");
        }
        catch (IllegalArgumentException expected) {
            // Expected: the freshly generated partition ID is unknown to the topology.
        }
    }

    /**
     * A topology built from a graph whose vertices share a co-location group reports co-location
     * constraints.
     *
     * @throws Exception if the execution graph cannot be created
     */
    @Test
    public void testWithCoLocationConstraints() throws Exception {
        ExecutionGraph executionGraph = this.createExecutionGraphWithCoLocationConstraint();
        this.adapter = new DefaultExecutionTopology(executionGraph);
        Assert.assertTrue(this.adapter.containsCoLocationConstraints());
    }

    /** The default fixture has no co-location group, so no co-location constraints are reported. */
    @Test
    public void testWithoutCoLocationConstraints() {
        Assert.assertFalse(this.adapter.containsCoLocationConstraints());
    }

    /** The default fixture is expected to yield exactly one pipelined region. */
    @Test
    public void testGetAllPipelinedRegions() {
        Iterable<DefaultSchedulingPipelinedRegion> allPipelinedRegions = this.adapter.getAllPipelinedRegions();
        Assert.assertEquals(1, Iterables.size(allPipelinedRegions));
    }

    /** For every vertex, the region it is mapped to contains all vertices of the topology. */
    @Test
    public void testGetPipelinedRegionOfVertex() {
        for (DefaultExecutionVertex vertex : this.adapter.getVertices()) {
            DefaultSchedulingPipelinedRegion pipelinedRegion = this.adapter.getPipelinedRegionOfVertex(vertex.getId());
            this.assertRegionContainsAllVertices(pipelinedRegion);
        }
    }

    /**
     * Asserts that the given region holds exactly the set of vertices exposed by the topology.
     *
     * @param pipelinedRegionOfVertex the region to check
     */
    private void assertRegionContainsAllVertices(DefaultSchedulingPipelinedRegion pipelinedRegionOfVertex) {
        HashSet<DefaultExecutionVertex> verticesInRegion = Sets.newHashSet(pipelinedRegionOfVertex.getVertices());
        HashSet<DefaultExecutionVertex> verticesInTopology = Sets.newHashSet(this.adapter.getVertices());
        Assert.assertEquals(verticesInTopology, verticesInRegion);
    }

    /**
     * Creates an execution graph like the one in {@link #setUp()}, but with both job vertices placed
     * in one slot sharing group and one co-location group.
     *
     * @return the created execution graph
     * @throws Exception if the execution graph cannot be created
     */
    private ExecutionGraph createExecutionGraphWithCoLocationConstraint() throws Exception {
        JobVertex[] jobVertices = new JobVertex[2];
        int parallelism = 3;
        jobVertices[0] = ExecutionGraphTestUtils.createNoOpVertex("v1", parallelism);
        jobVertices[1] = ExecutionGraphTestUtils.createNoOpVertex("v2", parallelism);
        jobVertices[1].connectNewDataSetAsInput(jobVertices[0], DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        jobVertices[0].setSlotSharingGroup(slotSharingGroup);
        jobVertices[1].setSlotSharingGroup(slotSharingGroup);
        CoLocationGroup coLocationGroup = new CoLocationGroup();
        coLocationGroup.addVertex(jobVertices[0]);
        coLocationGroup.addVertex(jobVertices[1]);
        jobVertices[0].updateCoLocationGroup(coLocationGroup);
        jobVertices[1].updateCoLocationGroup(coLocationGroup);
        return ExecutionGraphTestUtils.createSimpleTestGraph(this.taskManagerGateway, (RestartStrategy)this.triggeredRestartStrategy, jobVertices);
    }

    /**
     * Walks both vertex sequences in lock-step and asserts that each adapted vertex matches the
     * original one, including its consumed and produced partitions. Fails if the two sequences
     * differ in length.
     *
     * @param originalGraph the execution graph the topology was built from
     * @param adaptedTopology the topology under test
     */
    private static void assertGraphEquals(ExecutionGraph originalGraph, DefaultExecutionTopology adaptedTopology) {
        Iterator<ExecutionVertex> originalVertices = originalGraph.getAllExecutionVertices().iterator();
        Iterator<DefaultExecutionVertex> adaptedVertices = adaptedTopology.getVertices().iterator();
        while (originalVertices.hasNext()) {
            // Fail with a readable message instead of a NoSuchElementException from next().
            Assert.assertTrue("Number of original vertices exceeds number of adapted vertices.", adaptedVertices.hasNext());
            ExecutionVertex originalVertex = originalVertices.next();
            DefaultExecutionVertex adaptedVertex = adaptedVertices.next();
            assertVertexEquals(originalVertex, adaptedVertex);

            // The sources of all input edges of the original vertex are its consumed partitions.
            List<IntermediateResultPartition> originalConsumedPartitions = IntStream.range(0, originalVertex.getNumberOfInputs())
                    .mapToObj(inputIndex -> originalVertex.getInputEdges(inputIndex))
                    .flatMap(Arrays::stream)
                    .map(ExecutionEdge::getSource)
                    .collect(Collectors.toList());
            Iterable<DefaultResultPartition> adaptedConsumedPartitions = adaptedVertex.getConsumedResults();
            assertPartitionsEquals(originalConsumedPartitions, adaptedConsumedPartitions);

            Collection<IntermediateResultPartition> originalProducedPartitions = originalVertex.getProducedPartitions().values();
            Iterable<DefaultResultPartition> adaptedProducedPartitions = adaptedVertex.getProducedResults();
            assertPartitionsEquals(originalProducedPartitions, adaptedProducedPartitions);
        }
        Assert.assertFalse("Number of adapted vertices exceeds number of original vertices.", adaptedVertices.hasNext());
    }

    /**
     * Asserts that both collections have the same size, that every original partition has an
     * adapted counterpart with the same ID and matching attributes, and that every consumer of the
     * original partition appears (by ID) among the consumers of the adapted partition.
     *
     * @param originalResultPartitions partitions taken from the execution graph
     * @param adaptedResultPartitions partitions taken from the topology
     */
    private static void assertPartitionsEquals(Iterable<IntermediateResultPartition> originalResultPartitions, Iterable<DefaultResultPartition> adaptedResultPartitions) {
        Assert.assertEquals(Iterables.size(originalResultPartitions), Iterables.size(adaptedResultPartitions));
        for (IntermediateResultPartition originalPartition : originalResultPartitions) {
            DefaultResultPartition adaptedPartition = IterableUtils.toStream(adaptedResultPartitions)
                    .filter(adapted -> adapted.getId().equals(originalPartition.getPartitionId()))
                    .findAny()
                    .orElseThrow(() -> new AssertionError("Could not find matching adapted partition for " + originalPartition));
            assertPartitionEquals(originalPartition, adaptedPartition);

            List<ExecutionVertex> originalConsumers = originalPartition.getConsumers().stream()
                    .flatMap(Collection::stream)
                    .map(ExecutionEdge::getTarget)
                    .collect(Collectors.toList());
            Iterable<DefaultExecutionVertex> adaptedConsumers = adaptedPartition.getConsumers();
            for (ExecutionVertex originalConsumer : originalConsumers) {
                // Only the presence of a consumer with the same ID is checked here.
                ExecutionVertexID originalId = originalConsumer.getID();
                Assert.assertTrue(IterableUtils.toStream(adaptedConsumers).anyMatch(adaptedConsumer -> adaptedConsumer.getId().equals(originalId)));
            }
        }
    }

    /**
     * Asserts that the adapted partition carries the same partition ID, result ID and result type
     * as the original, and that its producer matches the original producer.
     *
     * @param originalPartition partition taken from the execution graph
     * @param adaptedPartition partition taken from the topology
     */
    private static void assertPartitionEquals(IntermediateResultPartition originalPartition, DefaultResultPartition adaptedPartition) {
        Assert.assertEquals(originalPartition.getPartitionId(), adaptedPartition.getId());
        Assert.assertEquals(originalPartition.getIntermediateResult().getId(), adaptedPartition.getResultId());
        Assert.assertEquals(originalPartition.getResultType(), adaptedPartition.getResultType());
        assertVertexEquals(originalPartition.getProducer(), adaptedPartition.getProducer());
    }

    /**
     * Asserts that the adapted vertex has the same ID and input dependency constraint as the
     * original vertex.
     *
     * @param originalVertex vertex taken from the execution graph
     * @param adaptedVertex vertex taken from the topology
     */
    private static void assertVertexEquals(ExecutionVertex originalVertex, DefaultExecutionVertex adaptedVertex) {
        Assert.assertEquals(originalVertex.getID(), adaptedVertex.getId());
        Assert.assertEquals(originalVertex.getInputDependencyConstraint(), adaptedVertex.getInputDependencyConstraint());
    }
}

