/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.Collections;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.operators.sort.StreamSortOperator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;

/**
 * Stream exec node that sorts its single input with a generated record comparator built from a
 * {@link SortSpec}.
 *
 * <p>Translation is gated by {@link #TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED}: unless that option is
 * set to {@code true}, {@link #translateToPlanInternal(PlannerBase)} throws a
 * {@link TableException}. The option's own description marks it as intended for testing only.
 */
public class StreamExecSort extends ExecNodeBase<RowData> implements StreamExecNode<RowData> {

    /**
     * Switch that allows this node to be translated. Defaults to {@code false}, in which case
     * translating this node fails.
     */
    @Experimental
    public static final ConfigOption<Boolean> TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED =
            ConfigOptions.key("table.exec.non-temporal-sort.enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Set whether to enable universal sort for stream. When it is false, universal sort can't use for stream, default false. Just for testing.");

    /** Sort specification handed to the comparator code generator. */
    private final SortSpec sortSpec;

    /**
     * Creates the sort node.
     *
     * @param sortSpec sort specification used to generate the record comparator
     * @param inputProperty property of the single input of this node
     * @param outputType row type passed to the base class as this node's output type
     * @param description description passed to the base class; also used as the name of the
     *     resulting transformation
     */
    public StreamExecSort(
            SortSpec sortSpec, InputProperty inputProperty, RowType outputType, String description) {
        super(Collections.singletonList(inputProperty), outputType, description);
        this.sortSpec = sortSpec;
    }

    /**
     * Translates this node into a one-input transformation wrapping a {@link StreamSortOperator}.
     *
     * @param planner planner supplying the table configuration and used to translate the input
     * @return the sort transformation; it uses the input's row type as its output type and the
     *     input transformation's parallelism
     * @throws TableException if {@link #TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED} is not enabled
     */
    @Override
    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
        final TableConfig config = planner.getTableConfig();
        if (!config.getConfiguration().getBoolean(TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED)) {
            throw new TableException("Sort on a non-time-attribute field is not supported.");
        }

        final ExecEdge inputEdge = this.getInputEdges().get(0);
        final RowType inputType = (RowType) inputEdge.getOutputType();

        // Generate the comparator for the input row type according to the sort spec.
        final GeneratedRecordComparator rowComparator =
                ComparatorCodeGenerator.gen(
                        config, "StreamExecSortComparator", inputType, this.sortSpec);
        final StreamSortOperator sortOperator =
                new StreamSortOperator(InternalTypeInfo.of(inputType), rowComparator);

        // The edge only exposes Transformation<?>; the unchecked cast is confined to this local
        // so the transformation below can be built without raw types.
        @SuppressWarnings("unchecked")
        final Transformation<RowData> inputTransform =
                (Transformation<RowData>) inputEdge.translateToPlan(planner);

        return new OneInputTransformation<>(
                inputTransform,
                this.getDescription(),
                sortOperator,
                InternalTypeInfo.of(inputType),
                inputTransform.getParallelism());
    }
}

