/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.deduplicate;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper;
import org.apache.flink.table.runtime.operators.deduplicate.MiniBatchDeduplicateFunctionBase;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.util.Collector;

public class RowTimeMiniBatchDeduplicateFunction
extends MiniBatchDeduplicateFunctionBase<RowData, RowData, List<RowData>, RowData, RowData> {
    private static final long serialVersionUID = 1L;
    private final TypeSerializer<RowData> serializer;
    private final boolean generateUpdateBefore;
    private final boolean generateInsert;
    private final int rowtimeIndex;
    private final boolean keepLastRow;

    public RowTimeMiniBatchDeduplicateFunction(InternalTypeInfo<RowData> typeInfo, TypeSerializer<RowData> serializer, long minRetentionTime, int rowtimeIndex, boolean generateUpdateBefore, boolean generateInsert, boolean keepLastRow) {
        super(typeInfo, minRetentionTime);
        this.serializer = serializer;
        this.generateUpdateBefore = generateUpdateBefore;
        this.generateInsert = generateInsert;
        this.rowtimeIndex = rowtimeIndex;
        this.keepLastRow = keepLastRow;
    }

    @Override
    public List<RowData> addInput(@Nullable List<RowData> value, RowData input) throws Exception {
        if (value == null) {
            value = new ArrayList<RowData>();
        }
        value.add((RowData)this.serializer.copy((Object)input));
        return value;
    }

    @Override
    public void finishBundle(Map<RowData, List<RowData>> buffer, Collector<RowData> out) throws Exception {
        for (Map.Entry<RowData, List<RowData>> entry : buffer.entrySet()) {
            RowData currentKey = entry.getKey();
            List<RowData> bufferedRows = entry.getValue();
            this.ctx.setCurrentKey(currentKey);
            RowTimeMiniBatchDeduplicateFunction.miniBatchDeduplicateOnRowTime((ValueState<RowData>)this.state, bufferedRows, out, this.generateUpdateBefore, this.generateInsert, this.rowtimeIndex, this.keepLastRow);
        }
    }

    private static void miniBatchDeduplicateOnRowTime(ValueState<RowData> state, List<RowData> bufferedRows, Collector<RowData> out, boolean generateUpdateBefore, boolean generateInsert, int rowtimeIndex, boolean keepLastRow) throws Exception {
        if (bufferedRows.isEmpty()) {
            return;
        }
        RowData preRow = (RowData)state.value();
        for (RowData currentRow2 : bufferedRows) {
            DeduplicateFunctionHelper.checkInsertOnly(currentRow2);
            if (!DeduplicateFunctionHelper.isDuplicate(preRow, currentRow2, rowtimeIndex, keepLastRow)) continue;
            DeduplicateFunctionHelper.updateDeduplicateResult(generateUpdateBefore, generateInsert, preRow, currentRow2, out);
            preRow = currentRow2;
        }
        state.update((Object)preRow);
    }
}

