/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.util.Collection;
import java.util.List;
import java.util.function.BiFunction;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils;
import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class RocksDBIncrementalCheckpointUtils {
    private static final double OVERLAP_FRACTION_THRESHOLD = 0.75;
    private static final BiFunction<KeyedStateHandle, KeyGroupRange, Double> STATE_HANDLE_EVALUATOR = (stateHandle, targetKeyGroupRange) -> {
        KeyGroupRange handleKeyGroupRange = stateHandle.getKeyGroupRange();
        KeyGroupRange intersectGroup = handleKeyGroupRange.getIntersection(targetKeyGroupRange);
        double overlapFraction = (double)intersectGroup.getNumberOfKeyGroups() / (double)handleKeyGroupRange.getNumberOfKeyGroups();
        if (overlapFraction < 0.75) {
            return -1.0;
        }
        return (double)intersectGroup.getNumberOfKeyGroups() * overlapFraction * overlapFraction;
    };

    public static void clipDBWithKeyGroupRange(@Nonnull RocksDB db, @Nonnull List<ColumnFamilyHandle> columnFamilyHandles, @Nonnull KeyGroupRange targetKeyGroupRange, @Nonnull KeyGroupRange currentKeyGroupRange, @Nonnegative int keyGroupPrefixBytes) throws RocksDBException {
        byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes];
        byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];
        if (currentKeyGroupRange.getStartKeyGroup() < targetKeyGroupRange.getStartKeyGroup()) {
            RocksDBKeySerializationUtils.serializeKeyGroup(currentKeyGroupRange.getStartKeyGroup(), beginKeyGroupBytes);
            RocksDBKeySerializationUtils.serializeKeyGroup(targetKeyGroupRange.getStartKeyGroup(), endKeyGroupBytes);
            RocksDBIncrementalCheckpointUtils.deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
        }
        if (currentKeyGroupRange.getEndKeyGroup() > targetKeyGroupRange.getEndKeyGroup()) {
            RocksDBKeySerializationUtils.serializeKeyGroup(targetKeyGroupRange.getEndKeyGroup() + 1, beginKeyGroupBytes);
            RocksDBKeySerializationUtils.serializeKeyGroup(currentKeyGroupRange.getEndKeyGroup() + 1, endKeyGroupBytes);
            RocksDBIncrementalCheckpointUtils.deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
        }
    }

    private static void deleteRange(RocksDB db, List<ColumnFamilyHandle> columnFamilyHandles, byte[] beginKeyBytes, byte[] endKeyBytes) throws RocksDBException {
        for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
            RocksIteratorWrapper iteratorWrapper = RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandle);
            Throwable throwable = null;
            try {
                RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db);
                Throwable throwable2 = null;
                try {
                    byte[] currentKey;
                    iteratorWrapper.seek(beginKeyBytes);
                    while (iteratorWrapper.isValid() && RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(currentKey = iteratorWrapper.key(), endKeyBytes)) {
                        writeBatchWrapper.remove(columnFamilyHandle, currentKey);
                        iteratorWrapper.next();
                    }
                }
                catch (Throwable throwable3) {
                    throwable2 = throwable3;
                    throw throwable3;
                }
                finally {
                    if (writeBatchWrapper == null) continue;
                    if (throwable2 != null) {
                        try {
                            writeBatchWrapper.close();
                        }
                        catch (Throwable throwable4) {
                            throwable2.addSuppressed(throwable4);
                        }
                        continue;
                    }
                    writeBatchWrapper.close();
                }
            }
            catch (Throwable throwable5) {
                throwable = throwable5;
                throw throwable5;
            }
            finally {
                if (iteratorWrapper == null) continue;
                if (throwable != null) {
                    try {
                        iteratorWrapper.close();
                    }
                    catch (Throwable throwable6) {
                        throwable.addSuppressed(throwable6);
                    }
                    continue;
                }
                iteratorWrapper.close();
            }
        }
    }

    public static boolean beforeThePrefixBytes(@Nonnull byte[] bytes, @Nonnull byte[] prefixBytes) {
        int prefixLength = prefixBytes.length;
        for (int i = 0; i < prefixLength; ++i) {
            int r = (char)prefixBytes[i] - (char)bytes[i];
            if (r == 0) continue;
            return r > 0;
        }
        return false;
    }

    @Nullable
    public static KeyedStateHandle chooseTheBestStateHandleForInitial(@Nonnull Collection<KeyedStateHandle> restoreStateHandles, @Nonnull KeyGroupRange targetKeyGroupRange) {
        KeyedStateHandle bestStateHandle = null;
        double bestScore = 0.0;
        for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
            double handleScore = STATE_HANDLE_EVALUATOR.apply(rawStateHandle, targetKeyGroupRange);
            if (!(handleScore > bestScore)) continue;
            bestStateHandle = rawStateHandle;
            bestScore = handleScore;
        }
        return bestStateHandle;
    }
}

