/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.StateSerdes;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionType;
import org.rocksdb.FlushOptions;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.TableFormatConfig;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

/**
 * A persistent {@link KeyValueStore} backed by a local RocksDB instance.
 *
 * <p>Keys and values are converted to and from {@code byte[]} through a {@link StateSerdes}
 * that is built when the store is opened. Every iterator handed out by {@link #range} or
 * {@link #all} is tracked so that it can be closed together with the store.
 *
 * @param <K> the key type
 * @param <V> the value type
 */
public class RocksDBStore<K, V> implements KeyValueStore<K, V> {

    /** Sentinel TTL meaning "no time-to-live"; the only TTL this store can open a DB with. */
    private static final int TTL_NOT_USED = -1;

    private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
    private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
    private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
    private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
    private static final long BLOCK_SIZE = 4096L;
    private static final int TTL_SECONDS = TTL_NOT_USED;
    private static final int MAX_WRITE_BUFFERS = 3;
    private static final String DB_FILE_DIR = "rocksdb";

    private final String name;
    private final String parentDir;

    /** Iterators handed out and not yet closed; closed in bulk by {@link #close()}. */
    private final Set<KeyValueIterator<K, V>> openIterators = new HashSet<>();

    File dbDir;

    private StateSerdes<K, V> serdes;
    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;

    private RocksDB db;

    // Native option handles are kept so they can be released in close().
    private Options options;
    private WriteOptions wOptions;
    private FlushOptions fOptions;

    protected volatile boolean open = false;

    /**
     * Creates a store whose files live under the default {@code "rocksdb"} sub-directory of
     * the state directory.
     *
     * @param name       the store name
     * @param keySerde   serde for keys, or {@code null} to use the context's key serde
     * @param valueSerde serde for values, or {@code null} to use the context's value serde
     */
    RocksDBStore(String name, Serde<K> keySerde, Serde<V> valueSerde) {
        this(name, DB_FILE_DIR, keySerde, valueSerde);
    }

    /**
     * Creates a store whose files live under {@code parentDir} inside the state directory.
     *
     * @param name       the store name
     * @param parentDir  sub-directory of the state directory that holds this store's directory
     * @param keySerde   serde for keys, or {@code null} to use the context's key serde
     * @param valueSerde serde for values, or {@code null} to use the context's value serde
     */
    RocksDBStore(String name, String parentDir, Serde<K> keySerde, Serde<V> valueSerde) {
        this.name = name;
        this.parentDir = parentDir;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    /**
     * Builds the RocksDB options, applies the optional user-supplied
     * {@link RocksDBConfigSetter}, creates the serdes and opens the database directory.
     *
     * <p>This method does not change {@link #open}; {@link #init} sets it after registration.
     *
     * @param context the processor context supplying configs, default serdes and the state dir
     * @throws ProcessorStateException if RocksDB fails to open the database
     */
    @SuppressWarnings("unchecked")
    public void openDB(ProcessorContext context) {
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
        tableConfig.setBlockSize(BLOCK_SIZE);

        this.options = new Options();
        this.options.setTableFormatConfig(tableConfig);
        this.options.setWriteBufferSize(WRITE_BUFFER_SIZE);
        this.options.setCompressionType(COMPRESSION_TYPE);
        this.options.setCompactionStyle(COMPACTION_STYLE);
        this.options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
        this.options.setCreateIfMissing(true);
        this.options.setErrorIfExists(false);
        this.options.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);
        this.options.setIncreaseParallelism(Runtime.getRuntime().availableProcessors());

        // All writes issued with wOptions bypass the RocksDB write-ahead log.
        this.wOptions = new WriteOptions();
        this.wOptions.setDisableWAL(true);

        this.fOptions = new FlushOptions();
        this.fOptions.setWaitForFlush(true);

        // Let a user-provided setter override the defaults chosen above.
        Map<String, Object> configs = context.appConfigs();
        Class<RocksDBConfigSetter> configSetterClass =
            (Class<RocksDBConfigSetter>) configs.get("rocksdb.config.setter");
        if (configSetterClass != null) {
            RocksDBConfigSetter configSetter = Utils.newInstance(configSetterClass);
            configSetter.setConfig(this.name, this.options, configs);
        }

        // Fall back to the context's default serdes when none were given at construction.
        this.serdes = new StateSerdes<>(
            ProcessorStateManager.storeChangelogTopic(context.applicationId(), this.name),
            this.keySerde == null ? (Serde<K>) context.keySerde() : this.keySerde,
            this.valueSerde == null ? (Serde<V>) context.valueSerde() : this.valueSerde);

        this.dbDir = new File(new File(context.stateDir(), this.parentDir), this.name);
        this.db = this.openDB(this.dbDir, this.options, TTL_SECONDS);
    }

    /**
     * Opens the database, registers a restore callback with the context and marks the store
     * open.
     *
     * @param context the processor context
     * @param root    the store instance to register with the context
     */
    @Override
    public void init(ProcessorContext context, StateStore root) {
        this.openDB(context);

        // Restored records are written as raw bytes; a null value removes the key.
        context.register(root, false, new StateRestoreCallback() {
            @Override
            public void restore(byte[] key, byte[] value) {
                RocksDBStore.this.putInternal(key, value);
            }
        });

        this.open = true;
    }

    /**
     * Opens (creating if necessary) the RocksDB instance at {@code dir}.
     *
     * @param dir     the database directory; its parent directories are created first
     * @param options the RocksDB options to open with
     * @param ttl     must be {@link #TTL_NOT_USED}
     * @return the opened database
     * @throws UnsupportedOperationException if {@code ttl} is not {@link #TTL_NOT_USED}
     * @throws ProcessorStateException       if RocksDB reports an error while opening
     */
    private RocksDB openDB(File dir, Options options, int ttl) {
        if (ttl != TTL_NOT_USED) {
            throw new UnsupportedOperationException("Change log is not supported for store " + this.name + " since it is TTL based.");
        }
        try {
            dir.getParentFile().mkdirs();
            return RocksDB.open(options, dir.getAbsolutePath());
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error opening store " + this.name + " at location " + dir.toString(), e);
        }
    }

    /** @return the name of this store */
    @Override
    public String name() {
        return this.name;
    }

    /** @return always {@code true} */
    @Override
    public boolean persistent() {
        return true;
    }

    /** @return whether the store is currently open */
    @Override
    public boolean isOpen() {
        return this.open;
    }

    /**
     * Looks up the value stored for {@code key}.
     *
     * @param key the key to look up
     * @return the deserialized value, or {@code null} if the key is absent
     * @throws InvalidStateStoreException if the store is not open
     * @throws ProcessorStateException    if RocksDB reports an error
     */
    @Override
    public synchronized V get(K key) {
        this.validateStoreOpen();
        byte[] byteValue = this.getInternal(this.serdes.rawKey(key));
        if (byteValue == null) {
            return null;
        }
        return this.serdes.valueFrom(byteValue);
    }

    /**
     * @throws InvalidStateStoreException if the store is not open
     */
    private void validateStoreOpen() {
        if (!this.open) {
            throw new InvalidStateStoreException("Store " + this.name + " is currently closed");
        }
    }

    /**
     * Reads the raw value for a raw key, wrapping RocksDB failures.
     *
     * @throws ProcessorStateException if RocksDB reports an error
     */
    private byte[] getInternal(byte[] rawKey) {
        try {
            return this.db.get(rawKey);
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error while getting value for key " + this.serdes.keyFrom(rawKey) + " from store " + this.name, e);
        }
    }

    /**
     * Stores {@code value} under {@code key}. A value that serializes to {@code null}
     * removes the key.
     *
     * @throws InvalidStateStoreException if the store is not open
     * @throws ProcessorStateException    if RocksDB reports an error
     */
    @Override
    public synchronized void put(K key, V value) {
        this.validateStoreOpen();
        byte[] rawKey = this.serdes.rawKey(key);
        byte[] rawValue = this.serdes.rawValue(value);
        this.putInternal(rawKey, rawValue);
    }

    /**
     * Stores {@code value} under {@code key} only when no value is currently present.
     *
     * @return the existing value, or {@code null} if there was none and the put happened
     */
    @Override
    public synchronized V putIfAbsent(K key, V value) {
        V originalValue = this.get(key);
        if (originalValue == null) {
            this.put(key, value);
        }
        return originalValue;
    }

    /**
     * Writes one raw entry: a {@code null} value deletes the key, anything else is put.
     *
     * @throws ProcessorStateException if RocksDB reports an error
     */
    private void putInternal(byte[] rawKey, byte[] rawValue) {
        if (rawValue == null) {
            try {
                this.db.delete(this.wOptions, rawKey);
            } catch (RocksDBException e) {
                throw new ProcessorStateException("Error while removing key " + this.serdes.keyFrom(rawKey) + " from store " + this.name, e);
            }
        } else {
            // Only reached for non-null values; a delete must never be followed by put(null).
            try {
                this.db.put(this.wOptions, rawKey, rawValue);
            } catch (RocksDBException e) {
                throw new ProcessorStateException("Error while executing put key " + this.serdes.keyFrom(rawKey) + " and value " + this.serdes.valueFrom(rawValue) + " from store " + this.name, e);
            }
        }
    }

    /**
     * Applies all entries as a single RocksDB write batch, in list order. An entry with a
     * {@code null} value removes its key.
     *
     * @param entries the entries to write
     * @throws ProcessorStateException if RocksDB reports an error
     */
    @Override
    public void putAll(List<KeyValue<K, V>> entries) {
        try (WriteBatch batch = new WriteBatch()) {
            for (KeyValue<K, V> entry : entries) {
                byte[] rawKey = this.serdes.rawKey(entry.key);
                if (entry.value == null) {
                    // Queue the deletion in the batch so it keeps its position relative to the
                    // puts and is written with wOptions. remove() is the batch deletion call in
                    // the RocksJava versions this file appears to target; newer releases name
                    // it delete() - TODO confirm against the bundled rocksdbjni version.
                    batch.remove(rawKey);
                } else {
                    batch.put(rawKey, this.serdes.rawValue(entry.value));
                }
            }
            this.db.write(this.wOptions, batch);
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error while batch writing to store " + this.name, e);
        }
    }

    /**
     * Removes {@code key} from the store.
     *
     * @return the value that was stored before removal, or {@code null} if there was none
     */
    @Override
    public synchronized V delete(K key) {
        V value = this.get(key);
        this.put(key, null);
        return value;
    }

    /**
     * Returns an iterator over the entries whose raw keys lie between the serialized
     * {@code from} and {@code to} (both inclusive) in lexicographic byte order. The iterator is
     * tracked and will be closed when the store closes.
     *
     * @throws InvalidStateStoreException if the store is not open
     */
    @Override
    public synchronized KeyValueIterator<K, V> range(K from, K to) {
        this.validateStoreOpen();
        RocksDBRangeIterator rangeIterator =
            new RocksDBRangeIterator(this.db.newIterator(), this.serdes, from, to);
        this.openIterators.add(rangeIterator);
        return rangeIterator;
    }

    /**
     * Returns an iterator over every entry, starting at the first key. The iterator is tracked
     * and will be closed when the store closes.
     *
     * @throws InvalidStateStoreException if the store is not open
     */
    @Override
    public synchronized KeyValueIterator<K, V> all() {
        this.validateStoreOpen();
        RocksIterator innerIter = this.db.newIterator();
        innerIter.seekToFirst();
        RocksDbIterator fullIterator = new RocksDbIterator(innerIter, this.serdes);
        this.openIterators.add(fullIterator);
        return fullIterator;
    }

    /**
     * Returns RocksDB's {@code rocksdb.estimate-num-keys} property.
     *
     * @return the estimate, or {@link Long#MAX_VALUE} when the property reads as negative
     * @throws ProcessorStateException if RocksDB reports an error
     */
    @Override
    public long approximateNumEntries() {
        long value;
        try {
            value = this.db.getLongProperty("rocksdb.estimate-num-keys");
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error fetching property from store " + this.name, e);
        }
        if (this.isOverflowing(value)) {
            return Long.MAX_VALUE;
        }
        return value;
    }

    /** A negative reading is treated as an overflowed count. */
    private boolean isOverflowing(long value) {
        return value < 0L;
    }

    /**
     * Flushes the database; does nothing if no database handle is held.
     *
     * @throws ProcessorStateException if RocksDB reports an error
     */
    @Override
    public synchronized void flush() {
        if (this.db == null) {
            return;
        }
        this.flushInternal();
    }

    /**
     * @throws ProcessorStateException if RocksDB reports an error while flushing
     */
    private void flushInternal() {
        try {
            this.db.flush(this.fOptions);
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error while executing flush from store " + this.name, e);
        }
    }

    /**
     * Closes all tracked iterators, the option handles and the database, then drops the
     * references. Does nothing if the store is not open.
     */
    @Override
    public synchronized void close() {
        if (!this.open) {
            return;
        }
        this.open = false;
        this.closeOpenIterators();
        this.options.close();
        this.wOptions.close();
        this.fOptions.close();
        this.db.close();
        this.options = null;
        this.wOptions = null;
        this.fOptions = null;
        this.db = null;
    }

    /** Closes every tracked iterator and empties the tracking set. */
    private void closeOpenIterators() {
        // Iterate over a copy: each iterator's close() removes itself from openIterators.
        for (KeyValueIterator<K, V> iterator : new HashSet<>(this.openIterators)) {
            iterator.close();
        }
        this.openIterators.clear();
    }

    /**
     * Iterator bounded above by a serialized "to" key, compared lexicographically on raw bytes.
     */
    private class RocksDBRangeIterator extends RocksDbIterator {
        private final Comparator<byte[]> comparator;
        private byte[] rawToKey;

        /**
         * @param iter   a fresh RocksDB iterator; it is positioned at the serialized {@code from}
         * @param serdes serdes used to serialize the bounds and deserialize entries
         * @param from   lower bound (inclusive)
         * @param to     upper bound (inclusive)
         */
        RocksDBRangeIterator(RocksIterator iter, StateSerdes<K, V> serdes, K from, K to) {
            super(iter, serdes);
            this.comparator = Bytes.BYTES_LEXICO_COMPARATOR;
            iter.seek(serdes.rawKey(from));
            this.rawToKey = serdes.rawKey(to);
        }

        /** @return {@code true} while the current raw key does not exceed the upper bound */
        @Override
        public synchronized boolean hasNext() {
            return super.hasNext() && this.comparator.compare(super.peekRawKey(), this.rawToKey) <= 0;
        }
    }

    /**
     * {@link KeyValueIterator} over a native {@link RocksIterator}, deserializing entries with
     * the given serdes. Closing it removes it from the enclosing store's tracking set.
     */
    class RocksDbIterator implements KeyValueIterator<K, V> {
        private final RocksIterator iter;
        private final StateSerdes<K, V> serdes;
        private boolean open = true;

        RocksDbIterator(RocksIterator iter, StateSerdes<K, V> serdes) {
            this.iter = iter;
            this.serdes = serdes;
        }

        /** @return the raw key at the current position, without advancing */
        byte[] peekRawKey() {
            return this.iter.key();
        }

        /** Deserializes the entry at the current position. */
        private KeyValue<K, V> getKeyValue() {
            return new KeyValue<>(this.serdes.keyFrom(this.iter.key()), this.serdes.valueFrom(this.iter.value()));
        }

        /**
         * @return whether the underlying iterator points at a valid entry
         * @throws InvalidStateStoreException if this iterator has been closed
         */
        @Override
        public synchronized boolean hasNext() {
            if (!this.open) {
                throw new InvalidStateStoreException(String.format("store %s has closed", RocksDBStore.this.name));
            }
            return this.iter.isValid();
        }

        /**
         * Returns the current entry and advances.
         *
         * @throws NoSuchElementException     if there is no further entry
         * @throws InvalidStateStoreException if this iterator has been closed
         */
        @Override
        public synchronized KeyValue<K, V> next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            KeyValue<K, V> entry = this.getKeyValue();
            this.iter.next();
            return entry;
        }

        /**
         * @throws UnsupportedOperationException always
         */
        @Override
        public void remove() {
            throw new UnsupportedOperationException("RocksDB iterator does not support remove");
        }

        /** Marks this iterator closed, untracks it from the store and closes the native iterator. */
        @Override
        public synchronized void close() {
            this.open = false;
            RocksDBStore.this.openIterators.remove(this);
            this.iter.close();
        }

        /**
         * Returns the next key without advancing.
         *
         * @throws NoSuchElementException     if there is no further entry
         * @throws InvalidStateStoreException if this iterator has been closed
         */
        @Override
        public synchronized K peekNextKey() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            return this.serdes.keyFrom(this.iter.key());
        }
    }
}

