/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph.failover.flip1;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link RestartPipelinedRegionFailoverStrategy}.
 *
 * <p>Each test builds a small {@link TestingSchedulingTopology}, reports a failure for a single
 * vertex and checks the set of {@link ExecutionVertexID}s returned by
 * {@code getTasksNeedingRestart}.
 */
public class RestartPipelinedRegionFailoverStrategyTest extends TestLogger {

    /** Message carried by every generic exception this test uses as a failure cause. */
    private static final String FAILURE_MESSAGE = "Test failure";

    /**
     * Checks the restart set for failures that are not data consumption errors.
     *
     * <pre>
     *     (v1) -+-> (v4)
     *           x
     *     (v2) -+-> (v5)
     *
     *     (v3) ---> (v6)
     *
     *     all edges are BLOCKING
     * </pre>
     *
     * <p>Expected: a failed vertex is restarted together with the consumers of its results,
     * while its producers are left alone.
     */
    @Test
    public void testRegionFailoverForRegionInternalErrors() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        TestingSchedulingExecutionVertex v1 = topology.newExecutionVertex();
        TestingSchedulingExecutionVertex v2 = topology.newExecutionVertex();
        TestingSchedulingExecutionVertex v3 = topology.newExecutionVertex();
        TestingSchedulingExecutionVertex v4 = topology.newExecutionVertex();
        TestingSchedulingExecutionVertex v5 = topology.newExecutionVertex();
        TestingSchedulingExecutionVertex v6 = topology.newExecutionVertex();
        topology.connect(v1, v4, ResultPartitionType.BLOCKING);
        topology.connect(v1, v5, ResultPartitionType.BLOCKING);
        topology.connect(v2, v4, ResultPartitionType.BLOCKING);
        topology.connect(v2, v5, ResultPartitionType.BLOCKING);
        topology.connect(v3, v6, ResultPartitionType.BLOCKING);

        RestartPipelinedRegionFailoverStrategy strategy =
            new RestartPipelinedRegionFailoverStrategy(topology);

        // Producers drag their consumers along ...
        Assert.assertEquals(idsOf(v1, v4, v5), tasksToRestart(strategy, v1));
        Assert.assertEquals(idsOf(v2, v4, v5), tasksToRestart(strategy, v2));
        Assert.assertEquals(idsOf(v3, v6), tasksToRestart(strategy, v3));

        // ... whereas a failing sink only restarts itself.
        Assert.assertEquals(idsOf(v4), tasksToRestart(strategy, v4));
        Assert.assertEquals(idsOf(v5), tasksToRestart(strategy, v5));
        Assert.assertEquals(idsOf(v6), tasksToRestart(strategy, v6));
    }

    /**
     * Checks the restart set when a consumer fails with a data consumption error
     * ({@link PartitionConnectionException} or {@link PartitionNotFoundException}) that names
     * one of the partitions it consumes.
     *
     * <pre>
     *     (v1) -+-> (v4)
     *           x
     *     (v2) -+-> (v5)
     *
     *     (v3) ---> (v6)
     *
     *     all edges are BLOCKING
     * </pre>
     *
     * <p>Expected: the producer of the named partition is restarted as well, together with all
     * of that producer's consumers.
     *
     * <p>The expectations rely on {@code getConsumedResults()} iterating in the order in which
     * the edges were connected (v1's partition first, then v2's).
     */
    @Test
    public void testRegionFailoverForDataConsumptionErrors() throws Exception {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        TestingSchedulingExecutionVertex v1 = topology.newExecutionVertex();
        TestingSchedulingExecutionVertex v2 = topology.newExecutionVertex();
        TestingSchedulingExecutionVertex v3 = topology.newExecutionVertex();
        TestingSchedulingExecutionVertex v4 = topology.newExecutionVertex();
        TestingSchedulingExecutionVertex v5 = topology.newExecutionVertex();
        TestingSchedulingExecutionVertex v6 = topology.newExecutionVertex();
        topology.connect(v1, v4, ResultPartitionType.BLOCKING);
        topology.connect(v1, v5, ResultPartitionType.BLOCKING);
        topology.connect(v2, v4, ResultPartitionType.BLOCKING);
        topology.connect(v2, v5, ResultPartitionType.BLOCKING);
        topology.connect(v3, v6, ResultPartitionType.BLOCKING);

        RestartPipelinedRegionFailoverStrategy strategy =
            new RestartPipelinedRegionFailoverStrategy(topology);

        // v4 cannot read its first input (produced by v1), then its second one (produced by v2).
        Iterator<TestingSchedulingResultPartition> v4Inputs = v4.getConsumedResults().iterator();
        Assert.assertEquals(
            idsOf(v1, v4, v5),
            strategy.getTasksNeedingRestart(v4.getId(), connectionFailure(v4Inputs.next())));
        Assert.assertEquals(
            idsOf(v2, v4, v5),
            strategy.getTasksNeedingRestart(v4.getId(), notFound(v4Inputs.next())));

        // Same scenario, reported by v5.
        Iterator<TestingSchedulingResultPartition> v5Inputs = v5.getConsumedResults().iterator();
        Assert.assertEquals(
            idsOf(v1, v4, v5),
            strategy.getTasksNeedingRestart(v5.getId(), connectionFailure(v5Inputs.next())));
        Assert.assertEquals(
            idsOf(v2, v4, v5),
            strategy.getTasksNeedingRestart(v5.getId(), notFound(v5Inputs.next())));

        // v6 cannot read its only input (produced by v3).
        Iterator<TestingSchedulingResultPartition> v6Inputs = v6.getConsumedResults().iterator();
        Assert.assertEquals(
            idsOf(v3, v6),
            strategy.getTasksNeedingRestart(v6.getId(), connectionFailure(v6Inputs.next())));
    }

    /**
     * Checks the restart set for every combination of available / unavailable result
     * partitions, as reported by a {@link TestResultPartitionAvailabilityChecker}.
     *
     * <pre>
     *     (v1) --rp1--+
     *                 +--> (v3)
     *     (v2) --rp2--+
     *
     *     all edges are BLOCKING
     * </pre>
     *
     * <p>Expected: whenever v3 is part of the restart set, the producer of each unavailable
     * partition is added to it.
     */
    @Test
    public void testRegionFailoverForVariousResultPartitionAvailabilityCombinations() throws Exception {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        TestingSchedulingExecutionVertex v1 = topology.newExecutionVertex();
        TestingSchedulingExecutionVertex v2 = topology.newExecutionVertex();
        TestingSchedulingExecutionVertex v3 = topology.newExecutionVertex();
        topology.connect(v1, v3, ResultPartitionType.BLOCKING);
        topology.connect(v2, v3, ResultPartitionType.BLOCKING);

        TestResultPartitionAvailabilityChecker availabilityChecker =
            new TestResultPartitionAvailabilityChecker();
        RestartPipelinedRegionFailoverStrategy strategy =
            new RestartPipelinedRegionFailoverStrategy(topology, availabilityChecker);

        IntermediateResultPartitionID rp1ID = v1.getProducedResults().iterator().next().getId();
        IntermediateResultPartitionID rp2ID = v2.getProducedResults().iterator().next().getId();

        // Combination 1: rp1 available, rp2 available.
        availabilityChecker.failedPartitions.clear();
        MatcherAssert.assertThat(
            tasksToRestart(strategy, v1),
            Matchers.containsInAnyOrder(v1.getId(), v3.getId()));
        MatcherAssert.assertThat(
            tasksToRestart(strategy, v2),
            Matchers.containsInAnyOrder(v2.getId(), v3.getId()));
        MatcherAssert.assertThat(
            tasksToRestart(strategy, v3),
            Matchers.containsInAnyOrder(v3.getId()));

        // Combination 2: rp1 unavailable, rp2 available.
        availabilityChecker.failedPartitions.clear();
        availabilityChecker.markResultPartitionFailed(rp1ID);
        MatcherAssert.assertThat(
            tasksToRestart(strategy, v1),
            Matchers.containsInAnyOrder(v1.getId(), v3.getId()));
        MatcherAssert.assertThat(
            tasksToRestart(strategy, v2),
            Matchers.containsInAnyOrder(v1.getId(), v2.getId(), v3.getId()));
        MatcherAssert.assertThat(
            tasksToRestart(strategy, v3),
            Matchers.containsInAnyOrder(v1.getId(), v3.getId()));

        // Combination 3: rp1 available, rp2 unavailable.
        availabilityChecker.failedPartitions.clear();
        availabilityChecker.markResultPartitionFailed(rp2ID);
        MatcherAssert.assertThat(
            tasksToRestart(strategy, v1),
            Matchers.containsInAnyOrder(v1.getId(), v2.getId(), v3.getId()));
        MatcherAssert.assertThat(
            tasksToRestart(strategy, v2),
            Matchers.containsInAnyOrder(v2.getId(), v3.getId()));
        MatcherAssert.assertThat(
            tasksToRestart(strategy, v3),
            Matchers.containsInAnyOrder(v2.getId(), v3.getId()));

        // Combination 4: rp1 unavailable, rp2 unavailable.
        availabilityChecker.failedPartitions.clear();
        availabilityChecker.markResultPartitionFailed(rp1ID);
        availabilityChecker.markResultPartitionFailed(rp2ID);
        MatcherAssert.assertThat(
            tasksToRestart(strategy, v1),
            Matchers.containsInAnyOrder(v1.getId(), v2.getId(), v3.getId()));
        MatcherAssert.assertThat(
            tasksToRestart(strategy, v2),
            Matchers.containsInAnyOrder(v1.getId(), v2.getId(), v3.getId()));
        MatcherAssert.assertThat(
            tasksToRestart(strategy, v3),
            Matchers.containsInAnyOrder(v1.getId(), v2.getId(), v3.getId()));
    }

    /**
     * Checks the restart set in a topology that mixes PIPELINED and BLOCKING edges.
     *
     * <pre>
     *     (v1) -P-> (v2) -B-> (v3) -P-> (v4) -B-> (v5) -P-> (v6)
     *
     *     P = PIPELINED, B = BLOCKING
     * </pre>
     *
     * <p>Expected: a generic failure of v3 restarts v3 and everything downstream of it; a data
     * consumption error of v3 on its consumed partition restarts all six vertices.
     */
    @Test
    public void testRegionFailoverForMultipleVerticesRegions() throws Exception {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        TestingSchedulingExecutionVertex v1 = topology.newExecutionVertex();
        TestingSchedulingExecutionVertex v2 = topology.newExecutionVertex();
        TestingSchedulingExecutionVertex v3 = topology.newExecutionVertex();
        TestingSchedulingExecutionVertex v4 = topology.newExecutionVertex();
        TestingSchedulingExecutionVertex v5 = topology.newExecutionVertex();
        TestingSchedulingExecutionVertex v6 = topology.newExecutionVertex();
        topology.connect(v1, v2, ResultPartitionType.PIPELINED);
        topology.connect(v2, v3, ResultPartitionType.BLOCKING);
        topology.connect(v3, v4, ResultPartitionType.PIPELINED);
        topology.connect(v4, v5, ResultPartitionType.BLOCKING);
        topology.connect(v5, v6, ResultPartitionType.PIPELINED);

        RestartPipelinedRegionFailoverStrategy strategy =
            new RestartPipelinedRegionFailoverStrategy(topology);

        // Generic failure of v3.
        Assert.assertEquals(idsOf(v3, v4, v5, v6), tasksToRestart(strategy, v3));

        // v3 cannot consume the partition produced by v2.
        Assert.assertEquals(
            idsOf(v1, v2, v3, v4, v5, v6),
            strategy.getTasksNeedingRestart(
                v3.getId(),
                connectionFailure(v3.getConsumedResults().iterator().next())));
    }

    /**
     * Collects the IDs of the given vertices into a fresh mutable set.
     *
     * @param vertices vertices whose IDs are wanted
     * @return a new set holding {@code getId()} of every given vertex
     */
    private static Set<ExecutionVertexID> idsOf(TestingSchedulingExecutionVertex... vertices) {
        Set<ExecutionVertexID> ids = new HashSet<>();
        for (TestingSchedulingExecutionVertex vertex : vertices) {
            ids.add(vertex.getId());
        }
        return ids;
    }

    /**
     * Asks the strategy which tasks to restart after {@code failedVertex} failed with a plain
     * {@link Exception}, i.e. a cause that is not a data consumption error.
     *
     * @param strategy strategy under test
     * @param failedVertex vertex reported as failed
     * @return whatever {@code getTasksNeedingRestart} returns for that failure
     */
    private static Set<ExecutionVertexID> tasksToRestart(
            RestartPipelinedRegionFailoverStrategy strategy,
            TestingSchedulingExecutionVertex failedVertex) {
        return strategy.getTasksNeedingRestart(failedVertex.getId(), new Exception(FAILURE_MESSAGE));
    }

    /**
     * Builds a {@link PartitionConnectionException} that names the given partition, using a
     * fresh {@link ExecutionAttemptID} as producer attempt.
     *
     * @param partition consumed partition the failure refers to
     * @return the exception to report as failure cause
     */
    private static PartitionConnectionException connectionFailure(
            TestingSchedulingResultPartition partition) {
        return new PartitionConnectionException(
            new ResultPartitionID(partition.getId(), new ExecutionAttemptID()),
            new Exception(FAILURE_MESSAGE));
    }

    /**
     * Builds a {@link PartitionNotFoundException} that names the given partition, using a fresh
     * {@link ExecutionAttemptID} as producer attempt.
     *
     * @param partition consumed partition the failure refers to
     * @return the exception to report as failure cause
     */
    private static PartitionNotFoundException notFound(TestingSchedulingResultPartition partition) {
        return new PartitionNotFoundException(
            new ResultPartitionID(partition.getId(), new ExecutionAttemptID()));
    }

    /**
     * {@link ResultPartitionAvailabilityChecker} backed by an in-memory set of failed
     * partitions: a partition is available unless it has been marked as failed.
     */
    private static class TestResultPartitionAvailabilityChecker
            implements ResultPartitionAvailabilityChecker {

        /** Partitions currently reported as unavailable. */
        private final Set<IntermediateResultPartitionID> failedPartitions = new HashSet<>();

        /**
         * @param resultPartitionID partition to look up
         * @return {@code true} unless the partition has been marked as failed
         */
        @Override
        public boolean isAvailable(IntermediateResultPartitionID resultPartitionID) {
            return !this.failedPartitions.contains(resultPartitionID);
        }

        /**
         * Makes {@link #isAvailable} return {@code false} for the given partition.
         *
         * @param resultPartitionID partition to mark as failed
         */
        public void markResultPartitionFailed(IntermediateResultPartitionID resultPartitionID) {
            this.failedPartitions.add(resultPartitionID);
        }

        /**
         * Makes {@link #isAvailable} return {@code true} again for the given partition.
         *
         * @param resultPartitionID partition to remove from the failed set
         */
        public void removeResultPartitionFromFailedState(IntermediateResultPartitionID resultPartitionID) {
            this.failedPartitions.remove(resultPartitionID);
        }
    }
}

