/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.rank;

import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.function.Function;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.dataformat.util.BaseRowUtil;
import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.keyselector.BaseRowKeySelector;
import org.apache.flink.table.runtime.operators.rank.ConstantRankRange;
import org.apache.flink.table.runtime.operators.rank.RankRange;
import org.apache.flink.table.runtime.operators.rank.RankType;
import org.apache.flink.table.runtime.operators.rank.TopNBuffer;
import org.apache.flink.table.runtime.operators.rank.VariableRankRange;
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractTopNFunction
extends KeyedProcessFunctionWithCleanupState<BaseRow, BaseRow, BaseRow> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractTopNFunction.class);
    private static final String RANK_UNSUPPORTED_MSG = "RANK() on streaming table is not supported currently";
    private static final String DENSE_RANK_UNSUPPORTED_MSG = "DENSE_RANK() on streaming table is not supported currently";
    private static final String WITHOUT_RANK_END_UNSUPPORTED_MSG = "Rank end is not specified. Currently rank only support TopN, which means the rank end must be specified.";
    private static final long DEFAULT_TOPN_SIZE = 100L;
    private GeneratedRecordComparator generatedSortKeyComparator;
    protected Comparator<BaseRow> sortKeyComparator;
    private final boolean generateRetraction;
    protected final boolean outputRankNumber;
    protected final BaseRowTypeInfo inputRowType;
    protected final KeySelector<BaseRow, BaseRow> sortKeySelector;
    protected KeyContext keyContext;
    private final boolean isConstantRankEnd;
    private final long rankStart;
    private final int rankEndIndex;
    protected long rankEnd;
    private transient Function<BaseRow, Long> rankEndFetcher;
    private ValueState<Long> rankEndState;
    private Counter invalidCounter;
    private JoinedRow outputRow;
    protected long hitCount = 0L;
    protected long requestCount = 0L;

    AbstractTopNFunction(long minRetentionTime, long maxRetentionTime, BaseRowTypeInfo inputRowType, GeneratedRecordComparator generatedSortKeyComparator, BaseRowKeySelector sortKeySelector, RankType rankType, RankRange rankRange, boolean generateRetraction, boolean outputRankNumber) {
        super(minRetentionTime, maxRetentionTime);
        switch (rankType) {
            case ROW_NUMBER: {
                break;
            }
            case RANK: {
                LOG.error(RANK_UNSUPPORTED_MSG);
                throw new UnsupportedOperationException(RANK_UNSUPPORTED_MSG);
            }
            case DENSE_RANK: {
                LOG.error(DENSE_RANK_UNSUPPORTED_MSG);
                throw new UnsupportedOperationException(DENSE_RANK_UNSUPPORTED_MSG);
            }
            default: {
                LOG.error("Streaming tables do not support {}", (Object)rankType.name());
                throw new UnsupportedOperationException("Streaming tables do not support " + rankType.toString());
            }
        }
        if (rankRange instanceof ConstantRankRange) {
            ConstantRankRange constantRankRange = (ConstantRankRange)rankRange;
            this.isConstantRankEnd = true;
            this.rankStart = constantRankRange.getRankStart();
            this.rankEnd = constantRankRange.getRankEnd();
            this.rankEndIndex = -1;
        } else if (rankRange instanceof VariableRankRange) {
            VariableRankRange variableRankRange = (VariableRankRange)rankRange;
            this.rankEndIndex = variableRankRange.getRankEndIndex();
            this.isConstantRankEnd = false;
            this.rankStart = -1L;
            this.rankEnd = -1L;
        } else {
            LOG.error(WITHOUT_RANK_END_UNSUPPORTED_MSG);
            throw new UnsupportedOperationException(WITHOUT_RANK_END_UNSUPPORTED_MSG);
        }
        this.generatedSortKeyComparator = generatedSortKeyComparator;
        this.generateRetraction = generateRetraction;
        this.inputRowType = inputRowType;
        this.outputRankNumber = outputRankNumber;
        this.sortKeySelector = sortKeySelector;
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.initCleanupTimeState("RankFunctionCleanupTime");
        this.outputRow = new JoinedRow();
        if (!this.isConstantRankEnd) {
            ValueStateDescriptor rankStateDesc = new ValueStateDescriptor("rankEnd", Types.LONG);
            this.rankEndState = this.getRuntimeContext().getState(rankStateDesc);
        }
        this.sortKeyComparator = (Comparator)this.generatedSortKeyComparator.newInstance(this.getRuntimeContext().getUserCodeClassLoader());
        this.generatedSortKeyComparator = null;
        this.invalidCounter = this.getRuntimeContext().getMetricGroup().counter("topn.invalidTopSize");
        if (!this.isConstantRankEnd) {
            LogicalType rankEndIdxType = this.inputRowType.getLogicalTypes()[this.rankEndIndex];
            switch (rankEndIdxType.getTypeRoot()) {
                case BIGINT: {
                    this.rankEndFetcher = row -> row.getLong(this.rankEndIndex);
                    break;
                }
                case INTEGER: {
                    this.rankEndFetcher = row -> row.getInt(this.rankEndIndex);
                    break;
                }
                case SMALLINT: {
                    this.rankEndFetcher = row -> row.getShort(this.rankEndIndex);
                    break;
                }
                default: {
                    LOG.error("variable rank index column must be long, short or int type, while input type is {}", (Object)rankEndIdxType.getClass().getName());
                    throw new UnsupportedOperationException("variable rank index column must be long type, while input type is " + rankEndIdxType.getClass().getName());
                }
            }
        }
    }

    protected long getDefaultTopNSize() {
        return this.isConstantRankEnd ? this.rankEnd : 100L;
    }

    protected long initRankEnd(BaseRow row) throws Exception {
        if (this.isConstantRankEnd) {
            return this.rankEnd;
        }
        Long rankEndValue = (Long)this.rankEndState.value();
        long curRankEnd = this.rankEndFetcher.apply(row);
        if (rankEndValue == null) {
            this.rankEnd = curRankEnd;
            this.rankEndState.update((Object)this.rankEnd);
            return this.rankEnd;
        }
        this.rankEnd = rankEndValue;
        if (this.rankEnd != curRankEnd) {
            this.invalidCounter.inc();
        }
        return this.rankEnd;
    }

    protected boolean checkSortKeyInBufferRange(BaseRow sortKey, TopNBuffer buffer) {
        Comparator<BaseRow> comparator = buffer.getSortKeyComparator();
        Map.Entry<BaseRow, Collection<BaseRow>> worstEntry = buffer.lastEntry();
        if (worstEntry == null) {
            return true;
        }
        BaseRow worstKey = worstEntry.getKey();
        int compare = comparator.compare(sortKey, worstKey);
        if (compare < 0) {
            return true;
        }
        return (long)buffer.getCurrentTopNum() < this.getDefaultTopNSize();
    }

    protected void registerMetric(long heapSize) {
        this.getRuntimeContext().getMetricGroup().gauge("topn.cache.hitRate", () -> this.requestCount == 0L ? 1.0 : Long.valueOf(this.hitCount).doubleValue() / (double)this.requestCount);
        this.getRuntimeContext().getMetricGroup().gauge("topn.cache.size", () -> heapSize);
    }

    protected void collect(Collector<BaseRow> out, BaseRow inputRow) {
        BaseRowUtil.setAccumulate(inputRow);
        out.collect((Object)inputRow);
    }

    protected void delete(Collector<BaseRow> out, BaseRow inputRow) {
        BaseRowUtil.setRetract(inputRow);
        out.collect((Object)inputRow);
    }

    protected void delete(Collector<BaseRow> out, BaseRow inputRow, long rank) {
        if (this.isInRankRange(rank)) {
            out.collect((Object)this.createOutputRow(inputRow, rank, (byte)1));
        }
    }

    protected void collect(Collector<BaseRow> out, BaseRow inputRow, long rank) {
        if (this.isInRankRange(rank)) {
            out.collect((Object)this.createOutputRow(inputRow, rank, (byte)0));
        }
    }

    protected void retract(Collector<BaseRow> out, BaseRow inputRow, long rank) {
        if (this.generateRetraction && this.isInRankRange(rank)) {
            out.collect((Object)this.createOutputRow(inputRow, rank, (byte)1));
        }
    }

    protected boolean isInRankEnd(long rank) {
        return rank <= this.rankEnd;
    }

    protected boolean isInRankRange(long rank) {
        return rank <= this.rankEnd && rank >= this.rankStart;
    }

    protected boolean hasOffset() {
        return this.rankStart > 1L;
    }

    private BaseRow createOutputRow(BaseRow inputRow, long rank, byte header) {
        if (this.outputRankNumber) {
            GenericRow rankRow = new GenericRow(1);
            rankRow.setField(0, rank);
            this.outputRow.replace(inputRow, rankRow);
            this.outputRow.setHeader(header);
            return this.outputRow;
        }
        inputRow.setHeader(header);
        return inputRow;
    }

    public void setKeyContext(KeyContext keyContext) {
        this.keyContext = keyContext;
    }
}

