/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.strategy;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
import org.apache.flink.runtime.scheduler.SchedulerOperations;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.ResultPartitionState;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulerOperations;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link LazyFromSourcesSchedulingStrategy}.
 *
 * <p>Each test builds a {@link TestingSchedulingTopology}, drives the strategy through
 * scheduling, state-change, partition or restart notifications, and then inspects the
 * deployment requests recorded by {@link TestingSchedulerOperations}.
 */
public class LazyFromSourcesSchedulingStrategyTest extends TestLogger {
    /** Records every deployment request issued by the strategy under test; recreated per test. */
    private TestingSchedulerOperations testingSchedulerOperation;

    /** Installs a fresh recorder so that deployments from one test never leak into another. */
    @Before
    public void setUp() {
        this.testingSchedulerOperation = new TestingSchedulerOperations();
    }

    /**
     * After {@code startScheduling()} on a producers-to-consumers topology, the latest scheduled
     * vertices must be the producers.
     */
    @Test
    public void testStartScheduling() {
        TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers = testingSchedulingTopology.addExecutionVertices().finish();
        List<TestingSchedulingExecutionVertex> consumers = testingSchedulingTopology.addExecutionVertices().finish();
        testingSchedulingTopology.connectAllToAll(producers, consumers).finish();

        this.startScheduling(testingSchedulingTopology);

        this.assertLatestScheduledVerticesAreEqualTo(producers);
    }

    /**
     * Restarting producers and consumers together (no producer has finished) must end with the
     * producers as the latest scheduled vertices.
     */
    @Test
    public void testRestartBlockingTasks() {
        TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers = testingSchedulingTopology.addExecutionVertices().finish();
        List<TestingSchedulingExecutionVertex> consumers = testingSchedulingTopology.addExecutionVertices().finish();
        testingSchedulingTopology.connectAllToAll(producers, consumers).finish();
        LazyFromSourcesSchedulingStrategy schedulingStrategy = this.startScheduling(testingSchedulingTopology);

        Set<ExecutionVertexID> verticesToRestart = idSetOf(producers);
        verticesToRestart.addAll(idSetOf(consumers));
        schedulingStrategy.restartTasks(verticesToRestart);

        this.assertLatestScheduledVerticesAreEqualTo(producers);
    }

    /**
     * Restarting only the consumers after every producer was reported FINISHED must end with the
     * consumers as the latest scheduled vertices.
     */
    @Test
    public void testRestartConsumableBlockingTasks() {
        TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers = testingSchedulingTopology.addExecutionVertices().finish();
        List<TestingSchedulingExecutionVertex> consumers = testingSchedulingTopology.addExecutionVertices().finish();
        testingSchedulingTopology.connectAllToAll(producers, consumers).finish();
        LazyFromSourcesSchedulingStrategy schedulingStrategy = this.startScheduling(testingSchedulingTopology);

        finishAll(schedulingStrategy, producers);
        schedulingStrategy.restartTasks(idSetOf(consumers));

        this.assertLatestScheduledVerticesAreEqualTo(consumers);
    }

    /**
     * Consumers use {@link InputDependencyConstraint#ALL} and are fed pointwise by two producer
     * groups. Once both groups are FINISHED, restarting the consumers must schedule them.
     */
    @Test
    public void testRestartBlockingALLExecutionStateChange() {
        TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers1 = testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> producers2 = testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> consumers = testingSchedulingTopology.addExecutionVertices()
            .withParallelism(2)
            .withInputDependencyConstraint(InputDependencyConstraint.ALL)
            .finish();
        testingSchedulingTopology.connectPointwise(producers1, consumers).finish();
        testingSchedulingTopology.connectPointwise(producers2, consumers).finish();
        LazyFromSourcesSchedulingStrategy schedulingStrategy = this.startScheduling(testingSchedulingTopology);

        finishAll(schedulingStrategy, producers1);
        finishAll(schedulingStrategy, producers2);
        schedulingStrategy.restartTasks(idSetOf(consumers));

        this.assertLatestScheduledVerticesAreEqualTo(consumers);
    }

    /**
     * Consumers keep the builder's default input dependency constraint (presumably ANY, per the
     * test name) and are fed pointwise by two producer groups. With only the first group FINISHED,
     * restarting the consumers must still schedule them.
     */
    @Test
    public void testRestartBlockingANYExecutionStateChange() {
        TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers1 = testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> producers2 = testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> consumers = testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        testingSchedulingTopology.connectPointwise(producers1, consumers).finish();
        testingSchedulingTopology.connectPointwise(producers2, consumers).finish();
        LazyFromSourcesSchedulingStrategy schedulingStrategy = this.startScheduling(testingSchedulingTopology);

        // Only the first producer group finishes; the second group is deliberately left untouched.
        finishAll(schedulingStrategy, producers1);
        schedulingStrategy.restartTasks(idSetOf(consumers));

        this.assertLatestScheduledVerticesAreEqualTo(consumers);
    }

    /**
     * With PIPELINED partitions already in state CONSUMABLE, restarting producers and consumers
     * must schedule the producers followed by the consumers.
     */
    @Test
    public void testRestartConsumablePipelinedTasks() {
        TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers = testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> consumers = testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        testingSchedulingTopology.connectAllToAll(producers, consumers)
            .withResultPartitionState(ResultPartitionState.CONSUMABLE)
            .withResultPartitionType(ResultPartitionType.PIPELINED)
            .finish();
        LazyFromSourcesSchedulingStrategy schedulingStrategy = this.startScheduling(testingSchedulingTopology);

        Set<ExecutionVertexID> verticesToRestart = idSetOf(producers);
        verticesToRestart.addAll(idSetOf(consumers));
        schedulingStrategy.restartTasks(verticesToRestart);

        // Expected order of the trailing deployments: all producers first, then all consumers.
        List<TestingSchedulingExecutionVertex> toScheduleVertices = new ArrayList<>(producers.size() + consumers.size());
        toScheduleVertices.addAll(producers);
        toScheduleVertices.addAll(consumers);
        this.assertLatestScheduledVerticesAreEqualTo(toScheduleVertices);
    }

    /**
     * With PIPELINED partitions still in state CREATED, restarting producers and consumers must
     * end with the producers as the latest scheduled vertices.
     */
    @Test
    public void testRestartCreatedPipelinedTasks() {
        TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers = testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> consumers = testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        testingSchedulingTopology.connectAllToAll(producers, consumers)
            .withResultPartitionState(ResultPartitionState.CREATED)
            .withResultPartitionType(ResultPartitionType.PIPELINED)
            .finish();
        LazyFromSourcesSchedulingStrategy schedulingStrategy = this.startScheduling(testingSchedulingTopology);

        Set<ExecutionVertexID> verticesToRestart = idSetOf(producers);
        verticesToRestart.addAll(idSetOf(consumers));
        schedulingStrategy.restartTasks(verticesToRestart);

        this.assertLatestScheduledVerticesAreEqualTo(producers);
    }

    /**
     * For an all-to-all PIPELINED connection, a single producer going RUNNING and announcing one
     * of its partitions as consumable must cause the consumers to be scheduled.
     */
    @Test
    public void testPipelinedPartitionConsumable() {
        TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers = testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> consumers = testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        testingSchedulingTopology.connectAllToAll(producers, consumers)
            .withResultPartitionType(ResultPartitionType.PIPELINED)
            .finish();
        LazyFromSourcesSchedulingStrategy schedulingStrategy = this.startScheduling(testingSchedulingTopology);

        TestingSchedulingExecutionVertex producer1 = producers.get(0);
        TestingSchedulingResultPartition partition1 = producer1.getProducedResults().iterator().next();
        schedulingStrategy.onExecutionStateChange(producer1.getId(), ExecutionState.RUNNING);
        schedulingStrategy.onPartitionConsumable(partition1.getId());

        this.assertLatestScheduledVerticesAreEqualTo(consumers);
    }

    /**
     * Pointwise connection with {@link InputDependencyConstraint#ALL} consumers: once every
     * producer is FINISHED the consumers must be the latest scheduled vertices.
     */
    @Test
    public void testBlockingPointwiseExecutionStateChange() {
        TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers = testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> consumers = testingSchedulingTopology.addExecutionVertices()
            .withParallelism(2)
            .withInputDependencyConstraint(InputDependencyConstraint.ALL)
            .finish();
        testingSchedulingTopology.connectPointwise(producers, consumers).finish();
        LazyFromSourcesSchedulingStrategy schedulingStrategy = this.startScheduling(testingSchedulingTopology);

        finishAll(schedulingStrategy, producers);

        this.assertLatestScheduledVerticesAreEqualTo(consumers);
    }

    /**
     * Two producer groups feed {@link InputDependencyConstraint#ALL} consumers pointwise: after
     * both groups are FINISHED the consumers must be the latest scheduled vertices.
     */
    @Test
    public void testBlockingALLExecutionStateChange() {
        TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers1 = testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> producers2 = testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> consumers = testingSchedulingTopology.addExecutionVertices()
            .withParallelism(2)
            .withInputDependencyConstraint(InputDependencyConstraint.ALL)
            .finish();
        testingSchedulingTopology.connectPointwise(producers1, consumers).finish();
        testingSchedulingTopology.connectPointwise(producers2, consumers).finish();
        LazyFromSourcesSchedulingStrategy schedulingStrategy = this.startScheduling(testingSchedulingTopology);

        finishAll(schedulingStrategy, producers1);
        finishAll(schedulingStrategy, producers2);

        this.assertLatestScheduledVerticesAreEqualTo(consumers);
    }

    /**
     * Two producer groups feed consumers that keep the builder's default input dependency
     * constraint (presumably ANY, per the test name): after only the first group is FINISHED the
     * consumers must already be the latest scheduled vertices.
     */
    @Test
    public void testBlockingANYExecutionStateChange() {
        TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers1 = testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> producers2 = testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> consumers = testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        testingSchedulingTopology.connectPointwise(producers1, consumers).finish();
        testingSchedulingTopology.connectPointwise(producers2, consumers).finish();
        LazyFromSourcesSchedulingStrategy schedulingStrategy = this.startScheduling(testingSchedulingTopology);

        // Only the first producer group finishes; the second group is deliberately left untouched.
        finishAll(schedulingStrategy, producers1);

        this.assertLatestScheduledVerticesAreEqualTo(consumers);
    }

    /**
     * Creates two unconnected vertices and replaces the recorder with one that sets the second
     * vertex to CANCELED as soon as any deployment is requested. Exactly one deployment request
     * must be recorded afterwards.
     */
    @Test
    public void testOnlyCreatedVertexWillBeScheduled() {
        TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology();
        // First vertex: never referenced again, created only so the topology holds two vertices
        // (presumably newExecutionVertex() registers it with the topology - confirm in the helper).
        testingSchedulingTopology.newExecutionVertex();
        final TestingSchedulingExecutionVertex v2 = testingSchedulingTopology.newExecutionVertex();
        this.testingSchedulerOperation = new TestingSchedulerOperations() {

            @Override
            public void allocateSlotsAndDeploy(List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
                super.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
                // Move v2 out of its initial state once the first deployment has been recorded.
                v2.setState(ExecutionState.CANCELED);
            }
        };

        this.startScheduling(testingSchedulingTopology);

        Assert.assertThat(this.testingSchedulerOperation.getScheduledVertices(), Matchers.hasSize(1));
    }

    /**
     * Creates a strategy wired to the current recorder and the given topology, starts scheduling
     * and returns the strategy for further interaction.
     *
     * @param testingSchedulingTopology topology the strategy operates on
     * @return the started strategy
     */
    private LazyFromSourcesSchedulingStrategy startScheduling(TestingSchedulingTopology testingSchedulingTopology) {
        LazyFromSourcesSchedulingStrategy schedulingStrategy =
            new LazyFromSourcesSchedulingStrategy(this.testingSchedulerOperation, testingSchedulingTopology);
        schedulingStrategy.startScheduling();
        return schedulingStrategy;
    }

    /**
     * Asserts that the last {@code expected.size()} recorded deployment requests match
     * {@code expected}, in order, with each request containing exactly one vertex.
     *
     * @param expected vertices expected at the tail of the recorded deployments
     */
    private void assertLatestScheduledVerticesAreEqualTo(List<TestingSchedulingExecutionVertex> expected) {
        List<List<ExecutionVertexDeploymentOption>> deploymentOptions = this.testingSchedulerOperation.getScheduledVertices();
        Assert.assertThat(expected.size(), Matchers.lessThanOrEqualTo(deploymentOptions.size()));
        // Walk both lists backwards so that only the most recent deployments are compared.
        for (int i = 0; i < expected.size(); ++i) {
            TestingSchedulingExecutionVertex expectedVertex = expected.get(expected.size() - i - 1);
            List<ExecutionVertexDeploymentOption> actualRequest = deploymentOptions.get(deploymentOptions.size() - i - 1);
            Assert.assertEquals(
                idsFromVertices(Collections.singletonList(expectedVertex)),
                idsFromDeploymentOptions(actualRequest));
        }
    }

    /** Reports {@link ExecutionState#FINISHED} to the strategy for every given vertex, in list order. */
    private static void finishAll(
            LazyFromSourcesSchedulingStrategy schedulingStrategy,
            List<TestingSchedulingExecutionVertex> vertices) {
        for (TestingSchedulingExecutionVertex vertex : vertices) {
            schedulingStrategy.onExecutionStateChange(vertex.getId(), ExecutionState.FINISHED);
        }
    }

    /** Collects the IDs of the given vertices into a set, e.g. as argument for {@code restartTasks}. */
    private static Set<ExecutionVertexID> idSetOf(List<TestingSchedulingExecutionVertex> vertices) {
        return vertices.stream().map(TestingSchedulingExecutionVertex::getId).collect(Collectors.toSet());
    }

    /** Maps the given vertices to their IDs, preserving list order. */
    private static List<ExecutionVertexID> idsFromVertices(List<TestingSchedulingExecutionVertex> vertices) {
        return vertices.stream().map(TestingSchedulingExecutionVertex::getId).collect(Collectors.toList());
    }

    /** Maps the given deployment options to the IDs of the vertices they deploy, preserving list order. */
    private static List<ExecutionVertexID> idsFromDeploymentOptions(List<ExecutionVertexDeploymentOption> deploymentOptions) {
        return deploymentOptions.stream().map(ExecutionVertexDeploymentOption::getExecutionVertexId).collect(Collectors.toList());
    }
}

