/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.common;

import java.lang.reflect.Constructor;
import java.util.LinkedHashMap;
import java.util.List;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

@JsonIgnoreProperties(ignoreUnknown=true)
public abstract class CommonExecPythonCorrelate
extends ExecNodeBase<RowData>
implements SingleTransformationTranslator<RowData> {
    public static final String FIELD_NAME_JOIN_TYPE = "joinType";
    public static final String FIELD_NAME_FUNCTION_CALL = "functionCall";
    private static final String PYTHON_TABLE_FUNCTION_OPERATOR_NAME = "org.apache.flink.table.runtime.operators.python.table.PythonTableFunctionOperator";
    @JsonProperty(value="joinType")
    private final FlinkJoinType joinType;
    @JsonProperty(value="functionCall")
    private final RexCall invocation;

    public CommonExecPythonCorrelate(FlinkJoinType joinType, RexCall invocation, int id, List<InputProperty> inputProperties, RowType outputType, String description) {
        super(id, inputProperties, outputType, description);
        Preconditions.checkArgument((inputProperties.size() == 1 ? 1 : 0) != 0);
        this.joinType = joinType;
        this.invocation = invocation;
    }

    @Override
    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
        ExecEdge inputEdge = this.getInputEdges().get(0);
        Transformation<?> inputTransform = inputEdge.translateToPlan(planner);
        Configuration config = CommonPythonUtil.getMergedConfig(planner.getExecEnv(), planner.getTableConfig());
        OneInputTransformation<RowData, RowData> transform = this.createPythonOneInputTransformation(inputTransform, config);
        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(config)) {
            transform.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
        }
        return transform;
    }

    private OneInputTransformation<RowData, RowData> createPythonOneInputTransformation(Transformation<RowData> inputTransform, Configuration config) {
        Tuple2<int[], PythonFunctionInfo> extractResult = this.extractPythonTableFunctionInfo();
        int[] pythonUdtfInputOffsets = (int[])extractResult.f0;
        PythonFunctionInfo pythonFunctionInfo = (PythonFunctionInfo)extractResult.f1;
        InternalTypeInfo pythonOperatorInputRowType = (InternalTypeInfo)inputTransform.getOutputType();
        InternalTypeInfo<RowData> pythonOperatorOutputRowType = InternalTypeInfo.of((RowType)this.getOutputType());
        OneInputStreamOperator<RowData, RowData> pythonOperator = this.getPythonTableFunctionOperator(config, pythonOperatorInputRowType, pythonOperatorOutputRowType, pythonFunctionInfo, pythonUdtfInputOffsets);
        return new OneInputTransformation(inputTransform, this.getDescription(), pythonOperator, pythonOperatorOutputRowType, inputTransform.getParallelism());
    }

    private Tuple2<int[], PythonFunctionInfo> extractPythonTableFunctionInfo() {
        LinkedHashMap<RexNode, Integer> inputNodes = new LinkedHashMap<RexNode, Integer>();
        PythonFunctionInfo pythonTableFunctionInfo = CommonPythonUtil.createPythonFunctionInfo(this.invocation, inputNodes);
        int[] udtfInputOffsets = inputNodes.keySet().stream().filter(x -> x instanceof RexInputRef).map(x -> ((RexInputRef)x).getIndex()).mapToInt(i -> i).toArray();
        return Tuple2.of((Object)udtfInputOffsets, (Object)pythonTableFunctionInfo);
    }

    private OneInputStreamOperator<RowData, RowData> getPythonTableFunctionOperator(Configuration config, InternalTypeInfo<RowData> inputRowType, InternalTypeInfo<RowData> outputRowType, PythonFunctionInfo pythonFunctionInfo, int[] udtfInputOffsets) {
        Class clazz = CommonPythonUtil.loadClass(PYTHON_TABLE_FUNCTION_OPERATOR_NAME);
        try {
            Constructor ctor = clazz.getConstructor(Configuration.class, PythonFunctionInfo.class, RowType.class, RowType.class, int[].class, FlinkJoinType.class);
            return (OneInputStreamOperator)ctor.newInstance(new Object[]{config, pythonFunctionInfo, inputRowType.toRowType(), outputRowType.toRowType(), udtfInputOffsets, this.joinType});
        }
        catch (Exception e) {
            throw new TableException("Python Table Function Operator constructed failed.", e);
        }
    }
}

