/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import java.util.UUID;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.description.InlineElement;
import org.apache.flink.configuration.description.TextElement;
import org.apache.flink.contrib.streaming.state.ConfigurableRocksDBOptionsFactory;
import org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder;
import org.apache.flink.contrib.streaming.state.RocksDBMemoryConfiguration;
import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBResourceContainer;
import org.apache.flink.contrib.streaming.state.RocksDBSharedResources;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.memory.OpaqueMemoryResource;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractManagedMemoryStateBackend;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TernaryBoolean;
import org.rocksdb.NativeLibraryLoader;
import org.rocksdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
public class EmbeddedRocksDBStateBackend
extends AbstractManagedMemoryStateBackend
implements ConfigurableStateBackend {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(EmbeddedRocksDBStateBackend.class);
    // Retry budget for loading the RocksDB native library (see ensureRocksDBIsLoaded).
    private static final int ROCKSDB_LIB_LOADING_ATTEMPTS = 3;
    // Set to true once ensureRocksDBIsLoaded has loaded the native library successfully; it is
    // read and written only inside that method's synchronized block on this class.
    private static boolean rocksDbInitialized = false;
    // Sentinel for "number of transfer threads was not set explicitly"; the getter and the
    // configuring constructor fall back to the configured/default value when they see it.
    private static final int UNDEFINED_NUMBER_OF_TRANSFER_THREADS = -1;
    // Sentinel for "write batch size was not set explicitly"; handled like the one above.
    private static final long UNDEFINED_WRITE_BATCH_SIZE = -1L;
    // Base directories set through setDbStoragePaths; null means lazyInitializeForJob uses the
    // spilling directories of the environment's IOManager instead.
    @Nullable
    private File[] localRocksDbDirectories;
    // Predefined option profile; null until set explicitly or resolved by the configuring
    // constructor.
    @Nullable
    private PredefinedOptions predefinedOptions;
    // Options factory supplied by the application or created by configureOptionsFactory.
    @Nullable
    private RocksDBOptionsFactory rocksDbOptionsFactory;
    // Tri-state flag; an undefined value is resolved against the configuration in the
    // configuring constructor, or against the option's default in isIncrementalCheckpointsEnabled.
    private final TernaryBoolean enableIncrementalCheckpointing;
    // Explicitly set thread count, or UNDEFINED_NUMBER_OF_TRANSFER_THREADS.
    private int numberOfTransferThreads;
    // Memory settings; exposed through getMemoryConfiguration.
    private final RocksDBMemoryConfiguration memoryConfiguration;
    // Priority-queue implementation choice; null means "use the TIMER_SERVICE_FACTORY default".
    @Nullable
    private PriorityQueueStateType priorityQueueStateType;
    // Native metric options handed to the keyed state backend builder.
    private final RocksDBNativeMetricOptions defaultMetricOptions;
    // --- Transient runtime state, populated by lazyInitializeForJob ---
    // Directories that are actually used as base paths for RocksDB instances.
    private transient File[] initializedDbBasePaths;
    // Job ID taken from the Environment; used in the instance directory name.
    private transient JobID jobId;
    // Index into initializedDbBasePaths of the most recently handed-out directory.
    private transient int nextDirectory;
    // Guards lazyInitializeForJob so that it runs its body only once per instance.
    private transient boolean isInitialized;
    // Explicitly set write batch size in bytes, or UNDEFINED_WRITE_BATCH_SIZE.
    private long writeBatchSize;

    /**
     * Creates a backend that leaves incremental checkpointing undefined, so that it can be
     * resolved later from the configuration or from the option's default value.
     */
    public EmbeddedRocksDBStateBackend() {
        this(TernaryBoolean.UNDEFINED);
    }

    /**
     * Creates a backend with incremental checkpointing explicitly switched on or off.
     *
     * @param enableIncrementalCheckpointing whether incremental checkpoints are enabled
     */
    public EmbeddedRocksDBStateBackend(boolean enableIncrementalCheckpointing) {
        this(TernaryBoolean.fromBoolean(enableIncrementalCheckpointing));
    }

    /**
     * Creates a backend with the given tri-state incremental checkpointing setting. The number
     * of transfer threads and the write batch size start out as "undefined" sentinels.
     *
     * @param enableIncrementalCheckpointing true, false, or undefined
     */
    public EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing) {
        this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
        // Use the named sentinels rather than bare -1 literals so that the "not set" checks
        // elsewhere in the class can be traced back to a single definition.
        this.numberOfTransferThreads = UNDEFINED_NUMBER_OF_TRANSFER_THREADS;
        this.defaultMetricOptions = new RocksDBNativeMetricOptions();
        this.memoryConfiguration = new RocksDBMemoryConfiguration();
        this.writeBatchSize = UNDEFINED_WRITE_BATCH_SIZE;
    }

    /**
     * Private copy constructor used by {@link #configure(ReadableConfig, ClassLoader)}. Every
     * setting that was specified explicitly on {@code original} is kept; every setting that was
     * left undefined is read from {@code config}.
     *
     * @param original the backend to copy explicit settings from
     * @param config the configuration supplying values for unset settings
     * @param classLoader the class loader used to load a configured options factory class
     * @throws IllegalConfigurationException if the configured local directories are rejected by
     *     {@link #setDbStoragePaths(String...)}
     * @throws FlinkRuntimeException if the configured options factory cannot be loaded
     */
    private EmbeddedRocksDBStateBackend(EmbeddedRocksDBStateBackend original, ReadableConfig config, ClassLoader classLoader) {
        this.enableIncrementalCheckpointing = original.enableIncrementalCheckpointing.resolveUndefined(
                config.get(CheckpointingOptions.INCREMENTAL_CHECKPOINTS));

        if (original.numberOfTransferThreads == UNDEFINED_NUMBER_OF_TRANSFER_THREADS) {
            this.numberOfTransferThreads = config.get(RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM);
        } else {
            this.numberOfTransferThreads = original.numberOfTransferThreads;
        }

        if (original.writeBatchSize == UNDEFINED_WRITE_BATCH_SIZE) {
            this.writeBatchSize = config.get(RocksDBConfigurableOptions.WRITE_BATCH_SIZE).getBytes();
        } else {
            this.writeBatchSize = original.writeBatchSize;
        }

        this.memoryConfiguration = RocksDBMemoryConfiguration.fromOtherAndConfiguration(original.memoryConfiguration, config);
        this.memoryConfiguration.validate();

        if (original.priorityQueueStateType == null) {
            this.priorityQueueStateType = config.get(RocksDBOptions.TIMER_SERVICE_FACTORY);
        } else {
            this.priorityQueueStateType = original.priorityQueueStateType;
        }

        // Local directories: explicit setting wins, otherwise parse the configured list.
        if (original.localRocksDbDirectories != null) {
            this.localRocksDbDirectories = original.localRocksDbDirectories;
        } else {
            String rocksdbLocalPaths = config.get(RocksDBOptions.LOCAL_DIRECTORIES);
            if (rocksdbLocalPaths != null) {
                // Entries may be separated by a comma or by the platform's path separator.
                String[] directories = rocksdbLocalPaths.split(",|" + File.pathSeparator);
                try {
                    this.setDbStoragePaths(directories);
                } catch (IllegalArgumentException e) {
                    throw new IllegalConfigurationException(
                            "Invalid configuration for RocksDB state backend's local storage directories: " + e.getMessage(), e);
                }
            }
        }

        this.defaultMetricOptions = RocksDBNativeMetricOptions.fromConfig(config);

        this.predefinedOptions = original.predefinedOptions == null
                ? PredefinedOptions.valueOf(config.get(RocksDBOptions.PREDEFINED_OPTIONS))
                : original.predefinedOptions;
        LOG.info("Using predefined options: {}.", this.predefinedOptions.name());

        try {
            this.rocksDbOptionsFactory = this.configureOptionsFactory(
                    original.rocksDbOptionsFactory, config.get(RocksDBOptions.OPTIONS_FACTORY), config, classLoader);
        } catch (DynamicCodeLoadingException e) {
            throw new FlinkRuntimeException(e);
        }

        this.latencyTrackingConfigBuilder = original.latencyTrackingConfigBuilder.configure(config);
    }

    /**
     * Returns a new backend that combines this instance's explicit settings with values read
     * from the given configuration. This instance is not modified by the copy constructor call.
     *
     * @param config the configuration to fill unset settings from
     * @param classLoader the class loader passed on for loading a configured options factory
     * @return the newly created, configured backend
     */
    public EmbeddedRocksDBStateBackend configure(ReadableConfig config, ClassLoader classLoader) {
        EmbeddedRocksDBStateBackend reconfigured = new EmbeddedRocksDBStateBackend(this, config, classLoader);
        return reconfigured;
    }

    /**
     * Performs the one-time, per-instance runtime setup: records the job ID, determines the
     * usable base directories and picks a random starting index for directory rotation.
     * Subsequent calls return immediately.
     *
     * @param env the task environment supplying the job ID and the spilling directories
     * @param operatorIdentifier not used by this method
     * @throws IOException if local directories were configured but none of them is usable
     */
    private void lazyInitializeForJob(Environment env, String operatorIdentifier) throws IOException {
        if (this.isInitialized) {
            return;
        }
        this.jobId = env.getJobID();

        if (this.localRocksDbDirectories != null) {
            ArrayList<File> usableDirs = new ArrayList<File>(this.localRocksDbDirectories.length);
            StringBuilder failures = new StringBuilder();
            for (File baseDir : this.localRocksDbDirectories) {
                // Probe the directory by creating (and afterwards removing) a uniquely named child.
                File probe = new File(baseDir, UUID.randomUUID().toString());
                if (probe.mkdirs()) {
                    usableDirs.add(baseDir);
                } else {
                    String msg = "Local DB files directory '" + baseDir + "' does not exist and cannot be created. ";
                    LOG.error(msg);
                    failures.append(msg);
                }
                probe.delete();
            }
            if (usableDirs.isEmpty()) {
                throw new IOException("No local storage directories available. " + failures);
            }
            this.initializedDbBasePaths = usableDirs.toArray(new File[0]);
        } else {
            // Nothing configured: fall back to the directories provided by the IOManager.
            this.initializedDbBasePaths = env.getIOManager().getSpillingDirectories();
        }

        this.nextDirectory = new Random().nextInt(this.initializedDbBasePaths.length);
        this.isInitialized = true;
    }

    /**
     * Advances the rotation index by one, wrapping to zero once it reaches the end of the
     * initialized base paths, and returns the directory at the new index.
     *
     * @return the base directory to use for the next RocksDB instance
     */
    private File getNextStoragePath() {
        int candidate = this.nextDirectory + 1;
        if (candidate >= this.initializedDbBasePaths.length) {
            candidate = 0;
        }
        this.nextDirectory = candidate;
        return this.initializedDbBasePaths[candidate];
    }

    /**
     * Creates a keyed state backend, delegating to the overload that takes a managed memory
     * fraction and passing {@code 1.0} for it.
     *
     * @throws IOException if the delegate fails to set up the backend
     */
    public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws IOException {
        final double fullManagedMemoryFraction = 1.0;
        return this.createKeyedStateBackend(
                env,
                jobID,
                operatorIdentifier,
                keySerializer,
                numberOfKeyGroups,
                keyGroupRange,
                kvStateRegistry,
                ttlTimeProvider,
                metricGroup,
                stateHandles,
                cancelStreamRegistry,
                fullManagedMemoryFraction);
    }

    /**
     * Creates a RocksDB keyed state backend for one operator instance.
     *
     * <p>The method makes sure the native library is loaded, runs the lazy per-job setup, picks
     * the next base directory, optionally allocates shared caches, and then assembles the backend
     * through {@link RocksDBKeyedStateBackendBuilder}.
     *
     * @param managedMemoryFraction fraction passed on when allocating shared caches
     * @return the backend produced by the builder
     * @throws IOException if loading the native library or the directory setup fails
     */
    public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry, double managedMemoryFraction) throws IOException {
        // The native library is stored under the first configured tmp directory.
        String nativeLibParentDir = env.getTaskManagerInfo().getTmpDirectories()[0];
        ensureRocksDBIsLoaded(nativeLibParentDir);

        // Replace everything except letters, digits and '-' so the identifier is usable in a file name.
        String sanitizedOperatorId = operatorIdentifier.replaceAll("[^a-zA-Z0-9\\-]", "_");
        this.lazyInitializeForJob(env, sanitizedOperatorId);

        File storageRoot = this.getNextStoragePath();
        String instanceDirName = "job_" + this.jobId + "_op_" + sanitizedOperatorId + "_uuid_" + UUID.randomUUID();
        File instanceBasePath = new File(storageRoot, instanceDirName);

        LocalRecoveryConfig localRecoveryConfig = env.getTaskStateManager().createLocalRecoveryConfig();

        OpaqueMemoryResource<RocksDBSharedResources> sharedResources =
                RocksDBOperationUtils.allocateSharedCachesIfConfigured(
                        this.memoryConfiguration, env.getMemoryManager(), managedMemoryFraction, LOG);
        if (sharedResources != null) {
            LOG.info("Obtained shared RocksDB cache of size {} bytes", sharedResources.getSize());
        }
        RocksDBResourceContainer resourceContainer = this.createOptionsAndResourceContainer(sharedResources);

        ExecutionConfig executionConfig = env.getExecutionConfig();
        StreamCompressionDecorator keyGroupCompressionDecorator = getCompressionDecorator(executionConfig);
        LatencyTrackingStateConfig latencyTrackingStateConfig =
                this.latencyTrackingConfigBuilder.setMetricGroup(metricGroup).build();

        // Note: the builder receives the original (unsanitized) operator identifier.
        RocksDBKeyedStateBackendBuilder<K> builder =
                new RocksDBKeyedStateBackendBuilder<K>(
                                operatorIdentifier,
                                env.getUserCodeClassLoader().asClassLoader(),
                                instanceBasePath,
                                resourceContainer,
                                stateName -> resourceContainer.getColumnOptions(),
                                kvStateRegistry,
                                keySerializer,
                                numberOfKeyGroups,
                                keyGroupRange,
                                executionConfig,
                                localRecoveryConfig,
                                this.getPriorityQueueStateType(),
                                ttlTimeProvider,
                                latencyTrackingStateConfig,
                                metricGroup,
                                stateHandles,
                                keyGroupCompressionDecorator,
                                cancelStreamRegistry)
                        .setEnableIncrementalCheckpointing(this.isIncrementalCheckpointsEnabled())
                        .setNumberOfTransferingThreads(this.getNumberOfTransferThreads())
                        .setNativeMetricOptions(resourceContainer.getMemoryWatcherOptions(this.defaultMetricOptions))
                        .setWriteBatchSize(this.getWriteBatchSize());
        return builder.build();
    }

    /**
     * Creates the operator state backend through {@link DefaultOperatorStateBackendBuilder},
     * always requesting asynchronous snapshots.
     *
     * @param env the task environment supplying the user-code class loader and execution config
     * @param operatorIdentifier not used by this method
     * @param stateHandles the state handles passed on to the builder
     * @param cancelStreamRegistry the registry passed on to the builder
     * @return the operator state backend produced by the builder
     * @throws Exception if the builder fails
     */
    public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier, @Nonnull Collection<OperatorStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception {
        // Named so the meaning of the boolean argument is visible at the call site.
        final boolean asyncSnapshots = true;
        return new DefaultOperatorStateBackendBuilder(
                        env.getUserCodeClassLoader().asClassLoader(),
                        env.getExecutionConfig(),
                        asyncSnapshots,
                        stateHandles,
                        cancelStreamRegistry)
                .build();
    }

    /**
     * Determines the options factory to use, in this order of precedence:
     *
     * <ol>
     *   <li>the application-defined factory, re-configured from {@code config} if it is a
     *       {@link ConfigurableRocksDBOptionsFactory};
     *   <li>a new {@link DefaultConfigurableOptionsFactory} if {@code factoryClassName} names
     *       that class (compared ignoring case);
     *   <li>a reflectively instantiated instance of {@code factoryClassName}.
     * </ol>
     *
     * @param originalOptionsFactory factory set on the backend, or null
     * @param factoryClassName fully qualified class name taken from the configuration
     * @param config configuration handed to configurable factories
     * @param classLoader class loader used to resolve {@code factoryClassName}
     * @return the factory to use
     * @throws DynamicCodeLoadingException if the class cannot be found, is not a
     *     {@link RocksDBOptionsFactory}, or cannot be instantiated
     */
    private RocksDBOptionsFactory configureOptionsFactory(@Nullable RocksDBOptionsFactory originalOptionsFactory, String factoryClassName, ReadableConfig config, ClassLoader classLoader) throws DynamicCodeLoadingException {
        if (originalOptionsFactory != null) {
            if (originalOptionsFactory instanceof ConfigurableRocksDBOptionsFactory) {
                originalOptionsFactory = ((ConfigurableRocksDBOptionsFactory) originalOptionsFactory).configure(config);
            }
            LOG.info("Using application-defined options factory: {}.", originalOptionsFactory);
            return originalOptionsFactory;
        }

        if (factoryClassName.equalsIgnoreCase(DefaultConfigurableOptionsFactory.class.getName())) {
            DefaultConfigurableOptionsFactory optionsFactory = new DefaultConfigurableOptionsFactory();
            optionsFactory.configure(config);
            LOG.info("Using default options factory: {}.", optionsFactory);
            return optionsFactory;
        }

        try {
            // asSubclass yields Class<? extends RocksDBOptionsFactory>; the wildcard is required
            // for the assignment to type-check.
            Class<? extends RocksDBOptionsFactory> clazz =
                    Class.forName(factoryClassName, false, classLoader).asSubclass(RocksDBOptionsFactory.class);
            RocksDBOptionsFactory optionsFactory = clazz.newInstance();
            if (optionsFactory instanceof ConfigurableRocksDBOptionsFactory) {
                optionsFactory = ((ConfigurableRocksDBOptionsFactory) optionsFactory).configure(config);
            }
            LOG.info("Using configured options factory: {}.", optionsFactory);
            return optionsFactory;
        } catch (ClassNotFoundException e) {
            throw new DynamicCodeLoadingException("Cannot find configured options factory class: " + factoryClassName, e);
        } catch (ClassCastException | IllegalAccessException | InstantiationException e) {
            throw new DynamicCodeLoadingException("The class configured under '" + RocksDBOptions.OPTIONS_FACTORY.key() + "' is not a valid options factory (" + factoryClassName + ')', e);
        }
    }

    /**
     * Returns this backend's memory configuration object (the instance itself, not a copy).
     *
     * @return the memory configuration
     */
    public RocksDBMemoryConfiguration getMemoryConfiguration() {
        return memoryConfiguration;
    }

    /**
     * Sets a single local storage path by delegating to {@link #setDbStoragePaths(String...)}.
     * Passing {@code null} forwards a {@code null} array, which clears the configured paths.
     *
     * @param path the directory path, or null to clear
     */
    public void setDbStoragePath(String path) {
        String[] singlePath = (path == null) ? null : new String[] {path};
        this.setDbStoragePaths(singlePath);
    }

    /**
     * Sets the local directories in which RocksDB files are stored.
     *
     * <p>A {@code null} array clears the setting. Otherwise every entry must be non-null and
     * resolve to an absolute local path; entries that parse as a URI with a scheme must use the
     * {@code file} scheme (compared ignoring case). The field is only updated after all entries
     * have been validated.
     *
     * @param paths the directory paths, or null to clear
     * @throws IllegalArgumentException if the array is empty, contains null, contains a path
     *     with a non-{@code file} scheme, or contains a relative path
     */
    public void setDbStoragePaths(String ... paths) {
        if (paths == null) {
            this.localRocksDbDirectories = null;
            return;
        }
        if (paths.length == 0) {
            throw new IllegalArgumentException("empty paths");
        }

        File[] resolved = new File[paths.length];
        int index = 0;
        for (String rawPath : paths) {
            if (rawPath == null) {
                throw new IllegalArgumentException("null path");
            }

            URI uri = null;
            try {
                uri = new Path(rawPath).toUri();
            } catch (Exception ignored) {
                // Not parseable as a Path; the raw string is used as a plain file path below.
            }

            String localPath;
            if (uri == null || uri.getScheme() == null) {
                localPath = rawPath;
            } else if ("file".equalsIgnoreCase(uri.getScheme())) {
                localPath = uri.getPath();
            } else {
                throw new IllegalArgumentException("Path " + rawPath + " has a non-local scheme");
            }

            File directory = new File(localPath);
            if (!directory.isAbsolute()) {
                throw new IllegalArgumentException("Relative paths are not supported");
            }
            resolved[index++] = directory;
        }
        this.localRocksDbDirectories = resolved;
    }

    /**
     * Returns the configured local storage directories as strings.
     *
     * @return a new array with one string per configured directory, or null if no directories
     *     are configured
     */
    public String[] getDbStoragePaths() {
        File[] directories = this.localRocksDbDirectories;
        if (directories == null) {
            return null;
        }
        return Arrays.stream(directories).map(File::toString).toArray(String[]::new);
    }

    /**
     * Tells whether incremental checkpoints are enabled, falling back to the default value of
     * {@code CheckpointingOptions.INCREMENTAL_CHECKPOINTS} while the setting is undefined.
     *
     * @return true if incremental checkpoints are enabled
     */
    public boolean isIncrementalCheckpointsEnabled() {
        boolean fallback = CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue();
        return this.enableIncrementalCheckpointing.getOrDefault(fallback);
    }

    /**
     * Returns the priority-queue state type, or the default value of
     * {@code RocksDBOptions.TIMER_SERVICE_FACTORY} if none has been set.
     *
     * @return the effective priority-queue state type
     */
    public PriorityQueueStateType getPriorityQueueStateType() {
        if (this.priorityQueueStateType != null) {
            return this.priorityQueueStateType;
        }
        return RocksDBOptions.TIMER_SERVICE_FACTORY.defaultValue();
    }

    /**
     * Sets the priority-queue state type.
     *
     * @param priorityQueueStateType the type to use; passed through
     *     {@code Preconditions.checkNotNull}
     */
    public void setPriorityQueueStateType(PriorityQueueStateType priorityQueueStateType) {
        PriorityQueueStateType checked = Preconditions.checkNotNull(priorityQueueStateType);
        this.priorityQueueStateType = checked;
    }

    /**
     * Sets the predefined options profile.
     *
     * @param options the profile to use; passed through {@code Preconditions.checkNotNull}
     */
    public void setPredefinedOptions(@Nonnull PredefinedOptions options) {
        PredefinedOptions checked = Preconditions.checkNotNull(options);
        this.predefinedOptions = checked;
    }

    /**
     * Returns the predefined options profile, or {@link PredefinedOptions#DEFAULT} if none has
     * been set.
     *
     * <p>The getter deliberately does not write the fallback into the field. Doing so would mark
     * the profile as explicitly chosen, and a later
     * {@link #configure(ReadableConfig, ClassLoader)} would then skip the value from the
     * configuration.
     *
     * @return the effective predefined options, never null
     */
    @VisibleForTesting
    public PredefinedOptions getPredefinedOptions() {
        return this.predefinedOptions != null ? this.predefinedOptions : PredefinedOptions.DEFAULT;
    }

    /**
     * Sets the options factory stored on this backend. No null check is performed, so passing
     * {@code null} clears a previously set factory.
     *
     * @param optionsFactory the factory to store, possibly null
     */
    public void setRocksDBOptions(RocksDBOptionsFactory optionsFactory) {
        rocksDbOptionsFactory = optionsFactory;
    }

    /**
     * Returns the options factory currently stored on this backend.
     *
     * @return the options factory, or null if none is set
     */
    @Nullable
    public RocksDBOptionsFactory getRocksDBOptions() {
        return rocksDbOptionsFactory;
    }

    /**
     * Returns the number of transfer threads, or the default value of
     * {@code RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM} while the value is still the
     * "undefined" sentinel.
     *
     * @return the effective number of transfer threads
     */
    public int getNumberOfTransferThreads() {
        return this.numberOfTransferThreads == UNDEFINED_NUMBER_OF_TRANSFER_THREADS
                ? RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue()
                : this.numberOfTransferThreads;
    }

    /**
     * Sets the number of transfer threads.
     *
     * @param numberOfTransferThreads the thread count; must be greater than zero, which is
     *     enforced through {@code Preconditions.checkArgument}
     */
    public void setNumberOfTransferThreads(int numberOfTransferThreads) {
        boolean positive = numberOfTransferThreads > 0;
        Preconditions.checkArgument(positive, "The number of threads used to transfer files in EmbeddedRocksDBStateBackend should be greater than zero.");
        this.numberOfTransferThreads = numberOfTransferThreads;
    }

    /**
     * Returns the write batch size in bytes, or the default value of
     * {@code RocksDBConfigurableOptions.WRITE_BATCH_SIZE} while the value is still the
     * "undefined" sentinel.
     *
     * @return the effective write batch size in bytes
     */
    public long getWriteBatchSize() {
        return this.writeBatchSize == UNDEFINED_WRITE_BATCH_SIZE
                ? RocksDBConfigurableOptions.WRITE_BATCH_SIZE.defaultValue().getBytes()
                : this.writeBatchSize;
    }

    /**
     * Sets the write batch size.
     *
     * @param writeBatchSize the size; must be zero or greater, which is enforced through
     *     {@code Preconditions.checkArgument}
     */
    public void setWriteBatchSize(long writeBatchSize) {
        boolean nonNegative = writeBatchSize >= 0L;
        Preconditions.checkArgument(nonNegative, "Write batch size have to be no negative.");
        this.writeBatchSize = writeBatchSize;
    }

    /**
     * Creates a resource container without shared resources by delegating to the overload with
     * a {@code null} argument.
     *
     * @return the new resource container
     */
    @VisibleForTesting
    RocksDBResourceContainer createOptionsAndResourceContainer() {
        final OpaqueMemoryResource<RocksDBSharedResources> noSharedResources = null;
        return this.createOptionsAndResourceContainer(noSharedResources);
    }

    /**
     * Creates a resource container from the predefined options (falling back to
     * {@link PredefinedOptions#DEFAULT} when none are set), the stored options factory and the
     * given shared resources.
     *
     * @param sharedResources shared resources to hand to the container, or null
     * @return the new resource container
     */
    @VisibleForTesting
    private RocksDBResourceContainer createOptionsAndResourceContainer(@Nullable OpaqueMemoryResource<RocksDBSharedResources> sharedResources) {
        PredefinedOptions effectiveOptions = this.predefinedOptions;
        if (effectiveOptions == null) {
            effectiveOptions = PredefinedOptions.DEFAULT;
        }
        return new RocksDBResourceContainer(effectiveOptions, this.rocksDbOptionsFactory, sharedResources);
    }

    /**
     * Returns a short description listing the local directories, the incremental checkpointing
     * setting, the number of transfer threads and the write batch size (raw field values, so
     * unset numeric settings show up as their sentinel).
     *
     * @return the string representation
     */
    @Override
    public String toString() {
        // The first entry follows the opening brace directly, without a leading ", ".
        return "EmbeddedRocksDBStateBackend{"
                + "localRocksDbDirectories=" + Arrays.toString(this.localRocksDbDirectories)
                + ", enableIncrementalCheckpointing=" + this.enableIncrementalCheckpointing
                + ", numberOfTransferThreads=" + this.numberOfTransferThreads
                + ", writeBatchSize=" + this.writeBatchSize
                + '}';
    }

    /**
     * Loads the RocksDB native library once per class, retrying up to
     * {@code ROCKSDB_LIB_LOADING_ATTEMPTS} times. The whole procedure runs while holding the
     * monitor of {@code EmbeddedRocksDBStateBackend.class}.
     *
     * @param tempDirectory the directory under which the native library folder is created
     * @throws IOException if every loading attempt failed; the cause is the failure of the last
     *     attempt
     */
    @VisibleForTesting
    static void ensureRocksDBIsLoaded(String tempDirectory) throws IOException {
        synchronized (EmbeddedRocksDBStateBackend.class) {
            if (rocksDbInitialized) {
                return;
            }

            final File tempDirParent = new File(tempDirectory).getAbsoluteFile();
            LOG.info("Attempting to load RocksDB native library and store it under '{}'", tempDirParent);

            Throwable lastException = null;
            for (int attempt = 1; attempt <= ROCKSDB_LIB_LOADING_ATTEMPTS; ++attempt) {
                File rocksLibFolder = null;
                try {
                    // Every attempt uses a new folder whose name contains a fresh AbstractID.
                    rocksLibFolder = new File(tempDirParent, "rocksdb-lib-" + new AbstractID());
                    LOG.debug("Attempting to create RocksDB native library folder {}", rocksLibFolder);
                    // The return value is not checked here.
                    rocksLibFolder.mkdirs();

                    NativeLibraryLoader.getInstance().loadLibrary(rocksLibFolder.getAbsolutePath());
                    RocksDB.loadLibrary();

                    LOG.info("Successfully loaded RocksDB native library");
                    rocksDbInitialized = true;
                    return;
                } catch (Throwable t) {
                    lastException = t;
                    LOG.debug("RocksDB JNI library loading attempt {} failed", attempt, t);

                    // Best effort: reset the loader's flag before the next attempt; a failure
                    // to do so is only logged.
                    try {
                        resetRocksDBLoadedFlag();
                    } catch (Throwable tt) {
                        LOG.debug("Failed to reset 'initialized' flag in RocksDB native code loader", tt);
                    }

                    FileUtils.deleteDirectoryQuietly(rocksLibFolder);
                }
            }

            throw new IOException("Could not load the native RocksDB library", lastException);
        }
    }

    /**
     * Reflectively sets the static boolean field named {@code initialized} of
     * {@link NativeLibraryLoader} to {@code false}.
     *
     * @throws Exception if the field cannot be found or modified
     */
    @VisibleForTesting
    static void resetRocksDBLoadedFlag() throws Exception {
        final Field initializedFlag = NativeLibraryLoader.class.getDeclaredField("initialized");
        initializedFlag.setAccessible(true);
        // The target of a static field write is ignored, hence null.
        initializedFlag.setBoolean(null, false);
    }

    /** The available priority-queue state implementations, each with a short description. */
    public enum PriorityQueueStateType implements DescribedEnum {
        HEAP(TextElement.text("Heap-based")),
        ROCKSDB(TextElement.text("Implementation based on RocksDB"));

        /** Description returned by {@link #getDescription()}. */
        private final InlineElement description;

        PriorityQueueStateType(InlineElement description) {
            this.description = description;
        }

        /**
         * Returns the description attached to this constant.
         *
         * @return the description element
         */
        public InlineElement getDescription() {
            return description;
        }
    }
}

