package org.apache.flink.table.runtime.operators.deduplicate;

import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.context.ExecutionContext;
import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
import org.apache.flink.table.runtime.util.StateTtlConfigUtil;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateKeepLastRowFunction.class */
/**
 * Mini-batch bundle function that keeps one buffered row per key and, when the bundle is
 * finished, hands each buffered row to {@link DeduplicateFunctionHelper#processLastRow}
 * together with the per-key {@code "preRowState"} value state.
 *
 * <p>{@link #addInput} ignores the previously buffered value and returns a copy of the incoming
 * row, so the value this function produces for a key is always derived from the most recent
 * input only.
 */
public class MiniBatchDeduplicateKeepLastRowFunction extends MapBundleFunction<RowData, RowData, RowData, RowData> {
    private static final long serialVersionUID = -8981813609115029119L;

    /** Type information used to declare the {@code "preRowState"} value state. */
    private final RowDataTypeInfo rowTypeInfo;

    /** Forwarded unchanged to {@code DeduplicateFunctionHelper.processLastRow}. */
    private final boolean generateUpdateBefore;

    /** Forwarded unchanged to {@code DeduplicateFunctionHelper.processLastRow}. */
    private final boolean generateInsert;

    /** Serializer used to copy each incoming row before it is buffered. */
    private final TypeSerializer<RowData> typeSerializer;

    /** Value handed to {@code StateTtlConfigUtil.createTtlConfig} to derive the state TTL. */
    private final long minRetentionTime;

    /** Keyed value state registered in {@link #open}; passed to the helper for every key. */
    private ValueState<RowData> state;

    /**
     * Creates the function.
     *
     * @param rowTypeInfo type information for the rows stored in state
     * @param generateUpdateBefore flag forwarded to the deduplication helper
     * @param generateInsert flag forwarded to the deduplication helper
     * @param typeSerializer serializer used to copy incoming rows
     * @param minRetentionTime retention value used to build the state TTL configuration
     */
    public MiniBatchDeduplicateKeepLastRowFunction(
            RowDataTypeInfo rowTypeInfo,
            boolean generateUpdateBefore,
            boolean generateInsert,
            TypeSerializer<RowData> typeSerializer,
            long minRetentionTime) {
        this.minRetentionTime = minRetentionTime;
        this.rowTypeInfo = rowTypeInfo;
        this.generateUpdateBefore = generateUpdateBefore;
        this.generateInsert = generateInsert;
        this.typeSerializer = typeSerializer;
    }

    /**
     * Registers the {@code "preRowState"} value state, enabling time-to-live on it only when the
     * TTL configuration built from {@code minRetentionTime} reports itself as enabled.
     *
     * @param executionContext context that provides the runtime context used to obtain state
     * @throws Exception if the superclass {@code open} fails
     */
    @Override // org.apache.flink.table.runtime.operators.bundle.MapBundleFunction
    public void open(ExecutionContext executionContext) throws Exception {
        super.open(executionContext);
        // Parameterised descriptor: avoids the raw type and the unchecked conversion on getState.
        ValueStateDescriptor<RowData> stateDescriptor =
                new ValueStateDescriptor<>("preRowState", this.rowTypeInfo);
        StateTtlConfig ttlConfig = StateTtlConfigUtil.createTtlConfig(this.minRetentionTime);
        if (ttlConfig.isEnabled()) {
            stateDescriptor.enableTimeToLive(ttlConfig);
        }
        this.state = executionContext.getRuntimeContext().getState(stateDescriptor);
    }

    /**
     * Returns a copy of {@code rowData2}; the previously buffered value is deliberately ignored.
     *
     * @param rowData the value currently buffered for the key, may be {@code null}; unused
     * @param rowData2 the incoming row
     * @return a copy of the incoming row produced by the configured serializer
     */
    @Override // org.apache.flink.table.runtime.operators.bundle.MapBundleFunction
    public RowData addInput(@Nullable RowData rowData, RowData rowData2) {
        // NOTE(review): the copy presumably guards against upstream object reuse - confirm.
        return this.typeSerializer.copy(rowData2);
    }

    /**
     * Processes every buffered entry: sets the entry's key as the current key on the execution
     * context, then delegates the buffered row to
     * {@code DeduplicateFunctionHelper.processLastRow}.
     *
     * @param map buffered rows, one per key
     * @param collector collector handed to the helper for any output it produces
     * @throws Exception if the helper fails
     */
    @Override // org.apache.flink.table.runtime.operators.bundle.MapBundleFunction
    public void finishBundle(Map<RowData, RowData> map, Collector<RowData> collector) throws Exception {
        for (Map.Entry<RowData, RowData> entry : map.entrySet()) {
            RowData currentKey = entry.getKey();
            RowData lastRow = entry.getValue();
            // The current key must be set before the helper is handed the keyed state.
            this.ctx.setCurrentKey(currentKey);
            DeduplicateFunctionHelper.processLastRow(
                    lastRow, this.generateUpdateBefore, this.generateInsert, this.state, collector);
        }
    }
}
