/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.rank;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.TreeMap;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.keyselector.BaseRowKeySelector;
import org.apache.flink.table.runtime.operators.rank.AbstractTopNFunction;
import org.apache.flink.table.runtime.operators.rank.RankRange;
import org.apache.flink.table.runtime.operators.rank.RankType;
import org.apache.flink.table.runtime.operators.rank.TopNBuffer;
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
import org.apache.flink.table.runtime.util.LRUMap;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Top-N function for inputs whose rows can be updated: every row is identified by a row key
 * (extracted by {@link #rowKeySelector}), and a later row carrying the same row key replaces the
 * earlier one in the ranking.
 *
 * <p>For each partition key the function keeps two heap structures: a {@link TopNBuffer} that maps
 * sort keys to the row keys ranked under them, and a map from row key to {@link RankRow}. The
 * row-key maps are held in an {@link LRUMap}; rows marked dirty are written to the map state
 * {@code "data-state-with-update"} when their partition is evicted from that cache and when
 * {@link #snapshotState(FunctionSnapshotContext)} is called.
 */
public class UpdatableTopNFunction extends AbstractTopNFunction implements CheckpointedFunction {
    private static final long serialVersionUID = 6786508184355952780L;
    private static final Logger LOG = LoggerFactory.getLogger(UpdatableTopNFunction.class);

    /** Type information of the row key; used as the key type of {@link #dataState}. */
    private final BaseRowTypeInfo rowKeyType;

    /** Cache budget; divided by the default top-N size in {@link #open} to size the caches. */
    private final long cacheSize;

    /** Keyed map state: row key -> (row, inner rank of the row under its sort key). */
    private transient MapState<BaseRow, Tuple2<BaseRow, Integer>> dataState;

    /** Sorted buffer (sort key -> row keys) of the partition key currently being processed. */
    private transient TopNBuffer buffer;

    /** Heap cache: partition key -> sorted buffer. */
    private transient Map<BaseRow, TopNBuffer> kvSortedMap;

    /** Row key -> ranked row for the partition key currently being processed. */
    private transient Map<BaseRow, RankRow> rowKeyMap;

    /** LRU heap cache: partition key -> (row key -> ranked row). */
    private transient LRUMap<BaseRow, Map<BaseRow, RankRow>> kvRowKeyMap;

    /** Serializer used to copy input rows before they are retained in {@link #rowKeyMap}. */
    private final TypeSerializer<BaseRow> inputRowSer;

    /** Extracts the row key that identifies a row across updates. */
    private final KeySelector<BaseRow, BaseRow> rowKeySelector;

    /**
     * Creates the function.
     *
     * <p>All arguments except {@code rowKeySelector} and {@code cacheSize} are forwarded to the
     * {@link AbstractTopNFunction} constructor; {@code inputRowType} is additionally used to
     * create the row serializer.
     *
     * @param rowKeySelector selector producing the row key of an input row
     * @param cacheSize cache budget used to derive the number of cached partition keys
     */
    public UpdatableTopNFunction(long minRetentionTime, long maxRetentionTime, BaseRowTypeInfo inputRowType, BaseRowKeySelector rowKeySelector, GeneratedRecordComparator generatedRecordComparator, BaseRowKeySelector sortKeySelector, RankType rankType, RankRange rankRange, boolean generateRetraction, boolean outputRankNumber, long cacheSize) {
        super(minRetentionTime, maxRetentionTime, inputRowType, generatedRecordComparator, sortKeySelector, rankType, rankRange, generateRetraction, outputRankNumber);
        this.rowKeyType = rowKeySelector.getProducedType();
        this.cacheSize = cacheSize;
        this.inputRowSer = inputRowType.createSerializer(new ExecutionConfig());
        this.rowKeySelector = rowKeySelector;
    }

    /**
     * Creates the heap caches, obtains the map state and registers the metric.
     *
     * <p>The number of cached partition keys is {@code cacheSize / getDefaultTopNSize()}, but never
     * less than one.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        int lruCacheSize = Math.max(1, (int) (cacheSize / getDefaultTopNSize()));
        kvSortedMap = new HashMap<>(lruCacheSize);
        kvRowKeyMap = new LRUMap<>(lruCacheSize, new CacheRemovalListener());
        LOG.info("Top{} operator is using LRU caches key-size: {}", getDefaultTopNSize(), lruCacheSize);

        TupleTypeInfo<Tuple2<BaseRow, Integer>> valueTypeInfo = new TupleTypeInfo<>(inputRowType, Types.INT);
        MapStateDescriptor<BaseRow, Tuple2<BaseRow, Integer>> mapStateDescriptor =
                new MapStateDescriptor<>("data-state-with-update", rowKeyType, valueTypeInfo);
        dataState = getRuntimeContext().getMapState(mapStateDescriptor);

        registerMetric((long) kvSortedMap.size() * getDefaultTopNSize());
    }

    /**
     * When state cleaning is enabled, drops both heap caches of the current partition key and
     * passes {@link #dataState} to {@code cleanupState}. Does nothing otherwise.
     */
    @Override
    public void onTimer(long timestamp, KeyedProcessFunction.OnTimerContext ctx, Collector<BaseRow> out) throws Exception {
        if (!stateCleaningEnabled) {
            return;
        }
        BaseRow partitionKey = (BaseRow) keyContext.getCurrentKey();
        kvRowKeyMap.remove(partitionKey);
        kvSortedMap.remove(partitionKey);
        cleanupState(new State[]{dataState});
    }

    /**
     * Intentionally empty: per-key data is loaded lazily from {@link #dataState} by
     * {@link #initHeapStates()} the first time a partition key is seen.
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
    }

    /**
     * Processes one input row: registers the processing-time cleanup timer, loads the heap
     * structures of the current partition key, initialises the rank end from the row and then
     * dispatches to the variant with or without row numbers.
     *
     * <p>The row-number variant is used when the rank number is part of the output or the rank
     * range has an offset.
     */
    @Override
    public void processElement(BaseRow input, KeyedProcessFunction.Context context, Collector<BaseRow> out) throws Exception {
        long currentTime = context.timerService().currentProcessingTime();
        registerProcessingCleanupTimer(context, currentTime);
        initHeapStates();
        initRankEnd(input);
        if (outputRankNumber || hasOffset()) {
            processElementWithRowNumber(input, out);
        } else {
            processElementWithoutRowNumber(input, out);
        }
    }

    /**
     * Writes the dirty rows of every cached partition to {@link #dataState}.
     *
     * <p>The current key of the key context is switched to each cached partition key in turn and
     * is not switched back afterwards.
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        for (Map.Entry<BaseRow, Map<BaseRow, RankRow>> cached : kvRowKeyMap.entrySet()) {
            keyContext.setCurrentKey(cached.getKey());
            flushBufferToState(cached.getValue());
        }
    }

    /**
     * Points {@link #buffer} and {@link #rowKeyMap} at the structures of the current partition
     * key, rebuilding them from {@link #dataState} when no sorted buffer is cached for that key.
     *
     * <p>Also maintains the request and hit counters.
     */
    private void initHeapStates() throws Exception {
        ++requestCount;
        BaseRow partitionKey = (BaseRow) keyContext.getCurrentKey();
        buffer = kvSortedMap.get(partitionKey);
        rowKeyMap = kvRowKeyMap.get(partitionKey);
        if (buffer != null) {
            ++hitCount;
            return;
        }

        // Cache miss: register fresh structures first, then fill them from state.
        buffer = new TopNBuffer(sortKeyComparator, LinkedHashSet::new);
        rowKeyMap = new HashMap<>();
        kvSortedMap.put(partitionKey, buffer);
        kvRowKeyMap.put(partitionKey, rowKeyMap);

        Iterator<Map.Entry<BaseRow, Tuple2<BaseRow, Integer>>> iter = dataState.iterator();
        if (iter == null) {
            return;
        }

        // Group the stored row keys by sort key, ordered by their stored inner rank, so that they
        // can be re-inserted into the buffer in the order they originally had.
        Map<BaseRow, TreeMap<Integer, BaseRow>> tempSortedMap = new HashMap<>();
        while (iter.hasNext()) {
            Map.Entry<BaseRow, Tuple2<BaseRow, Integer>> stateEntry = iter.next();
            BaseRow rowKey = stateEntry.getKey();
            Tuple2<BaseRow, Integer> recordAndInnerRank = stateEntry.getValue();
            BaseRow storedRow = recordAndInnerRank.f0;
            Integer innerRank = recordAndInnerRank.f1;
            // Rows read back from state are in sync with it, hence not dirty.
            rowKeyMap.put(rowKey, new RankRow(storedRow, innerRank, false));
            BaseRow sortKey = sortKeySelector.getKey(storedRow);
            tempSortedMap.computeIfAbsent(sortKey, k -> new TreeMap<>()).put(innerRank, rowKey);
        }

        for (Map.Entry<BaseRow, TreeMap<Integer, BaseRow>> group : tempSortedMap.entrySet()) {
            BaseRow sortKey = group.getKey();
            TreeMap<Integer, BaseRow> treeMap = group.getValue();
            for (Map.Entry<Integer, BaseRow> treeMapEntry : treeMap.entrySet()) {
                Integer innerRank = treeMapEntry.getKey();
                BaseRow recordRowKey = treeMapEntry.getValue();
                int size = buffer.put(sortKey, recordRowKey);
                // The value returned by put is expected to equal the stored inner rank.
                if (innerRank != size) {
                    LOG.warn("Failed to build sorted map from state, this may result in wrong result. The sort key is {}, partition key is {}, treeMap is {}. The expected inner rank is {}, but current size is {}.", sortKey, partitionKey, treeMap, innerRank, size);
                }
            }
        }
    }

    /**
     * Handles an input row when row numbers have to be tracked.
     *
     * <ul>
     *   <li>Known row key, same sort key: the stored row is replaced, the old row is retracted and
     *       the new row collected at the unchanged rank.
     *   <li>Known row key, different sort key: the row key is moved to the new sort key in the
     *       buffer and the affected rows are re-emitted.
     *   <li>Unknown row key whose sort key is within the buffer range: the row is inserted and the
     *       affected rows are re-emitted.
     * </ul>
     *
     * <p>Any other row is ignored.
     */
    private void processElementWithRowNumber(BaseRow inputRow, Collector<BaseRow> out) throws Exception {
        BaseRow sortKey = sortKeySelector.getKey(inputRow);
        BaseRow rowKey = rowKeySelector.getKey(inputRow);
        RankRow oldRow = rowKeyMap.get(rowKey);
        if (oldRow != null) {
            BaseRow oldSortKey = sortKeySelector.getKey(oldRow.row);
            if (oldSortKey.equals(sortKey)) {
                Tuple2<Integer, Integer> rankAndInnerRank = rowNumber(sortKey, rowKey, buffer);
                int rank = rankAndInnerRank.f0;
                int innerRank = rankAndInnerRank.f1;
                rowKeyMap.put(rowKey, new RankRow(inputRowSer.copy(inputRow), innerRank, true));
                retract(out, oldRow.row, rank);
                collect(out, inputRow, rank);
                return;
            }

            // The old rank must be looked up before the row key is moved inside the buffer.
            Tuple2<Integer, Integer> oldRankAndInnerRank = rowNumber(oldSortKey, rowKey, buffer);
            int oldRank = oldRankAndInnerRank.f0;
            buffer.remove(oldSortKey, rowKey);
            int size = buffer.put(sortKey, rowKey);
            rowKeyMap.put(rowKey, new RankRow(inputRowSer.copy(inputRow), size, true));
            updateInnerRank(oldSortKey);
            emitRecordsWithRowNumber(sortKey, inputRow, out, oldSortKey, oldRow, oldRank);
        } else if (checkSortKeyInBufferRange(sortKey, buffer)) {
            int size = buffer.put(sortKey, rowKey);
            rowKeyMap.put(rowKey, new RankRow(inputRowSer.copy(inputRow), size, true));
            emitRecordsWithRowNumber(sortKey, inputRow, out);
        }
    }

    /**
     * Locates a row key in the given buffer.
     *
     * @return a tuple of (1-based rank over the whole buffer, 1-based position among the row keys
     *     sharing {@code sortKey})
     * @throws RuntimeException if the row key is not found under the sort key
     */
    private Tuple2<Integer, Integer> rowNumber(BaseRow sortKey, BaseRow rowKey, TopNBuffer buffer) {
        int curRank = 1;
        for (Map.Entry<BaseRow, Collection<BaseRow>> entry : buffer.entrySet()) {
            Collection<BaseRow> rowKeys = entry.getValue();
            if (!entry.getKey().equals(sortKey)) {
                curRank += rowKeys.size();
                continue;
            }
            int innerRank = 1;
            for (BaseRow candidate : rowKeys) {
                if (rowKey.equals(candidate)) {
                    return Tuple2.of(curRank, innerRank);
                }
                ++innerRank;
                ++curRank;
            }
        }
        LOG.error("Failed to find the sortKey: {}, rowkey: {} in the buffer. This should never happen", sortKey, rowKey);
        throw new RuntimeException("Failed to find the sortKey, rowkey in the buffer. This should never happen");
    }

    /** Emits the changes caused by inserting a row whose row key was not in the buffer before. */
    private void emitRecordsWithRowNumber(BaseRow sortKey, BaseRow inputRow, Collector<BaseRow> out) throws Exception {
        emitRecordsWithRowNumber(sortKey, inputRow, out, null, null, -1);
    }

    /**
     * Emits the rank changes caused by placing {@code inputRow} under {@code sortKey}.
     *
     * <p>The buffer is walked in iteration order while the running rank is within the rank end:
     *
     * <ol>
     *   <li>Entries before {@code sortKey} only advance the running rank.
     *   <li>At {@code sortKey} the old row (if any) is retracted at {@code oldRank} and the input
     *       row is collected at the rank of the last row key under that sort key.
     *   <li>Rows in later entries are retracted at their current rank and collected one rank
     *       lower. For an update this stops at the position the row key previously occupied, or
     *       as soon as an entry compares greater than {@code oldSortKey}.
     * </ol>
     *
     * <p>If the walk ends without an early return, every buffer entry that was not visited is
     * removed from the row-key map, the state and the buffer.
     *
     * @param oldSortKey previous sort key of the row, or {@code null} for a newly inserted row
     * @param oldRow previous version of the row, or {@code null} for a newly inserted row
     * @param oldRank rank of the previous version; only used when {@code oldRow} is not null
     */
    private void emitRecordsWithRowNumber(BaseRow sortKey, BaseRow inputRow, Collector<BaseRow> out, BaseRow oldSortKey, RankRow oldRow, int oldRank) throws Exception {
        int oldInnerRank = oldRow == null ? -1 : oldRow.innerRank;
        Iterator<Map.Entry<BaseRow, Collection<BaseRow>>> iterator = buffer.entrySet().iterator();
        int curRank = 0;
        boolean findsSortKey = false;
        while (iterator.hasNext() && isInRankEnd(curRank)) {
            Map.Entry<BaseRow, Collection<BaseRow>> entry = iterator.next();
            BaseRow curSortKey = entry.getKey();
            Collection<BaseRow> rowKeys = entry.getValue();
            if (!findsSortKey && curSortKey.equals(sortKey)) {
                curRank += rowKeys.size();
                if (oldRow != null) {
                    retract(out, oldRow.row, oldRank);
                }
                collect(out, inputRow, curRank);
                findsSortKey = true;
            } else if (findsSortKey) {
                if (oldSortKey == null) {
                    // New row: every following row moves down by one rank.
                    Iterator<BaseRow> rowKeyIter = rowKeys.iterator();
                    while (rowKeyIter.hasNext() && isInRankEnd(curRank)) {
                        RankRow prevRow = rowKeyMap.get(rowKeyIter.next());
                        retract(out, prevRow.row, curRank);
                        ++curRank;
                        collect(out, prevRow.row, curRank);
                    }
                } else {
                    int compare = sortKeyComparator.compare(curSortKey, oldSortKey);
                    if (compare > 0) {
                        // Past the old sort key: nothing further is re-emitted.
                        return;
                    }
                    Iterator<BaseRow> rowKeyIter = rowKeys.iterator();
                    int curInnerRank = 0;
                    while (rowKeyIter.hasNext() && isInRankEnd(curRank)) {
                        ++curRank;
                        if (compare == 0) {
                            ++curInnerRank;
                            if (curInnerRank >= oldInnerRank) {
                                // Reached the position the row key previously occupied.
                                return;
                            }
                        }
                        RankRow prevRow = rowKeyMap.get(rowKeyIter.next());
                        retract(out, prevRow.row, curRank - 1);
                        collect(out, prevRow.row, curRank);
                    }
                }
            } else {
                curRank += rowKeys.size();
            }
        }

        // Drop everything the walk did not reach. Sort keys are collected first because the
        // buffer must not be modified while its entries are being iterated.
        ArrayList<BaseRow> toDeleteSortKeys = new ArrayList<>();
        while (iterator.hasNext()) {
            Map.Entry<BaseRow, Collection<BaseRow>> entry = iterator.next();
            for (BaseRow rowKey : entry.getValue()) {
                rowKeyMap.remove(rowKey);
                dataState.remove(rowKey);
            }
            toDeleteSortKeys.add(entry.getKey());
        }
        for (BaseRow toDeleteKey : toDeleteSortKeys) {
            buffer.removeAll(toDeleteKey);
        }
    }

    /**
     * Handles an input row when no row numbers are emitted.
     *
     * <p>For a known row key the stored row is replaced (moving the row key to the new sort key
     * if it changed), the old row is retracted and the new row collected. For an unknown row key
     * whose sort key is within the buffer range the row is inserted and collected; if the buffer
     * then holds more than {@code rankEnd} rows, its last row is removed and a delete is emitted
     * for it. Any other row is ignored.
     */
    private void processElementWithoutRowNumber(BaseRow inputRow, Collector<BaseRow> out) throws Exception {
        BaseRow sortKey = sortKeySelector.getKey(inputRow);
        BaseRow rowKey = rowKeySelector.getKey(inputRow);
        RankRow oldRow = rowKeyMap.get(rowKey);
        if (oldRow != null) {
            BaseRow oldSortKey = sortKeySelector.getKey(oldRow.row);
            if (oldSortKey.equals(sortKey)) {
                // Same sort key: only the row content is replaced, the inner rank is kept.
                rowKeyMap.put(rowKey, new RankRow(inputRowSer.copy(inputRow), oldRow.innerRank, true));
            } else {
                buffer.remove(oldSortKey, rowKey);
                int size = buffer.put(sortKey, rowKey);
                rowKeyMap.put(rowKey, new RankRow(inputRowSer.copy(inputRow), size, true));
                updateInnerRank(oldSortKey);
            }
            retract(out, oldRow.row, oldRow.innerRank);
            collect(out, inputRow);
        } else if (checkSortKeyInBufferRange(sortKey, buffer)) {
            int size = buffer.put(sortKey, rowKey);
            rowKeyMap.put(rowKey, new RankRow(inputRowSer.copy(inputRow), size, true));
            collect(out, inputRow);
            if ((long) buffer.getCurrentTopNum() > rankEnd) {
                BaseRow lastRowKey = buffer.removeLast();
                if (lastRowKey != null) {
                    RankRow lastRow = rowKeyMap.remove(lastRowKey);
                    dataState.remove(lastRowKey);
                    delete(out, lastRow.row);
                }
            }
        }
    }

    /**
     * Writes every dirty row of the given map to {@link #dataState} and clears its dirty flag.
     *
     * <p>The state is accessed under whatever key the key context currently holds, so callers
     * switch the current key to the owning partition key before calling this method.
     */
    private void flushBufferToState(Map<BaseRow, RankRow> curRowKeyMap) throws Exception {
        for (Map.Entry<BaseRow, RankRow> entry : curRowKeyMap.entrySet()) {
            RankRow rankRow = entry.getValue();
            if (rankRow.dirty) {
                dataState.put(entry.getKey(), Tuple2.of(rankRow.row, rankRow.innerRank));
                rankRow.dirty = false;
            }
        }
    }

    /**
     * Renumbers the inner ranks (1-based, in buffer order) of the rows stored under the given
     * sort key and marks every row whose inner rank changed as dirty.
     */
    private void updateInnerRank(BaseRow oldSortKey) {
        Collection<BaseRow> rowKeys = buffer.get(oldSortKey);
        if (rowKeys == null) {
            return;
        }
        int innerRank = 1;
        for (BaseRow rowKey : rowKeys) {
            RankRow row = rowKeyMap.get(rowKey);
            if (row.innerRank != innerRank) {
                row.innerRank = innerRank;
                row.dirty = true;
            }
            ++innerRank;
        }
    }

    /**
     * A row together with its position among the rows sharing its sort key and a flag telling
     * whether it still has to be written to state.
     *
     * <p>Declared static because it never accesses the enclosing function instance.
     */
    private static final class RankRow {
        private final BaseRow row;
        private int innerRank;
        private boolean dirty;

        private RankRow(BaseRow row, int innerRank, boolean dirty) {
            this.row = row;
            this.innerRank = innerRank;
            this.dirty = dirty;
        }
    }

    /**
     * Removal listener passed to the LRU cache: drops the sorted buffer of the removed partition
     * key and writes its dirty rows to state.
     */
    private class CacheRemovalListener
    implements LRUMap.RemovalListener<BaseRow, Map<BaseRow, RankRow>> {
        private CacheRemovalListener() {
        }

        /**
         * Flushes the removed partition under its own key and restores the previously current
         * key afterwards, even when flushing fails.
         *
         * @throws RuntimeException wrapping any failure raised while flushing
         */
        @Override
        public void onRemoval(Map.Entry<BaseRow, Map<BaseRow, RankRow>> eldest) {
            BaseRow previousKey = (BaseRow) UpdatableTopNFunction.this.keyContext.getCurrentKey();
            BaseRow partitionKey = eldest.getKey();
            Map<BaseRow, RankRow> currentRowKeyMap = eldest.getValue();
            UpdatableTopNFunction.this.keyContext.setCurrentKey(partitionKey);
            UpdatableTopNFunction.this.kvSortedMap.remove(partitionKey);
            try {
                UpdatableTopNFunction.this.flushBufferToState(currentRowKeyMap);
            } catch (Throwable e) {
                LOG.error("Fail to synchronize state!", e);
                throw new RuntimeException(e);
            } finally {
                UpdatableTopNFunction.this.keyContext.setCurrentKey(previousKey);
            }
        }
    }
}

