/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.processor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecCalc;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecCorrelate;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecMultipleInput;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecPythonCalc;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecPythonCorrelate;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSort;
import org.apache.flink.table.planner.plan.nodes.exec.batch.InputSortedExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecExchange;
import org.apache.flink.table.planner.plan.nodes.exec.processor.ExecNodeGraphProcessor;
import org.apache.flink.table.planner.plan.nodes.exec.processor.ProcessorContext;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

public class ForwardHashExchangeProcessor
implements ExecNodeGraphProcessor {
    @Override
    public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext context) {
        if (execGraph.getRootNodes().get(0) instanceof StreamExecNode) {
            throw new TableException("StreamExecNode is not supported yet");
        }
        JobManagerOptions.SchedulerType schedulerType = context.getPlanner().getExecEnv().getConfig().getSchedulerType().orElse(JobManagerOptions.SchedulerType.AdaptiveBatch);
        if (schedulerType != JobManagerOptions.SchedulerType.AdaptiveBatch) {
            return execGraph;
        }
        TableConfig tableConfig = context.getPlanner().getTableConfig();
        AbstractExecNodeExactlyOnceVisitor visitor = new AbstractExecNodeExactlyOnceVisitor((ReadableConfig)tableConfig){
            final /* synthetic */ ReadableConfig val$tableConfig;
            {
                this.val$tableConfig = readableConfig;
            }

            @Override
            protected void visitNode(ExecNode<?> node) {
                this.visitInputs(node);
                if (node instanceof CommonExecExchange) {
                    return;
                }
                boolean changed = false;
                ArrayList<ExecEdge> newEdges = new ArrayList<ExecEdge>(node.getInputEdges());
                for (int i = 0; i < node.getInputProperties().size(); ++i) {
                    InputProperty inputProperty = node.getInputProperties().get(i);
                    InputProperty.RequiredDistribution requiredDistribution = inputProperty.getRequiredDistribution();
                    ExecEdge edge = node.getInputEdges().get(i);
                    if (requiredDistribution.getType() != InputProperty.DistributionType.HASH) {
                        boolean visitChild;
                        boolean bl = visitChild = requiredDistribution.getType() == InputProperty.DistributionType.SINGLETON;
                        if (ForwardHashExchangeProcessor.this.hasExchangeInput(edge) || !ForwardHashExchangeProcessor.this.hasSortInputForInputSortedNode(node)) continue;
                        ExecEdge newEdge = ForwardHashExchangeProcessor.this.addExchangeAndReconnectEdge(this.val$tableConfig, edge, inputProperty, true, visitChild);
                        newEdges.set(i, newEdge);
                        changed = true;
                        continue;
                    }
                    if (!ForwardHashExchangeProcessor.this.hasExchangeInput(edge)) {
                        ExecEdge newEdge;
                        if (ForwardHashExchangeProcessor.this.isInputSortedNode(node)) {
                            if (ForwardHashExchangeProcessor.this.hasSortInputForInputSortedNode(node)) {
                                ExecNode<?> sort = edge.getSource();
                                ExecEdge newEdgeOfSort = ForwardHashExchangeProcessor.this.addExchangeAndReconnectEdge(this.val$tableConfig, sort.getInputEdges().get(0), inputProperty, false, true);
                                sort.setInputEdges(Collections.singletonList(newEdgeOfSort));
                            }
                            newEdge = ForwardHashExchangeProcessor.this.addExchangeAndReconnectEdge(this.val$tableConfig, edge, inputProperty, true, true);
                        } else {
                            newEdge = ForwardHashExchangeProcessor.this.addExchangeAndReconnectEdge(this.val$tableConfig, edge, inputProperty, false, true);
                            ForwardHashExchangeProcessor.this.updateOriginalEdgeInMultipleInput(node, i, (BatchExecExchange)newEdge.getSource());
                        }
                        newEdges.set(i, newEdge);
                        changed = true;
                        continue;
                    }
                    if (!ForwardHashExchangeProcessor.this.hasSortInputForInputSortedNode(node)) continue;
                    ExecEdge newEdge = ForwardHashExchangeProcessor.this.addExchangeAndReconnectEdge(this.val$tableConfig, edge, inputProperty, true, true);
                    newEdges.set(i, newEdge);
                    changed = true;
                }
                if (changed) {
                    node.setInputEdges(newEdges);
                }
            }
        };
        execGraph.getRootNodes().forEach(s2 -> s2.accept(visitor));
        return execGraph;
    }

    private ExecEdge addExchangeAndReconnectEdge(ReadableConfig tableConfig, ExecEdge edge, InputProperty inputProperty, boolean strict, boolean visitChild) {
        ExecNode<?> target = edge.getTarget();
        ExecNode<?> source = edge.getSource();
        if (source instanceof CommonExecExchange) {
            return edge;
        }
        if (visitChild && (source instanceof BatchExecCalc || source instanceof BatchExecPythonCalc || source instanceof BatchExecSort || source instanceof BatchExecCorrelate || source instanceof BatchExecPythonCorrelate)) {
            ExecEdge newEdge = this.addExchangeAndReconnectEdge(tableConfig, source.getInputEdges().get(0), inputProperty, strict, true);
            source.setInputEdges(Collections.singletonList(newEdge));
        }
        BatchExecExchange exchange = this.createExchangeWithKeepInputAsIsDistribution(tableConfig, inputProperty, strict, (RowType)edge.getOutputType());
        ExecEdge newEdge = new ExecEdge(source, exchange, edge.getShuffle(), edge.getExchangeMode());
        exchange.setInputEdges(Collections.singletonList(newEdge));
        return new ExecEdge(exchange, target, edge.getShuffle(), edge.getExchangeMode());
    }

    private BatchExecExchange createExchangeWithKeepInputAsIsDistribution(ReadableConfig tableConfig, InputProperty inputProperty, boolean strict, RowType outputRowType) {
        InputProperty newInputProperty = InputProperty.builder().requiredDistribution(InputProperty.keepInputAsIsDistribution(inputProperty.getRequiredDistribution(), strict)).damBehavior(inputProperty.getDamBehavior()).priority(inputProperty.getPriority()).build();
        return new BatchExecExchange(tableConfig, newInputProperty, outputRowType, newInputProperty.toString());
    }

    private boolean hasExchangeInput(ExecEdge edge) {
        ExecNode<?> input = edge.getSource();
        if (this.hasSortInputForInputSortedNode(edge.getTarget())) {
            input = input.getInputEdges().get(0).getSource();
        }
        return input instanceof CommonExecExchange;
    }

    private boolean hasSortInputForInputSortedNode(ExecNode<?> node) {
        return this.isInputSortedNode(node) && node.getInputEdges().get(0).getSource() instanceof BatchExecSort;
    }

    private boolean isInputSortedNode(ExecNode<?> node) {
        return node instanceof InputSortedExecNode;
    }

    private void updateOriginalEdgeInMultipleInput(ExecNode<?> node, int edgeIdx, BatchExecExchange newExchange) {
        if (node instanceof BatchExecMultipleInput) {
            this.updateOriginalEdgeInMultipleInput((BatchExecMultipleInput)node, edgeIdx, newExchange);
        }
    }

    private void updateOriginalEdgeInMultipleInput(BatchExecMultipleInput multipleInput, int edgeIdx, BatchExecExchange newExchange) {
        ExecEdge originalEdge = multipleInput.getOriginalEdges().get(edgeIdx);
        ExecNode<?> inputNode = originalEdge.getSource();
        ExecNode<?> targetNode = originalEdge.getTarget();
        int edgeIdxInTargetNode = targetNode.getInputEdges().indexOf(originalEdge);
        Preconditions.checkArgument((edgeIdxInTargetNode >= 0 ? 1 : 0) != 0);
        ArrayList<ExecEdge> newEdges = new ArrayList<ExecEdge>(targetNode.getInputEdges());
        ExecEdge newEdge1 = new ExecEdge(inputNode, newExchange, originalEdge.getShuffle(), originalEdge.getExchangeMode());
        newExchange.setInputEdges(Collections.singletonList(newEdge1));
        ExecEdge newEdge2 = new ExecEdge(newExchange, targetNode, originalEdge.getShuffle(), originalEdge.getExchangeMode());
        newEdges.set(edgeIdxInTargetNode, newEdge2);
        targetNode.setInputEdges(newEdges);
    }
}

