/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.processors.utils;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
import org.apache.flink.util.Preconditions;

/**
 * A directed graph over {@link ExecNode}s in which an edge is only accepted if it does not close
 * a cycle (see {@link #link(ExecNode, ExecNode)}).
 *
 * <p>Edges point from an input node to the node consuming it. Graph nodes are created lazily the
 * first time an exec node is passed to any of the methods of this class.
 */
@Internal
class TopologyGraph {
    /** Graph nodes, keyed by the exec node they were created for. */
    private final Map<ExecNode<?, ?>, TopologyNode> nodes = new HashMap<>();

    /**
     * Builds the graph reachable from {@code roots} without any boundary nodes.
     *
     * @param roots nodes from which the traversal starts
     */
    TopologyGraph(List<ExecNode<?, ?>> roots) {
        this(roots, Collections.emptySet());
    }

    /**
     * Builds the graph by walking from {@code roots} towards their inputs.
     *
     * <p>For every visited node that is not contained in {@code boundaries}, an edge from each of
     * its input nodes to the node itself is added and the traversal continues with the inputs. A
     * node contained in {@code boundaries} is not expanded: no edges to its inputs are added from
     * here and its inputs are not visited through it.
     *
     * @param roots nodes from which the traversal starts
     * @param boundaries nodes at which the traversal stops
     */
    TopologyGraph(List<ExecNode<?, ?>> roots, final Set<ExecNode<?, ?>> boundaries) {
        // NOTE(review): the visitor's name suggests each node is visited at most once - confirm
        // against AbstractExecNodeExactlyOnceVisitor.
        AbstractExecNodeExactlyOnceVisitor visitor =
                new AbstractExecNodeExactlyOnceVisitor() {

                    @Override
                    protected void visitNode(ExecNode<?, ?> node) {
                        if (boundaries.contains(node)) {
                            return;
                        }
                        for (ExecNode<?, ?> input : node.getInputNodes()) {
                            // The result of link is deliberately ignored here, as in the
                            // original code.
                            link(input, node);
                        }
                        visitInputs(node);
                    }
                };
        roots.forEach(root -> root.accept(visitor));
    }

    /**
     * Adds an edge {@code from -> to} unless doing so would create a cycle.
     *
     * <p>Both nodes are registered in the graph if they are not present yet, even when the edge
     * itself is rejected.
     *
     * @param from the input side of the edge
     * @param to the output side of the edge
     * @return {@code true} if the edge is now present; {@code false} if {@code to} can already
     *     reach {@code from} (which includes {@code from} and {@code to} being the same node) and
     *     the edge was therefore not added
     */
    boolean link(ExecNode<?, ?> from, ExecNode<?, ?> to) {
        TopologyNode fromNode = getOrCreateTopologyNode(from);
        TopologyNode toNode = getOrCreateTopologyNode(to);

        if (canReach(toNode, fromNode)) {
            // adding from -> to would close a cycle
            return false;
        }

        fromNode.outputs.add(toNode);
        toNode.inputs.add(fromNode);
        return true;
    }

    /**
     * Removes the edge {@code from -> to} if it exists.
     *
     * <p>Both nodes are registered in the graph if they are not present yet.
     *
     * @param from the input side of the edge
     * @param to the output side of the edge
     */
    void unlink(ExecNode<?, ?> from, ExecNode<?, ?> to) {
        TopologyNode fromNode = getOrCreateTopologyNode(from);
        TopologyNode toNode = getOrCreateTopologyNode(to);

        fromNode.outputs.remove(toNode);
        toNode.inputs.remove(fromNode);
    }

    /**
     * Computes, for every node in the graph, the length of the longest path leading to it from a
     * node without inputs.
     *
     * <p>A node without inputs has distance 0; every other node has distance one greater than
     * the largest distance among its inputs. The result is keyed by the exec node stored in each
     * graph node.
     *
     * @return a new map from exec node to its distance
     * @throws NullPointerException if the distance of an input has not been computed when one of
     *     its outputs is processed
     */
    Map<ExecNode<?, ?>, Integer> calculateMaximumDistance() {
        Map<ExecNode<?, ?>, Integer> result = new HashMap<>();
        // number of inputs already processed, per node
        Map<TopologyNode, Integer> inputsVisitedMap = new HashMap<>();

        // start with all nodes that have no inputs
        LinkedList<TopologyNode> queue = new LinkedList<>();
        for (TopologyNode node : nodes.values()) {
            if (node.inputs.isEmpty()) {
                queue.offer(node);
            }
        }

        while (!queue.isEmpty()) {
            TopologyNode node = queue.poll();

            // -1 so that a node without inputs ends up with distance 0 after the increment
            int dist = -1;
            for (TopologyNode input : node.inputs) {
                dist =
                        Math.max(
                                dist,
                                Preconditions.checkNotNull(
                                        result.get(input.execNode),
                                        "The distance of an input node is not calculated. This is a bug."));
            }
            dist++;
            result.put(node.execNode, dist);

            // a node is enqueued only once all of its inputs have been processed
            for (TopologyNode output : node.outputs) {
                int inputsVisited = inputsVisitedMap.merge(output, 1, Integer::sum);
                if (inputsVisited == output.inputs.size()) {
                    queue.offer(output);
                }
            }
        }

        return result;
    }

    /**
     * Links every current input of {@code b} to {@code a}.
     *
     * <p>Links that would create a cycle are skipped silently, because the result of {@link
     * #link(ExecNode, ExecNode)} is ignored. Judging by the method name, the intent is for
     * {@code a} to end up at least as far from the sources as {@code b}.
     *
     * @param a the node receiving the additional inputs
     * @param b the node whose inputs are used
     */
    void makeAsFarAs(ExecNode<?, ?> a, ExecNode<?, ?> b) {
        TopologyNode nodeA = getOrCreateTopologyNode(a);
        TopologyNode nodeB = getOrCreateTopologyNode(b);

        for (TopologyNode input : nodeB.inputs) {
            link(input.execNode, nodeA.execNode);
        }
    }

    /**
     * Tells whether {@code to} can be reached from {@code from} by following edges in their
     * direction. A node can always reach itself.
     *
     * <p>Both nodes are registered in the graph if they are not present yet.
     *
     * @param from the start node
     * @param to the target node
     * @return {@code true} if a path from {@code from} to {@code to} exists
     */
    @VisibleForTesting
    boolean canReach(ExecNode<?, ?> from, ExecNode<?, ?> to) {
        TopologyNode fromNode = getOrCreateTopologyNode(from);
        TopologyNode toNode = getOrCreateTopologyNode(to);
        return canReach(fromNode, toNode);
    }

    /** Breadth-first search from {@code from} along output edges, looking for {@code to}. */
    private boolean canReach(TopologyNode from, TopologyNode to) {
        Set<TopologyNode> visited = new HashSet<>();
        visited.add(from);
        LinkedList<TopologyNode> queue = new LinkedList<>();
        queue.offer(from);

        while (!queue.isEmpty()) {
            TopologyNode node = queue.poll();
            if (to.equals(node)) {
                return true;
            }

            for (TopologyNode next : node.outputs) {
                // Set.add returns false for nodes that were already seen
                if (visited.add(next)) {
                    queue.offer(next);
                }
            }
        }

        return false;
    }

    /**
     * Returns the graph node for {@code execNode}, creating and registering it if necessary.
     *
     * <p>A {@link BatchExecBoundedStreamScan} is mapped to the graph node of an already
     * registered {@link BatchExecBoundedStreamScan} whose data stream is equal to its own, if
     * there is one; in that case no new entry is added for {@code execNode}.
     */
    private TopologyNode getOrCreateTopologyNode(ExecNode<?, ?> execNode) {
        if (execNode instanceof BatchExecBoundedStreamScan) {
            DataStream<?> currentStream =
                    ((BatchExecBoundedStreamScan) execNode).boundedStreamTable().dataStream();
            for (Map.Entry<ExecNode<?, ?>, TopologyNode> entry : nodes.entrySet()) {
                ExecNode<?, ?> key = entry.getKey();
                if (key instanceof BatchExecBoundedStreamScan
                        && ((BatchExecBoundedStreamScan) key)
                                .boundedStreamTable()
                                .dataStream()
                                .equals(currentStream)) {
                    return entry.getValue();
                }
            }

            TopologyNode result = new TopologyNode(execNode);
            nodes.put(execNode, result);
            return result;
        }
        return nodes.computeIfAbsent(execNode, k -> new TopologyNode(execNode));
    }

    /** A graph node: an exec node together with its incoming and outgoing neighbours. */
    private static class TopologyNode {
        private final ExecNode<?, ?> execNode;
        private final Set<TopologyNode> inputs;
        private final Set<TopologyNode> outputs;

        private TopologyNode(ExecNode<?, ?> execNode) {
            this.execNode = execNode;
            this.inputs = new HashSet<>();
            this.outputs = new HashSet<>();
        }
    }
}

