/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.aggregate;

import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.runtime.context.ExecutionContext;
import org.apache.flink.table.runtime.dataview.PerKeyStateDataViewStore;
import org.apache.flink.table.runtime.generated.AggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.operators.aggregate.RecordCounter;
import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Collector;

public class MiniBatchGlobalGroupAggFunction
extends MapBundleFunction<BaseRow, BaseRow, BaseRow, BaseRow> {
    private static final long serialVersionUID = 8349579876002001744L;
    private final GeneratedAggsHandleFunction genLocalAggsHandler;
    private final GeneratedAggsHandleFunction genGlobalAggsHandler;
    private final GeneratedRecordEqualiser genRecordEqualiser;
    private final LogicalType[] accTypes;
    private final RecordCounter recordCounter;
    private final boolean generateRetraction;
    private transient JoinedRow resultRow = new JoinedRow();
    private transient AggsHandleFunction localAgg = null;
    private transient AggsHandleFunction globalAgg = null;
    private transient RecordEqualiser equaliser = null;
    private transient ValueState<BaseRow> accState = null;

    public MiniBatchGlobalGroupAggFunction(GeneratedAggsHandleFunction genLocalAggsHandler, GeneratedAggsHandleFunction genGlobalAggsHandler, GeneratedRecordEqualiser genRecordEqualiser, LogicalType[] accTypes, int indexOfCountStar, boolean generateRetraction) {
        this.genLocalAggsHandler = genLocalAggsHandler;
        this.genGlobalAggsHandler = genGlobalAggsHandler;
        this.genRecordEqualiser = genRecordEqualiser;
        this.accTypes = accTypes;
        this.recordCounter = RecordCounter.of(indexOfCountStar);
        this.generateRetraction = generateRetraction;
    }

    @Override
    public void open(ExecutionContext ctx) throws Exception {
        super.open(ctx);
        this.localAgg = (AggsHandleFunction)this.genLocalAggsHandler.newInstance(ctx.getRuntimeContext().getUserCodeClassLoader());
        this.localAgg.open(new PerKeyStateDataViewStore(ctx.getRuntimeContext()));
        this.globalAgg = (AggsHandleFunction)this.genGlobalAggsHandler.newInstance(ctx.getRuntimeContext().getUserCodeClassLoader());
        this.globalAgg.open(new PerKeyStateDataViewStore(ctx.getRuntimeContext()));
        this.equaliser = (RecordEqualiser)this.genRecordEqualiser.newInstance(ctx.getRuntimeContext().getUserCodeClassLoader());
        BaseRowTypeInfo accTypeInfo = new BaseRowTypeInfo(this.accTypes);
        ValueStateDescriptor accDesc = new ValueStateDescriptor("accState", (TypeInformation)accTypeInfo);
        this.accState = ctx.getRuntimeContext().getState(accDesc);
        this.resultRow = new JoinedRow();
    }

    @Override
    public BaseRow addInput(@Nullable BaseRow previousAcc, BaseRow input) throws Exception {
        BaseRow currentAcc = previousAcc == null ? this.localAgg.createAccumulators() : previousAcc;
        this.localAgg.setAccumulators(currentAcc);
        this.localAgg.merge(input);
        return this.localAgg.getAccumulators();
    }

    @Override
    public void finishBundle(Map<BaseRow, BaseRow> buffer, Collector<BaseRow> out) throws Exception {
        for (Map.Entry<BaseRow, BaseRow> entry : buffer.entrySet()) {
            BaseRow currentKey = entry.getKey();
            BaseRow bufferAcc = entry.getValue();
            boolean firstRow = false;
            this.ctx.setCurrentKey(currentKey);
            BaseRow stateAcc = (BaseRow)this.accState.value();
            if (stateAcc == null) {
                stateAcc = this.globalAgg.createAccumulators();
                firstRow = true;
            }
            this.globalAgg.setAccumulators(stateAcc);
            BaseRow prevAggValue = this.globalAgg.getValue();
            this.globalAgg.merge(bufferAcc);
            BaseRow newAggValue = this.globalAgg.getValue();
            stateAcc = this.globalAgg.getAccumulators();
            if (!this.recordCounter.recordCountIsZero(stateAcc)) {
                this.accState.update((Object)stateAcc);
                if (!firstRow) {
                    if (this.equaliser.equalsWithoutHeader(prevAggValue, newAggValue)) continue;
                    if (this.generateRetraction) {
                        this.resultRow.replace(currentKey, prevAggValue).setHeader((byte)1);
                        out.collect((Object)this.resultRow);
                    }
                    this.resultRow.replace(currentKey, newAggValue).setHeader((byte)0);
                    out.collect((Object)this.resultRow);
                    continue;
                }
                this.resultRow.replace(currentKey, newAggValue).setHeader((byte)0);
                out.collect((Object)this.resultRow);
                continue;
            }
            if (!firstRow) {
                this.resultRow.replace(currentKey, prevAggValue).setHeader((byte)1);
                out.collect((Object)this.resultRow);
            }
            this.accState.clear();
            this.globalAgg.cleanup();
        }
    }

    @Override
    public void close() throws Exception {
        if (this.localAgg != null) {
            this.localAgg.close();
        }
        if (this.globalAgg != null) {
            this.globalAgg.close();
        }
    }
}

