/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state.iterator;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
import org.apache.flink.contrib.streaming.state.iterator.RocksSingleStateIterator;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;

public class RocksStatesPerKeyGroupMergeIterator
implements AutoCloseable {
    private final PriorityQueue<RocksSingleStateIterator> heap;
    private final int keyGroupPrefixByteCount;
    private boolean newKeyGroup;
    private boolean newKVState;
    private boolean valid;
    private RocksSingleStateIterator currentSubIterator;
    private static final List<Comparator<RocksSingleStateIterator>> COMPARATORS;

    public RocksStatesPerKeyGroupMergeIterator(List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators, int keyGroupPrefixByteCount) {
        Preconditions.checkNotNull(kvStateIterators);
        Preconditions.checkArgument((keyGroupPrefixByteCount >= 1 ? 1 : 0) != 0);
        this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
        if (kvStateIterators.size() > 0) {
            this.heap = this.buildIteratorHeap(kvStateIterators);
            this.valid = !this.heap.isEmpty();
            this.currentSubIterator = this.heap.poll();
            kvStateIterators.clear();
        } else {
            this.heap = null;
            this.valid = false;
        }
        this.newKeyGroup = true;
        this.newKVState = true;
    }

    public void next() {
        this.newKeyGroup = false;
        this.newKVState = false;
        RocksIteratorWrapper rocksIterator = this.currentSubIterator.getIterator();
        rocksIterator.next();
        byte[] oldKey = this.currentSubIterator.getCurrentKey();
        if (rocksIterator.isValid()) {
            this.currentSubIterator.setCurrentKey(rocksIterator.key());
            if (this.isDifferentKeyGroup(oldKey, this.currentSubIterator.getCurrentKey())) {
                this.heap.offer(this.currentSubIterator);
                this.currentSubIterator = (RocksSingleStateIterator)this.heap.remove();
                this.newKVState = this.currentSubIterator.getIterator() != rocksIterator;
                this.detectNewKeyGroup(oldKey);
            }
        } else {
            IOUtils.closeQuietly((AutoCloseable)rocksIterator);
            if (this.heap.isEmpty()) {
                this.currentSubIterator = null;
                this.valid = false;
            } else {
                this.currentSubIterator = (RocksSingleStateIterator)this.heap.remove();
                this.newKVState = true;
                this.detectNewKeyGroup(oldKey);
            }
        }
    }

    private PriorityQueue<RocksSingleStateIterator> buildIteratorHeap(List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators) {
        Comparator<RocksSingleStateIterator> iteratorComparator = COMPARATORS.get(this.keyGroupPrefixByteCount - 1);
        PriorityQueue<RocksSingleStateIterator> iteratorPriorityQueue = new PriorityQueue<RocksSingleStateIterator>(kvStateIterators.size(), iteratorComparator);
        for (Tuple2<RocksIteratorWrapper, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
            RocksIteratorWrapper rocksIterator = (RocksIteratorWrapper)rocksIteratorWithKVStateId.f0;
            rocksIterator.seekToFirst();
            if (rocksIterator.isValid()) {
                iteratorPriorityQueue.offer(new RocksSingleStateIterator(rocksIterator, (Integer)rocksIteratorWithKVStateId.f1));
                continue;
            }
            IOUtils.closeQuietly((AutoCloseable)rocksIterator);
        }
        return iteratorPriorityQueue;
    }

    private boolean isDifferentKeyGroup(byte[] a, byte[] b) {
        return 0 != RocksStatesPerKeyGroupMergeIterator.compareKeyGroupsForByteArrays(a, b, this.keyGroupPrefixByteCount);
    }

    private void detectNewKeyGroup(byte[] oldKey) {
        if (this.isDifferentKeyGroup(oldKey, this.currentSubIterator.getCurrentKey())) {
            this.newKeyGroup = true;
        }
    }

    public int keyGroup() {
        byte[] currentKey = this.currentSubIterator.getCurrentKey();
        int result = 0;
        for (int i = 0; i < this.keyGroupPrefixByteCount; ++i) {
            result <<= 8;
            result |= currentKey[i] & 0xFF;
        }
        return result;
    }

    public byte[] key() {
        return this.currentSubIterator.getCurrentKey();
    }

    public byte[] value() {
        return this.currentSubIterator.getIterator().value();
    }

    public int kvStateId() {
        return this.currentSubIterator.getKvStateId();
    }

    public boolean isNewKeyValueState() {
        return this.newKVState;
    }

    public boolean isNewKeyGroup() {
        return this.newKeyGroup;
    }

    public boolean isValid() {
        return this.valid;
    }

    private static int compareKeyGroupsForByteArrays(byte[] a, byte[] b, int len) {
        for (int i = 0; i < len; ++i) {
            int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (diff == 0) continue;
            return diff;
        }
        return 0;
    }

    @Override
    public void close() {
        IOUtils.closeQuietly((AutoCloseable)this.currentSubIterator);
        this.currentSubIterator = null;
        IOUtils.closeAllQuietly(this.heap);
        this.heap.clear();
    }

    static {
        int maxBytes = 2;
        COMPARATORS = new ArrayList<Comparator<RocksSingleStateIterator>>(maxBytes);
        for (int i = 0; i < maxBytes; ++i) {
            int currentBytes = i + 1;
            COMPARATORS.add((o1, o2) -> {
                int arrayCmpRes = RocksStatesPerKeyGroupMergeIterator.compareKeyGroupsForByteArrays(o1.getCurrentKey(), o2.getCurrentKey(), currentBytes);
                return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
            });
        }
    }
}

