/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import org.apache.flink.contrib.streaming.state.RocksDBStateDataTransfer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.function.CheckedSupplier;

/**
 * Copies local files to checkpoint storage and returns a state handle for each uploaded file.
 *
 * <p>Each file is uploaded by its own task submitted to {@code executorService}, which is not
 * declared in this class (presumably inherited from {@link RocksDBStateDataTransfer} and sized
 * by the constructor argument - confirm against the superclass).
 */
public class RocksDBStateUploader
extends RocksDBStateDataTransfer {
    /** Size in bytes of the buffer used to copy a file from the local path to the checkpoint stream. */
    private static final int READ_BUFFER_SIZE = 16384;

    /**
     * Creates an uploader.
     *
     * @param numberOfSnapshottingThreads value forwarded unchanged to the superclass constructor
     */
    public RocksDBStateUploader(int numberOfSnapshottingThreads) {
        super(numberOfSnapshottingThreads);
    }

    /**
     * Uploads every given file to checkpoint storage and blocks until all uploads have finished.
     *
     * @param files mapping from state handle id to the path of the file to upload
     * @param checkpointStreamFactory factory used to open one output stream per file
     * @param closeableRegistry registry in which the streams are registered while they are in use
     * @return mapping from state handle id to the handle of the uploaded file
     * @throws IOException if an upload failed with an {@link IOException}
     * @throws FlinkRuntimeException if an upload failed with any other exception
     * @throws Exception any other failure raised while waiting for the uploads
     *     (for example interruption of the waiting thread)
     */
    public Map<StateHandleID, StreamStateHandle> uploadFilesToCheckpointFs(@Nonnull Map<StateHandleID, Path> files, CheckpointStreamFactory checkpointStreamFactory, CloseableRegistry closeableRegistry) throws Exception {
        Map<StateHandleID, StreamStateHandle> handles = new HashMap<>();
        Map<StateHandleID, CompletableFuture<StreamStateHandle>> futures = createUploadFutures(files, checkpointStreamFactory, closeableRegistry);
        try {
            // Wait for every upload before collecting results, so the per-future get() calls
            // below are made on futures that have already completed.
            FutureUtils.waitForAll(futures.values()).get();
            for (Map.Entry<StateHandleID, CompletableFuture<StreamStateHandle>> entry : futures.entrySet()) {
                handles.put(entry.getKey(), entry.getValue().get());
            }
        } catch (ExecutionException e) {
            // Unwrap the failure so that an IOException from the upload reaches the caller as
            // such. The RuntimeException layer is presumably the one added by
            // CheckedSupplier.unchecked in createUploadFutures - confirm.
            Throwable throwable = ExceptionUtils.stripExecutionException(e);
            throwable = ExceptionUtils.stripException(throwable, RuntimeException.class);
            if (throwable instanceof IOException) {
                throw (IOException) throwable;
            }
            throw new FlinkRuntimeException("Failed to upload data for state handles.", e);
        }
        return handles;
    }

    /**
     * Submits one asynchronous upload task per file to {@code executorService}.
     *
     * @return mapping from state handle id to the future of the corresponding upload
     */
    private Map<StateHandleID, CompletableFuture<StreamStateHandle>> createUploadFutures(Map<StateHandleID, Path> files, CheckpointStreamFactory checkpointStreamFactory, CloseableRegistry closeableRegistry) {
        Map<StateHandleID, CompletableFuture<StreamStateHandle>> futures = new HashMap<>(files.size());
        for (Map.Entry<StateHandleID, Path> entry : files.entrySet()) {
            // The upload declares IOException; CheckedSupplier.unchecked adapts it to a plain
            // Supplier so that it can be handed to CompletableFuture.supplyAsync.
            Supplier<StreamStateHandle> supplier = CheckedSupplier.unchecked(() -> uploadLocalFileToCheckpointFs(entry.getValue(), checkpointStreamFactory, closeableRegistry));
            futures.put(entry.getKey(), CompletableFuture.supplyAsync(supplier, this.executorService));
        }
        return futures;
    }

    /**
     * Copies a single file into a newly created checkpoint output stream with scope
     * {@link CheckpointedStateScope#SHARED}.
     *
     * <p>Both streams are registered with {@code closeableRegistry} while the copy is running.
     * On every exit path, each stream that is still registered is unregistered and closed
     * quietly.
     *
     * @return the handle obtained from closing the output stream, or {@code null} if the output
     *     stream was no longer registered when the copy finished
     * @throws IOException if opening, reading, writing or closing a stream fails
     */
    private StreamStateHandle uploadLocalFileToCheckpointFs(Path filePath, CheckpointStreamFactory checkpointStreamFactory, CloseableRegistry closeableRegistry) throws IOException {
        FSDataInputStream inputStream = null;
        CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
        try {
            byte[] buffer = new byte[READ_BUFFER_SIZE];
            FileSystem backupFileSystem = filePath.getFileSystem();
            inputStream = backupFileSystem.open(filePath);
            closeableRegistry.registerCloseable(inputStream);
            outputStream = checkpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
            closeableRegistry.registerCloseable(outputStream);

            int numBytes;
            while ((numBytes = inputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, numBytes);
            }

            // NOTE(review): when unregistering fails the method returns null instead of
            // throwing; presumably this means the registry was closed concurrently - confirm
            // that callers tolerate a null handle.
            StreamStateHandle result = null;
            if (closeableRegistry.unregisterCloseable(outputStream)) {
                result = outputStream.closeAndGetHandle();
                // Ownership has passed to the handle; clear the reference so the cleanup
                // below does not touch the stream again.
                outputStream = null;
            }
            return result;
        } finally {
            // Close only what is still registered here; either reference may be null if the
            // failure happened before the stream was opened.
            if (closeableRegistry.unregisterCloseable(inputStream)) {
                IOUtils.closeQuietly(inputStream);
            }
            if (closeableRegistry.unregisterCloseable(outputStream)) {
                IOUtils.closeQuietly(outputStream);
            }
        }
    }
}

