package org.apache.flink.runtime.scheduler;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategyTest.class */
/**
 * Tests for {@link LocalInputPreferredSlotSharingStrategy}.
 *
 * <p>Each test builds a {@link TestingSchedulingTopology}, constructs the strategy from it
 * together with a set of {@link SlotSharingGroup}s and co-location groups, and then checks how
 * many execution slot sharing groups were produced and which execution vertices ended up in the
 * same group.
 */
public class LocalInputPreferredSlotSharingStrategyTest extends TestLogger {
    private static final JobVertexID JOB_VERTEX_ID_1 = new JobVertexID();
    private static final JobVertexID JOB_VERTEX_ID_2 = new JobVertexID();

    /** Default topology: two job vertices with two subtasks each, no edges between them. */
    private TestingSchedulingTopology topology;

    /** Subtask 0 of {@link #JOB_VERTEX_ID_1} in the default topology. */
    private TestingSchedulingExecutionVertex ev11;

    /** Subtask 1 of {@link #JOB_VERTEX_ID_1} in the default topology. */
    private TestingSchedulingExecutionVertex ev12;

    /** Subtask 0 of {@link #JOB_VERTEX_ID_2} in the default topology. */
    private TestingSchedulingExecutionVertex ev21;

    /** Subtask 1 of {@link #JOB_VERTEX_ID_2} in the default topology. */
    private TestingSchedulingExecutionVertex ev22;

    /** A single slot sharing group that contains both job vertices. */
    private Set<SlotSharingGroup> slotSharingGroups;

    /**
     * Creates the default four-vertex topology and a single slot sharing group holding both job
     * vertices. Tests add edges or replace these fixtures as needed.
     */
    @Before
    public void setUp() throws Exception {
        topology = new TestingSchedulingTopology();

        ev11 = topology.newExecutionVertex(JOB_VERTEX_ID_1, 0);
        ev12 = topology.newExecutionVertex(JOB_VERTEX_ID_1, 1);

        ev21 = topology.newExecutionVertex(JOB_VERTEX_ID_2, 0);
        ev22 = topology.newExecutionVertex(JOB_VERTEX_ID_2, 1);

        final SlotSharingGroup sharedGroup = new SlotSharingGroup();
        sharedGroup.addVertexToGroup(JOB_VERTEX_ID_1, ResourceSpec.UNKNOWN);
        sharedGroup.addVertexToGroup(JOB_VERTEX_ID_2, ResourceSpec.UNKNOWN);
        slotSharingGroups = Collections.singleton(sharedGroup);
    }

    /**
     * Connects the vertices crosswise (ev11-ev22, ev12-ev21) and declares both job vertices
     * co-located. The expected grouping pairs vertices by subtask index (ev11 with ev21, ev12
     * with ev22) rather than along the connected edges.
     */
    @Test
    public void testCoLocationConstraintIsRespected() {
        topology.connect(ev11, ev22);
        topology.connect(ev12, ev21);

        final CoLocationGroupDesc coLocationGroup =
                CoLocationGroupDesc.from(new JobVertexID[] {JOB_VERTEX_ID_1, JOB_VERTEX_ID_2});
        final LocalInputPreferredSlotSharingStrategy strategy =
                new LocalInputPreferredSlotSharingStrategy(
                        topology, slotSharingGroups, Collections.singleton(coLocationGroup));

        Assert.assertThat(strategy.getExecutionSlotSharingGroups(), Matchers.hasSize(2));
        assertGroupContainsExactly(
                strategy, ev11.m447getId(), ev11.m447getId(), ev21.m447getId());
        assertGroupContainsExactly(
                strategy, ev12.m447getId(), ev12.m447getId(), ev22.m447getId());
    }

    /**
     * Uses a dedicated topology with two subtasks of vertex 1 and three subtasks of vertex 2,
     * where subtask 0 of vertex 1 is connected to subtasks 0 and 1 of vertex 2, and subtask 1 of
     * vertex 1 is connected to subtask 2 of vertex 2. Expects three groups: vertex-2 subtask 0
     * together with vertex-1 subtask 0, vertex-2 subtask 1 alone, and vertex-2 subtask 2
     * together with vertex-1 subtask 1.
     */
    @Test
    public void testInputLocalityIsRespected() {
        final TestingSchedulingTopology localTopology = new TestingSchedulingTopology();

        final TestingSchedulingExecutionVertex v1Sub0 =
                localTopology.newExecutionVertex(JOB_VERTEX_ID_1, 0);
        final TestingSchedulingExecutionVertex v1Sub1 =
                localTopology.newExecutionVertex(JOB_VERTEX_ID_1, 1);

        final TestingSchedulingExecutionVertex v2Sub0 =
                localTopology.newExecutionVertex(JOB_VERTEX_ID_2, 0);
        final TestingSchedulingExecutionVertex v2Sub1 =
                localTopology.newExecutionVertex(JOB_VERTEX_ID_2, 1);
        final TestingSchedulingExecutionVertex v2Sub2 =
                localTopology.newExecutionVertex(JOB_VERTEX_ID_2, 2);

        localTopology.connect(v1Sub0, v2Sub0);
        localTopology.connect(v1Sub0, v2Sub1);
        localTopology.connect(v1Sub1, v2Sub2);

        final LocalInputPreferredSlotSharingStrategy strategy =
                new LocalInputPreferredSlotSharingStrategy(
                        localTopology, slotSharingGroups, Collections.emptySet());

        Assert.assertThat(strategy.getExecutionSlotSharingGroups(), Matchers.hasSize(3));
        assertGroupContainsExactly(
                strategy, v2Sub0.m447getId(), v1Sub0.m447getId(), v2Sub0.m447getId());
        assertGroupContainsExactly(strategy, v2Sub1.m447getId(), v2Sub1.m447getId());
        assertGroupContainsExactly(
                strategy, v2Sub2.m447getId(), v1Sub1.m447getId(), v2Sub2.m447getId());
    }

    /**
     * Uses the default topology without any edges. Vertices of the two job vertices are still
     * expected to share groups, paired by subtask index, yielding two groups.
     */
    @Test
    public void testDisjointVerticesInOneGroup() {
        final LocalInputPreferredSlotSharingStrategy strategy =
                new LocalInputPreferredSlotSharingStrategy(
                        topology, slotSharingGroups, Collections.emptySet());

        Assert.assertThat(strategy.getExecutionSlotSharingGroups(), Matchers.hasSize(2));
        assertGroupContainsExactly(
                strategy, ev11.m447getId(), ev11.m447getId(), ev21.m447getId());
        assertGroupContainsExactly(
                strategy, ev12.m447getId(), ev12.m447getId(), ev22.m447getId());
    }

    /**
     * Puts each job vertex into its own {@link SlotSharingGroup}. Every execution vertex is then
     * expected to sit alone in its execution slot sharing group, yielding four groups.
     */
    @Test
    public void testVerticesInDifferentSlotSharingGroups() {
        final SlotSharingGroup groupOfVertex1 = new SlotSharingGroup();
        groupOfVertex1.addVertexToGroup(JOB_VERTEX_ID_1, ResourceSpec.UNKNOWN);

        final SlotSharingGroup groupOfVertex2 = new SlotSharingGroup();
        groupOfVertex2.addVertexToGroup(JOB_VERTEX_ID_2, ResourceSpec.UNKNOWN);

        // Typed set instead of a raw HashSet so the element type is checked by the compiler.
        final Set<SlotSharingGroup> separateGroups = new HashSet<>();
        separateGroups.add(groupOfVertex1);
        separateGroups.add(groupOfVertex2);

        final LocalInputPreferredSlotSharingStrategy strategy =
                new LocalInputPreferredSlotSharingStrategy(
                        topology, separateGroups, Collections.emptySet());

        Assert.assertThat(strategy.getExecutionSlotSharingGroups(), Matchers.hasSize(4));
        assertGroupContainsExactly(strategy, ev11.m447getId(), ev11.m447getId());
        assertGroupContainsExactly(strategy, ev12.m447getId(), ev12.m447getId());
        assertGroupContainsExactly(strategy, ev21.m447getId(), ev21.m447getId());
        assertGroupContainsExactly(strategy, ev22.m447getId(), ev22.m447getId());
    }

    /**
     * Asserts that the execution slot sharing group the strategy reports for {@code member}
     * consists of exactly the given vertex ids, in any order.
     *
     * @param strategy the strategy under test
     * @param member id of the vertex whose group is looked up
     * @param expectedIds the complete expected content of that group
     */
    private static void assertGroupContainsExactly(
            LocalInputPreferredSlotSharingStrategy strategy,
            ExecutionVertexID member,
            ExecutionVertexID... expectedIds) {
        Assert.assertThat(
                strategy.getExecutionSlotSharingGroup(member).getExecutionVertexIds(),
                Matchers.containsInAnyOrder(expectedIds));
    }
}
