/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.tools.RelBuilder;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecAggregateBase;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.aggregate.MiniBatchIncrementalGroupAggFunction;
import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

/**
 * Stream exec node that translates an incremental group aggregate into a single keyed
 * mini-batch bundle operator.
 *
 * <p>The node generates two aggregate handlers from the same partial-aggregate description (a
 * "partial" handler and a "final" handler) and hands both to a
 * {@link MiniBatchIncrementalGroupAggFunction}, which is wrapped in a
 * {@link KeyedMapBundleOperator}. Operator state is keyed by the partial grouping key.
 *
 * <p>All {@code @JsonProperty} annotated members are part of the persisted plan representation
 * of this node; their JSON names are given by the {@code FIELD_NAME_*} constants.
 */
@ExecNodeMetadata(
        name = "stream-exec-incremental-group-aggregate",
        version = 1,
        consumedOptions = {"table.exec.mini-batch.enabled", "table.exec.mini-batch.size"},
        producedTransformations = {
            StreamExecIncrementalGroupAggregate.INCREMENTAL_GROUP_AGGREGATE_TRANSFORMATION
        },
        minPlanVersion = FlinkVersion.v1_15,
        minStateVersion = FlinkVersion.v1_15)
public class StreamExecIncrementalGroupAggregate extends StreamExecAggregateBase {

    /** Name of the single transformation produced by this node. */
    public static final String INCREMENTAL_GROUP_AGGREGATE_TRANSFORMATION =
            "incremental-group-aggregate";

    // JSON property names used when this node is serialized into / restored from a plan.
    public static final String FIELD_NAME_PARTIAL_AGG_GROUPING = "partialAggGrouping";
    public static final String FIELD_NAME_FINAL_AGG_GROUPING = "finalAggGrouping";
    public static final String FIELD_NAME_PARTIAL_ORIGINAL_AGG_CALLS = "partialOriginalAggCalls";
    public static final String FIELD_NAME_PARTIAL_AGG_CALL_NEED_RETRACTIONS =
            "partialAggCallNeedRetractions";
    public static final String FIELD_NAME_PARTIAL_LOCAL_AGG_INPUT_TYPE =
            "partialLocalAggInputRowType";
    public static final String FIELD_NAME_PARTIAL_AGG_NEED_RETRACTION = "partialAggNeedRetraction";

    /** Grouping key indices used to build the partial (state) key selector. */
    @JsonProperty(FIELD_NAME_PARTIAL_AGG_GROUPING)
    private final int[] partialAggGrouping;

    /** Grouping key indices used to build the final key selector on top of the partial key. */
    @JsonProperty(FIELD_NAME_FINAL_AGG_GROUPING)
    private final int[] finalAggGrouping;

    /** Aggregate calls from which both the partial and the incremental agg infos are derived. */
    @JsonProperty(FIELD_NAME_PARTIAL_ORIGINAL_AGG_CALLS)
    private final AggregateCall[] partialOriginalAggCalls;

    /** Per-call retraction flags; has the same length as {@link #partialOriginalAggCalls}. */
    @JsonProperty(FIELD_NAME_PARTIAL_AGG_CALL_NEED_RETRACTIONS)
    private final boolean[] partialAggCallNeedRetractions;

    /** Row type passed to the agg-info builders and to the aggs handler code generator. */
    @JsonProperty(FIELD_NAME_PARTIAL_LOCAL_AGG_INPUT_TYPE)
    private final RowType partialLocalAggInputType;

    /** Retraction flag forwarded to the agg-info builders. */
    @JsonProperty(FIELD_NAME_PARTIAL_AGG_NEED_RETRACTION)
    private final boolean partialAggNeedRetraction;

    /**
     * Creates a new node with a freshly allocated node id, a new context and a persisted
     * configuration derived from {@code tableConfig}; the single {@code inputProperty} is wrapped
     * into a singleton list.
     */
    public StreamExecIncrementalGroupAggregate(
            ReadableConfig tableConfig,
            int[] partialAggGrouping,
            int[] finalAggGrouping,
            AggregateCall[] partialOriginalAggCalls,
            boolean[] partialAggCallNeedRetractions,
            RowType partialLocalAggInputType,
            boolean partialAggNeedRetraction,
            InputProperty inputProperty,
            RowType outputType,
            String description) {
        this(
                ExecNodeContext.newNodeId(),
                ExecNodeContext.newContext(StreamExecIncrementalGroupAggregate.class),
                ExecNodeContext.newPersistedConfig(
                        StreamExecIncrementalGroupAggregate.class, tableConfig),
                partialAggGrouping,
                finalAggGrouping,
                partialOriginalAggCalls,
                partialAggCallNeedRetractions,
                partialLocalAggInputType,
                partialAggNeedRetraction,
                Collections.singletonList(inputProperty),
                outputType,
                description);
    }

    /**
     * JSON creator constructor.
     *
     * <p>Rejects {@code null} for both grouping arrays, the aggregate calls, the per-call
     * retraction flags and the partial input row type, and requires the aggregate calls and their
     * retraction flags to have equal length.
     */
    @JsonCreator
    public StreamExecIncrementalGroupAggregate(
            @JsonProperty("id") int id,
            @JsonProperty("type") ExecNodeContext context,
            @JsonProperty("configuration") ReadableConfig persistedConfig,
            @JsonProperty(FIELD_NAME_PARTIAL_AGG_GROUPING) int[] partialAggGrouping,
            @JsonProperty(FIELD_NAME_FINAL_AGG_GROUPING) int[] finalAggGrouping,
            @JsonProperty(FIELD_NAME_PARTIAL_ORIGINAL_AGG_CALLS)
                    AggregateCall[] partialOriginalAggCalls,
            @JsonProperty(FIELD_NAME_PARTIAL_AGG_CALL_NEED_RETRACTIONS)
                    boolean[] partialAggCallNeedRetractions,
            @JsonProperty(FIELD_NAME_PARTIAL_LOCAL_AGG_INPUT_TYPE) RowType partialLocalAggInputType,
            @JsonProperty(FIELD_NAME_PARTIAL_AGG_NEED_RETRACTION) boolean partialAggNeedRetraction,
            @JsonProperty("inputProperties") List<InputProperty> inputProperties,
            @JsonProperty("outputType") RowType outputType,
            @JsonProperty("description") String description) {
        super(id, context, persistedConfig, inputProperties, outputType, description);
        this.partialAggGrouping = Preconditions.checkNotNull(partialAggGrouping);
        this.finalAggGrouping = Preconditions.checkNotNull(finalAggGrouping);
        this.partialOriginalAggCalls = Preconditions.checkNotNull(partialOriginalAggCalls);
        this.partialAggCallNeedRetractions =
                Preconditions.checkNotNull(partialAggCallNeedRetractions);
        // Every aggregate call needs exactly one retraction flag.
        Preconditions.checkArgument(
                partialOriginalAggCalls.length == partialAggCallNeedRetractions.length);
        this.partialLocalAggInputType = Preconditions.checkNotNull(partialLocalAggInputType);
        this.partialAggNeedRetraction = partialAggNeedRetraction;
    }

    /**
     * Translates this node into a {@link OneInputTransformation} that runs a
     * {@link KeyedMapBundleOperator} around a {@link MiniBatchIncrementalGroupAggFunction}.
     *
     * @param planner supplies the type factory, class loader and rel builders
     * @param config node configuration; provides the state retention time and the mini-batch
     *     trigger settings
     * @return the transformation, keyed by the partial grouping key and using the input's
     *     parallelism
     */
    @Override
    protected Transformation<RowData> translateToPlanInternal(
            PlannerBase planner, ExecNodeConfig config) {
        final ExecEdge inputEdge = getInputEdges().get(0);
        @SuppressWarnings("unchecked")
        final Transformation<RowData> inputTransform =
                (Transformation<RowData>) inputEdge.translateToPlan(planner);
        final ClassLoader classLoader = planner.getFlinkContext().getClassLoader();

        // Handler #1: partial aggregation, generated with input field copying enabled and a
        // merged-accumulator offset equal to the number of partial grouping keys.
        final AggregateInfoList partialAggInfos =
                AggregateUtil.createPartialAggInfoList(
                        planner.getTypeFactory(),
                        partialLocalAggInputType,
                        JavaScalaConversionUtil.toScala(Arrays.asList(partialOriginalAggCalls)),
                        partialAggCallNeedRetractions,
                        partialAggNeedRetraction,
                        false);
        final GeneratedAggsHandleFunction partialHandler =
                generateAggsHandler(
                        "PartialGroupAggsHandler",
                        partialAggInfos,
                        partialAggGrouping.length,
                        partialAggInfos.getAccTypes(),
                        config,
                        classLoader,
                        planner.createRelBuilder(),
                        true);

        // Handler #2: incremental ("final") aggregation. It merges the accumulator types of the
        // partial agg infos, starting at offset 0, without input field copying.
        final AggregateInfoList incrementalAggInfos =
                AggregateUtil.createIncrementalAggInfoList(
                        planner.getTypeFactory(),
                        partialLocalAggInputType,
                        JavaScalaConversionUtil.toScala(Arrays.asList(partialOriginalAggCalls)),
                        partialAggCallNeedRetractions,
                        partialAggNeedRetraction);
        final GeneratedAggsHandleFunction finalHandler =
                generateAggsHandler(
                        "FinalGroupAggsHandler",
                        incrementalAggInfos,
                        0,
                        partialAggInfos.getAccTypes(),
                        config,
                        classLoader,
                        planner.createRelBuilder(),
                        false);

        // The partial selector works on the input rows; the final selector works on the key
        // rows produced by the partial selector.
        final RowDataKeySelector partialKeySelector =
                KeySelectorUtil.getRowDataSelector(
                        classLoader,
                        partialAggGrouping,
                        InternalTypeInfo.of(inputEdge.getOutputType()));
        final RowDataKeySelector finalKeySelector =
                KeySelectorUtil.getRowDataSelector(
                        classLoader, finalAggGrouping, partialKeySelector.getProducedType());

        final MiniBatchIncrementalGroupAggFunction bundleFunction =
                new MiniBatchIncrementalGroupAggFunction(
                        partialHandler,
                        finalHandler,
                        finalKeySelector,
                        config.getStateRetentionTime());
        final KeyedMapBundleOperator<RowData, RowData, RowData, RowData> bundleOperator =
                new KeyedMapBundleOperator<>(
                        bundleFunction, AggregateUtil.createMiniBatchTrigger(config));

        final OneInputTransformation<RowData, RowData> result =
                ExecNodeUtil.createOneInputTransformation(
                        inputTransform,
                        createTransformationMeta(
                                INCREMENTAL_GROUP_AGGREGATE_TRANSFORMATION, config),
                        bundleOperator,
                        InternalTypeInfo.of(getOutputType()),
                        inputTransform.getParallelism());

        // State is keyed by the partial grouping key.
        result.setStateKeySelector(partialKeySelector);
        result.setStateKeyType(partialKeySelector.getProducedType());
        return result;
    }

    /**
     * Generates an aggs handler named {@code name} for {@code aggInfoList}.
     *
     * <p>The code generator is created over the field types of the partial local aggregate input
     * row type and is configured via {@code needMerge(mergedAccOffset, true,
     * mergedAccExternalTypes)} before the handler is generated.
     *
     * @param inputFieldCopy forwarded to the code generator's constructor
     */
    private GeneratedAggsHandleFunction generateAggsHandler(
            String name,
            AggregateInfoList aggInfoList,
            int mergedAccOffset,
            DataType[] mergedAccExternalTypes,
            ExecNodeConfig config,
            ClassLoader classLoader,
            RelBuilder relBuilder,
            boolean inputFieldCopy) {
        final AggsHandlerCodeGenerator mergingGenerator =
                new AggsHandlerCodeGenerator(
                                new CodeGeneratorContext(config, classLoader),
                                relBuilder,
                                JavaScalaConversionUtil.toScala(
                                        partialLocalAggInputType.getChildren()),
                                inputFieldCopy)
                        .needMerge(mergedAccOffset, true, mergedAccExternalTypes);
        return mergingGenerator.generateAggsHandler(name, aggInfoList);
    }
}

