/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.rules.physical.stream;

import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.plan.volcano.RelSubset;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rex.RexNode;
import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCorrelate;
import org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecCorrelateRule;
import org.apache.flink.table.planner.plan.utils.PythonUtil;
import scala.Option;
import scala.Some;

/**
 * Planner rule that converts a {@link FlinkLogicalCorrelate} from the logical convention to the
 * stream physical convention as a {@link StreamExecPythonCorrelate}.
 *
 * <p>The rule only fires when the right input of the correlate is a
 * {@link FlinkLogicalTableFunctionScan} (optionally wrapped in one or more
 * {@link FlinkLogicalCalc} nodes) whose call is accepted by
 * {@link PythonUtil#isPythonCall}.
 */
public class StreamExecPythonCorrelateRule extends ConverterRule {

    /** Shared instance of this rule; the constructor is private. */
    public static final RelOptRule INSTANCE = new StreamExecPythonCorrelateRule();

    private StreamExecPythonCorrelateRule() {
        super(
            FlinkLogicalCorrelate.class,
            FlinkConventions.LOGICAL(),
            FlinkConventions.STREAM_PHYSICAL(),
            "StreamExecPythonCorrelateRule");
    }

    /**
     * Walks down a chain of {@link FlinkLogicalCalc} nodes starting at {@code node} and reports
     * whether the chain ends in a table function scan accepted by {@link PythonUtil#isPythonCall}.
     *
     * <p>Each calc input is expected to be a {@link RelSubset}; the subset's original rel is the
     * next node inspected. Any node that is neither a calc nor a table function scan ends the
     * walk with {@code false}.
     *
     * @param node the node to start from (already unwrapped from its {@link RelSubset})
     * @return {@code true} if the chain bottoms out in a matching table function scan
     * @throws ClassCastException if a calc's input is not a {@link RelSubset}
     */
    private static boolean isPythonTableFunction(RelNode node) {
        RelNode current = node;
        // Skip over any calcs stacked on top of the table function scan.
        while (current instanceof FlinkLogicalCalc) {
            FlinkLogicalCalc calc = (FlinkLogicalCalc) current;
            current = ((RelSubset) calc.getInput()).getOriginal();
        }
        if (current instanceof FlinkLogicalTableFunctionScan) {
            FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) current;
            // Second argument is passed as null, exactly as before.
            return PythonUtil.isPythonCall(scan.getCall(), null);
        }
        return false;
    }

    /**
     * Matches when the right input of the correlate is a table function scan, directly or
     * beneath a chain of calcs, whose call is accepted by {@link PythonUtil#isPythonCall}.
     *
     * @param call the rule call; {@code call.rel(0)} must be a {@link FlinkLogicalCorrelate}
     * @return {@code true} if this rule should convert the correlate
     */
    @Override
    public boolean matches(RelOptRuleCall call) {
        FlinkLogicalCorrelate correlate = (FlinkLogicalCorrelate) call.rel(0);
        RelNode right = ((RelSubset) correlate.getRight()).getOriginal();
        return isPythonTableFunction(right);
    }

    /**
     * Converts the given {@link FlinkLogicalCorrelate} into a {@link StreamExecPythonCorrelate}.
     *
     * @param relNode the logical correlate to convert
     * @return the stream physical Python correlate node
     */
    @Override
    public RelNode convert(RelNode relNode) {
        return new StreamExecPythonCorrelateFactory(relNode).convertToCorrelate();
    }

    /**
     * Holds the pieces of a {@link FlinkLogicalCorrelate} that are needed to build the
     * corresponding {@link StreamExecPythonCorrelate}.
     */
    private static class StreamExecPythonCorrelateFactory {
        private final FlinkLogicalCorrelate correlate;
        private final RelTraitSet traitSet;
        private final RelNode convInput;
        private final RelNode right;

        /**
         * @param rel the correlate being converted; must be a {@link FlinkLogicalCorrelate}
         */
        StreamExecPythonCorrelateFactory(RelNode rel) {
            this.correlate = (FlinkLogicalCorrelate) rel;
            this.traitSet = rel.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL());
            // Only the left input (index 0) is converted; the right input is consumed as-is below.
            this.convInput =
                RelOptRule.convert(this.correlate.getInput(0), FlinkConventions.STREAM_PHYSICAL());
            this.right = this.correlate.getInput(1);
        }

        /**
         * Builds the physical correlate starting from the right input with no condition.
         *
         * @return the stream physical Python correlate node
         */
        StreamExecPythonCorrelate convertToCorrelate() {
            // Explicit type witness: a cast on Option.empty() would infer Option<Object> and
            // cannot be cast to Option<RexNode>.
            return convertToCorrelate(this.right, Option.<RexNode>empty());
        }

        /**
         * Unwraps {@code relNode} until a table function scan is reached and builds the physical
         * correlate from it.
         *
         * <ul>
         *   <li>A {@link RelSubset} is replaced by the first rel in its rel list.</li>
         *   <li>A {@link FlinkLogicalCalc} is replaced by the table scan returned by
         *       {@code StreamExecCorrelateRule.getTableScan}, and the condition is replaced by the
         *       expanded condition of the calc returned by
         *       {@code StreamExecCorrelateRule.getMergedCalc}.</li>
         *   <li>Anything else is cast to {@link FlinkLogicalTableFunctionScan}.</li>
         * </ul>
         *
         * @param relNode the node currently being unwrapped
         * @param condition the condition collected so far
         * @return the stream physical Python correlate node
         */
        private StreamExecPythonCorrelate convertToCorrelate(
                RelNode relNode, Option<RexNode> condition) {
            if (relNode instanceof RelSubset) {
                RelSubset subset = (RelSubset) relNode;
                return convertToCorrelate(subset.getRelList().get(0), condition);
            }
            if (relNode instanceof FlinkLogicalCalc) {
                FlinkLogicalCalc calc = (FlinkLogicalCalc) relNode;
                FlinkLogicalTableFunctionScan tableScan = StreamExecCorrelateRule.getTableScan(calc);
                FlinkLogicalCalc mergedCalc = StreamExecCorrelateRule.getMergedCalc(calc);
                // NOTE(review): if the merged program has no condition, getCondition() presumably
                // returns null and expandLocalRef would fail - confirm that matches() guarantees a
                // filter is present whenever a calc sits above the table function.
                RexNode expandedCondition =
                    mergedCalc.getProgram().expandLocalRef(mergedCalc.getProgram().getCondition());
                // Typed local lets Some.apply infer Some<RexNode> without any cast.
                return convertToCorrelate(tableScan, Some.apply(expandedCondition));
            }
            FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) relNode;
            // NOTE(review): the fourth constructor argument is passed as null; if that parameter is
            // a scala Option, Option.empty() would be the safer value - confirm against
            // StreamExecPythonCorrelate before changing.
            return new StreamExecPythonCorrelate(
                this.correlate.getCluster(),
                this.traitSet,
                this.convInput,
                null,
                scan,
                condition,
                this.correlate.getRowType(),
                this.correlate.getJoinType());
        }
    }
}

