/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PipelinedRegion;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PipelinedRegionConsumedBlockingPartitions;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PipelinedRegionExecutionView;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.topology.Vertex;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;

/**
 * {@link PartitionReleaseStrategy} that works at the granularity of {@link PipelinedRegion}s.
 *
 * <p>For every region the strategy records the result partitions that the region's vertices
 * consume but that are produced by a vertex outside of the region. When the last vertex of a
 * region finishes, {@link #vertexFinished(ExecutionVertexID)} returns those partitions whose
 * consumer vertices all belong to finished regions.
 */
public class RegionPartitionReleaseStrategy
implements PartitionReleaseStrategy {
    private final SchedulingTopology<?, ?> schedulingTopology;

    /** Keyed by region identity (not {@code equals}); see the identity check on lookup. */
    private final Map<PipelinedRegion, PipelinedRegionConsumedBlockingPartitions> consumedBlockingPartitionsByRegion = new IdentityHashMap<>();

    /** Every vertex of a region maps to the same shared execution view instance of that region. */
    private final Map<ExecutionVertexID, PipelinedRegionExecutionView> regionExecutionViewByVertex = new HashMap<>();

    /**
     * Creates the strategy and eagerly builds both lookup tables for the given regions.
     *
     * @param schedulingTopology topology used to resolve vertices and result partitions
     * @param pipelinedRegions regions to track
     * @throws NullPointerException if either argument is {@code null}
     */
    public RegionPartitionReleaseStrategy(SchedulingTopology<?, ?> schedulingTopology, Set<PipelinedRegion> pipelinedRegions) {
        this.schedulingTopology = Preconditions.checkNotNull(schedulingTopology);
        Preconditions.checkNotNull(pipelinedRegions);
        initConsumedBlockingPartitionsByRegion(pipelinedRegions);
        initRegionExecutionViewByVertex(pipelinedRegions);
    }

    /** Computes and stores, per region, the partitions it consumes from producers outside of it. */
    private void initConsumedBlockingPartitionsByRegion(Set<PipelinedRegion> pipelinedRegions) {
        for (PipelinedRegion pipelinedRegion : pipelinedRegions) {
            PipelinedRegionConsumedBlockingPartitions consumedPartitions = computeConsumedPartitionsOfVertexRegion(pipelinedRegion);
            consumedBlockingPartitionsByRegion.put(pipelinedRegion, consumedPartitions);
        }
    }

    /** Creates one execution view per region and registers it under each vertex id of that region. */
    private void initRegionExecutionViewByVertex(Set<PipelinedRegion> pipelinedRegions) {
        for (PipelinedRegion pipelinedRegion : pipelinedRegions) {
            PipelinedRegionExecutionView regionExecutionView = new PipelinedRegionExecutionView(pipelinedRegion);
            for (ExecutionVertexID executionVertexId : pipelinedRegion) {
                regionExecutionViewByVertex.put(executionVertexId, regionExecutionView);
            }
        }
    }

    /** Bundles a region with the ids of the partitions it consumes from outside producers. */
    private PipelinedRegionConsumedBlockingPartitions computeConsumedPartitionsOfVertexRegion(PipelinedRegion pipelinedRegion) {
        Set<IntermediateResultPartitionID> resultPartitionsOutsideOfRegion = findResultPartitionsOutsideOfRegion(pipelinedRegion);
        return new PipelinedRegionConsumedBlockingPartitions(pipelinedRegion, resultPartitionsOutsideOfRegion);
    }

    /**
     * Collects every result partition consumed by any vertex of the region and keeps those whose
     * producer is not a member of the region.
     */
    private Set<IntermediateResultPartitionID> findResultPartitionsOutsideOfRegion(PipelinedRegion pipelinedRegion) {
        Set<SchedulingResultPartition<?, ?>> allConsumedPartitionsInRegion = pipelinedRegion
                .getExecutionVertexIds()
                .stream()
                .map(schedulingTopology::getVertexOrThrow)
                .flatMap(vertex -> IterableUtils.toStream(vertex.getConsumedResults()))
                .collect(Collectors.toSet());
        return filterResultPartitionsOutsideOfRegion(allConsumedPartitionsInRegion, pipelinedRegion);
    }

    /**
     * Returns the ids of the given partitions whose producer vertex is not contained in the region.
     *
     * @param resultPartitions candidate partitions
     * @param pipelinedRegion region the producers are tested against
     * @return a new mutable set of partition ids produced outside of the region
     */
    private static Set<IntermediateResultPartitionID> filterResultPartitionsOutsideOfRegion(Collection<SchedulingResultPartition<?, ?>> resultPartitions, PipelinedRegion pipelinedRegion) {
        Set<IntermediateResultPartitionID> result = new HashSet<>();
        for (SchedulingResultPartition<?, ?> maybeOutsidePartition : resultPartitions) {
            if (!pipelinedRegion.contains(maybeOutsidePartition.getProducer().getId())) {
                result.add(maybeOutsidePartition.getId());
            }
        }
        return result;
    }

    /**
     * Marks the vertex as finished in its region's execution view.
     *
     * @param finishedVertex id of the vertex that finished
     * @return if the vertex's region is now finished, the partitions consumed by that region from
     *     outside producers whose consumers all belong to finished regions; otherwise an empty list
     * @throws IllegalStateException if the vertex is not part of any known region
     */
    @Override
    public List<IntermediateResultPartitionID> vertexFinished(ExecutionVertexID finishedVertex) {
        PipelinedRegionExecutionView regionExecutionView = getPipelinedRegionExecutionViewForVertex(finishedVertex);
        regionExecutionView.vertexFinished(finishedVertex);
        if (!regionExecutionView.isFinished()) {
            return Collections.emptyList();
        }
        // The view already knows its region; no second lookup by vertex id is needed.
        PipelinedRegion pipelinedRegion = regionExecutionView.getPipelinedRegion();
        PipelinedRegionConsumedBlockingPartitions consumedPartitionsOfVertexRegion = getConsumedBlockingPartitionsForRegion(pipelinedRegion);
        return filterReleasablePartitions(consumedPartitionsOfVertexRegion);
    }

    /**
     * Marks the vertex as unfinished in its region's execution view.
     *
     * @param executionVertexId id of the vertex that is no longer finished
     * @throws IllegalStateException if the vertex is not part of any known region
     */
    @Override
    public void vertexUnfinished(ExecutionVertexID executionVertexId) {
        getPipelinedRegionExecutionViewForVertex(executionVertexId).vertexUnfinished(executionVertexId);
    }

    /** Looks up the execution view of the vertex's region, failing if the vertex is unknown. */
    private PipelinedRegionExecutionView getPipelinedRegionExecutionViewForVertex(ExecutionVertexID executionVertexId) {
        PipelinedRegionExecutionView pipelinedRegionExecutionView = regionExecutionViewByVertex.get(executionVertexId);
        Preconditions.checkState(
                pipelinedRegionExecutionView != null,
                "PipelinedRegionExecutionView not found for execution vertex %s",
                executionVertexId);
        return pipelinedRegionExecutionView;
    }

    /** Looks up the consumed-partition record of the region, failing if the region is unknown. */
    private PipelinedRegionConsumedBlockingPartitions getConsumedBlockingPartitionsForRegion(PipelinedRegion pipelinedRegion) {
        PipelinedRegionConsumedBlockingPartitions pipelinedRegionConsumedBlockingPartitions = consumedBlockingPartitionsByRegion.get(pipelinedRegion);
        Preconditions.checkState(
                pipelinedRegionConsumedBlockingPartitions != null,
                "Consumed partitions not found for pipelined region %s",
                pipelinedRegion);
        // Sanity check: the stored record must refer to the very same region instance.
        Preconditions.checkState(pipelinedRegionConsumedBlockingPartitions.getPipelinedRegion() == pipelinedRegion);
        return pipelinedRegionConsumedBlockingPartitions;
    }

    /** Keeps only those consumed partitions whose consumer regions are all finished. */
    private List<IntermediateResultPartitionID> filterReleasablePartitions(PipelinedRegionConsumedBlockingPartitions consumedPartitionsOfVertexRegion) {
        return consumedPartitionsOfVertexRegion
                .getConsumedBlockingPartitions()
                .stream()
                .filter(this::areConsumerRegionsFinished)
                .collect(Collectors.toList());
    }

    /** Returns {@code true} if every consumer vertex of the partition belongs to a finished region. */
    private boolean areConsumerRegionsFinished(IntermediateResultPartitionID resultPartitionId) {
        SchedulingResultPartition<?, ?> resultPartition = schedulingTopology.getResultPartitionOrThrow(resultPartitionId);
        return IterableUtils.toStream(resultPartition.getConsumers())
                .map(Vertex::getId)
                .allMatch(this::isRegionOfVertexFinished);
    }

    /** Returns whether the region containing the given vertex is finished. */
    private boolean isRegionOfVertexFinished(ExecutionVertexID executionVertexId) {
        return getPipelinedRegionExecutionViewForVertex(executionVertexId).isFinished();
    }

    /** Factory that derives the pipelined regions from the topology and creates the strategy. */
    public static class Factory
    implements PartitionReleaseStrategy.Factory {
        /**
         * Computes the pipelined regions of the given topology and builds a strategy over them.
         *
         * @param schedulingStrategy the scheduling topology (parameter name kept from the interface)
         * @return a new {@link RegionPartitionReleaseStrategy}
         */
        @Override
        public PartitionReleaseStrategy createInstance(SchedulingTopology<?, ?> schedulingStrategy) {
            // Passed straight through so the generic region type is inferred rather than erased to a raw Set.
            return new RegionPartitionReleaseStrategy(
                    schedulingStrategy,
                    PipelinedRegionComputeUtil.toPipelinedRegionsSet(
                            PipelinedRegionComputeUtil.computePipelinedRegions(schedulingStrategy)));
        }
    }
}

