package org.apache.flink.runtime.scheduler.strategy;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.class */
/**
 * Unit tests for {@link LazyFromSourcesSchedulingStrategy}.
 *
 * <p>Each test builds a small {@link TestingSchedulingTopology}, drives the strategy through
 * {@code startScheduling}, {@code restartTasks}, {@code onExecutionStateChange} or
 * {@code onPartitionConsumable}, and then inspects the deployments recorded by
 * {@link TestingSchedulerOperations}.
 */
public class LazyFromSourcesSchedulingStrategyTest extends TestLogger {
    /** Records every deployment request issued by the strategy under test; recreated per test. */
    private TestingSchedulerOperations testingSchedulerOperation;

    @Before
    public void setUp() {
        testingSchedulerOperation = new TestingSchedulerOperations();
    }

    /**
     * Starting the scheduling of a two-stage all-to-all topology must leave the producers as the
     * most recently scheduled vertices.
     */
    @Test
    public void testStartScheduling() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers = topology.addExecutionVertices().finish();
        List<TestingSchedulingExecutionVertex> consumers = topology.addExecutionVertices().finish();
        topology.connectAllToAll(producers, consumers).finish();

        startScheduling(topology);

        assertLatestScheduledVerticesAreEqualTo(producers);
    }

    /**
     * Restarting both stages of an all-to-all topology (default partition settings of the
     * topology builder) must leave the producers as the most recently scheduled vertices.
     */
    @Test
    public void testRestartBlockingTasks() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers = topology.addExecutionVertices().finish();
        List<TestingSchedulingExecutionVertex> consumers = topology.addExecutionVertices().finish();
        topology.connectAllToAll(producers, consumers).finish();
        LazyFromSourcesSchedulingStrategy strategy = startScheduling(topology);

        strategy.restartTasks(idSetFromVertices(concat(producers, consumers)));

        assertLatestScheduledVerticesAreEqualTo(producers);
    }

    /**
     * Once every producer has been reported as FINISHED, restarting only the consumers must
     * schedule the consumers.
     */
    @Test
    public void testRestartConsumableBlockingTasks() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers = topology.addExecutionVertices().finish();
        List<TestingSchedulingExecutionVertex> consumers = topology.addExecutionVertices().finish();
        topology.connectAllToAll(producers, consumers).finish();
        LazyFromSourcesSchedulingStrategy strategy = startScheduling(topology);
        // Collected before the state changes, matching the original statement order.
        Set<ExecutionVertexID> verticesToRestart = idSetFromVertices(consumers);

        finishAll(strategy, producers);
        strategy.restartTasks(verticesToRestart);

        assertLatestScheduledVerticesAreEqualTo(consumers);
    }

    /**
     * Consumers declared with {@link InputDependencyConstraint#ALL} and fed pointwise by two
     * producer groups must be scheduled on restart after both groups have FINISHED.
     */
    @Test
    public void testRestartBlockingALLExecutionStateChange() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers1 =
                topology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> producers2 =
                topology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> consumers =
                topology.addExecutionVertices()
                        .withParallelism(2)
                        .withInputDependencyConstraint(InputDependencyConstraint.ALL)
                        .finish();
        topology.connectPointwise(producers1, consumers).finish();
        topology.connectPointwise(producers2, consumers).finish();
        LazyFromSourcesSchedulingStrategy strategy = startScheduling(topology);

        finishAll(strategy, producers1);
        finishAll(strategy, producers2);
        strategy.restartTasks(idSetFromVertices(consumers));

        assertLatestScheduledVerticesAreEqualTo(consumers);
    }

    /**
     * Consumers using the builder's default input dependency constraint (presumably ANY, going
     * by the test name) must be scheduled on restart after only the first of two producer groups
     * has FINISHED.
     */
    @Test
    public void testRestartBlockingANYExecutionStateChange() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers1 =
                topology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> producers2 =
                topology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> consumers =
                topology.addExecutionVertices().withParallelism(2).finish();
        topology.connectPointwise(producers1, consumers).finish();
        topology.connectPointwise(producers2, consumers).finish();
        LazyFromSourcesSchedulingStrategy strategy = startScheduling(topology);

        // Only the first producer group finishes; the second one is intentionally left alone.
        finishAll(strategy, producers1);
        strategy.restartTasks(idSetFromVertices(consumers));

        assertLatestScheduledVerticesAreEqualTo(consumers);
    }

    /**
     * With PIPELINED partitions already in state CONSUMABLE, restarting both stages must schedule
     * the producers followed by the consumers.
     */
    @Test
    public void testRestartConsumablePipelinedTasks() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers =
                topology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> consumers =
                topology.addExecutionVertices().withParallelism(2).finish();
        topology.connectAllToAll(producers, consumers)
                .withResultPartitionState(ResultPartitionState.CONSUMABLE)
                .withResultPartitionType(ResultPartitionType.PIPELINED)
                .finish();
        LazyFromSourcesSchedulingStrategy strategy = startScheduling(topology);

        List<TestingSchedulingExecutionVertex> allVertices = concat(producers, consumers);
        strategy.restartTasks(idSetFromVertices(allVertices));

        assertLatestScheduledVerticesAreEqualTo(allVertices);
    }

    /**
     * With PIPELINED partitions still in state CREATED, restarting both stages must leave the
     * producers as the most recently scheduled vertices.
     */
    @Test
    public void testRestartCreatedPipelinedTasks() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers =
                topology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> consumers =
                topology.addExecutionVertices().withParallelism(2).finish();
        topology.connectAllToAll(producers, consumers)
                .withResultPartitionState(ResultPartitionState.CREATED)
                .withResultPartitionType(ResultPartitionType.PIPELINED)
                .finish();
        LazyFromSourcesSchedulingStrategy strategy = startScheduling(topology);

        strategy.restartTasks(idSetFromVertices(concat(producers, consumers)));

        assertLatestScheduledVerticesAreEqualTo(producers);
    }

    /**
     * After the first producer is reported RUNNING and its first produced PIPELINED partition is
     * signalled as consumable, the consumers must be scheduled.
     */
    @Test
    public void testPipelinedPartitionConsumable() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers =
                topology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> consumers =
                topology.addExecutionVertices().withParallelism(2).finish();
        topology.connectAllToAll(producers, consumers)
                .withResultPartitionType(ResultPartitionType.PIPELINED)
                .finish();
        LazyFromSourcesSchedulingStrategy strategy = startScheduling(topology);

        TestingSchedulingExecutionVertex firstProducer = producers.get(0);
        TestingSchedulingResultPartition firstPartition =
                firstProducer.getProducedResults().iterator().next();
        strategy.onExecutionStateChange(firstProducer.m447getId(), ExecutionState.RUNNING);
        strategy.onPartitionConsumable(firstPartition.m449getId());

        assertLatestScheduledVerticesAreEqualTo(consumers);
    }

    /**
     * Pointwise-connected consumers with {@link InputDependencyConstraint#ALL} must be scheduled
     * once all producers have FINISHED.
     */
    @Test
    public void testBlockingPointwiseExecutionStateChange() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers =
                topology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> consumers =
                topology.addExecutionVertices()
                        .withParallelism(2)
                        .withInputDependencyConstraint(InputDependencyConstraint.ALL)
                        .finish();
        topology.connectPointwise(producers, consumers).finish();
        LazyFromSourcesSchedulingStrategy strategy = startScheduling(topology);

        finishAll(strategy, producers);

        assertLatestScheduledVerticesAreEqualTo(consumers);
    }

    /**
     * Consumers with {@link InputDependencyConstraint#ALL} and two producer groups must be
     * scheduled after both groups have FINISHED.
     */
    @Test
    public void testBlockingALLExecutionStateChange() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers1 =
                topology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> producers2 =
                topology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> consumers =
                topology.addExecutionVertices()
                        .withParallelism(2)
                        .withInputDependencyConstraint(InputDependencyConstraint.ALL)
                        .finish();
        topology.connectPointwise(producers1, consumers).finish();
        topology.connectPointwise(producers2, consumers).finish();
        LazyFromSourcesSchedulingStrategy strategy = startScheduling(topology);

        finishAll(strategy, producers1);
        finishAll(strategy, producers2);

        assertLatestScheduledVerticesAreEqualTo(consumers);
    }

    /**
     * Consumers using the builder's default input dependency constraint (presumably ANY, going
     * by the test name) must be scheduled after only the first of two producer groups has
     * FINISHED.
     */
    @Test
    public void testBlockingANYExecutionStateChange() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers1 =
                topology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> producers2 =
                topology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> consumers =
                topology.addExecutionVertices().withParallelism(2).finish();
        topology.connectPointwise(producers1, consumers).finish();
        topology.connectPointwise(producers2, consumers).finish();
        LazyFromSourcesSchedulingStrategy strategy = startScheduling(topology);

        // Only the first producer group finishes; the second one is intentionally left alone.
        finishAll(strategy, producers1);

        assertLatestScheduledVerticesAreEqualTo(consumers);
    }

    /**
     * Of two unconnected vertices, exactly one deployment must be recorded when the second vertex
     * is switched to CANCELED as a side effect of the first deployment call.
     */
    @Test
    public void testOnlyCreatedVertexWillBeScheduled() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        // The first vertex is only needed inside the topology; its handle is not used here.
        topology.newExecutionVertex();
        final TestingSchedulingExecutionVertex secondVertex = topology.newExecutionVertex();

        testingSchedulerOperation = new TestingSchedulerOperations() {
            @Override
            public void allocateSlotsAndDeploy(List<ExecutionVertexDeploymentOption> deploymentOptions) {
                super.allocateSlotsAndDeploy(deploymentOptions);
                // Cancel the second vertex as soon as any deployment has been recorded.
                secondVertex.setState(ExecutionState.CANCELED);
            }
        };

        startScheduling(topology);

        Assert.assertThat(testingSchedulerOperation.getScheduledVertices(), Matchers.hasSize(1));
    }

    /**
     * Creates a strategy bound to the current {@link #testingSchedulerOperation} and the given
     * topology, and immediately calls {@code startScheduling()} on it.
     *
     * @param testingSchedulingTopology topology the strategy operates on
     * @return the started strategy, for further interaction by the test
     */
    private LazyFromSourcesSchedulingStrategy startScheduling(TestingSchedulingTopology testingSchedulingTopology) {
        LazyFromSourcesSchedulingStrategy strategy =
                new LazyFromSourcesSchedulingStrategy(testingSchedulerOperation, testingSchedulingTopology);
        strategy.startScheduling();
        return strategy;
    }

    /**
     * Asserts that the last {@code expected.size()} recorded deployments match {@code expected},
     * pairing them from the back: the i-th expected vertex from the end must be the single vertex
     * of the i-th recorded deployment from the end.
     *
     * @param expected vertices expected to have been scheduled most recently, in scheduling order
     */
    private void assertLatestScheduledVerticesAreEqualTo(List<TestingSchedulingExecutionVertex> expected) {
        List<List<ExecutionVertexDeploymentOption>> scheduledVertices =
                testingSchedulerOperation.getScheduledVertices();
        int expectedCount = expected.size();
        int scheduledCount = scheduledVertices.size();
        Assert.assertThat(Integer.valueOf(expectedCount), Matchers.lessThanOrEqualTo(Integer.valueOf(scheduledCount)));

        for (int offset = 1; offset <= expectedCount; offset++) {
            TestingSchedulingExecutionVertex expectedVertex = expected.get(expectedCount - offset);
            List<ExecutionVertexDeploymentOption> actualDeployment = scheduledVertices.get(scheduledCount - offset);
            Assert.assertEquals(
                    idsFromVertices(Collections.singletonList(expectedVertex)),
                    idsFromDeploymentOptions(actualDeployment));
        }
    }

    /** Reports every given vertex to the strategy as {@link ExecutionState#FINISHED}, in list order. */
    private static void finishAll(
            LazyFromSourcesSchedulingStrategy strategy, List<TestingSchedulingExecutionVertex> vertices) {
        for (TestingSchedulingExecutionVertex vertex : vertices) {
            strategy.onExecutionStateChange(vertex.m447getId(), ExecutionState.FINISHED);
        }
    }

    /** Returns a new list holding the elements of {@code first} followed by those of {@code second}. */
    private static List<TestingSchedulingExecutionVertex> concat(
            List<TestingSchedulingExecutionVertex> first, List<TestingSchedulingExecutionVertex> second) {
        List<TestingSchedulingExecutionVertex> combined = new ArrayList<>(first.size() + second.size());
        combined.addAll(first);
        combined.addAll(second);
        return combined;
    }

    /**
     * Collects the ids of the given vertices into a set.
     *
     * <p>The result is produced by a single collect and never mutated afterwards, so the tests do
     * not depend on the (unspecified) mutability of the set returned by {@code Collectors.toSet()}.
     */
    private static Set<ExecutionVertexID> idSetFromVertices(List<TestingSchedulingExecutionVertex> vertices) {
        return vertices.stream()
                .map(TestingSchedulingExecutionVertex::m447getId)
                .collect(Collectors.toSet());
    }

    /** Maps the given vertices to their ids, preserving list order. */
    private static List<ExecutionVertexID> idsFromVertices(List<TestingSchedulingExecutionVertex> list) {
        return list.stream()
                .map(TestingSchedulingExecutionVertex::m447getId)
                .collect(Collectors.toList());
    }

    /** Maps the given deployment options to their execution vertex ids, preserving list order. */
    private static List<ExecutionVertexID> idsFromDeploymentOptions(List<ExecutionVertexDeploymentOption> list) {
        return list.stream()
                .map(ExecutionVertexDeploymentOption::getExecutionVertexId)
                .collect(Collectors.toList());
    }
}
