/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.hooks;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.LambdaUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

public class MasterHooks {
    /**
     * Resets every checkpoint master hook in the given collection, in iteration order.
     *
     * <p>The first hook whose {@code reset()} fails aborts the loop: the failure is first handed
     * to {@link ExceptionUtils#rethrowIfFatalErrorOrOOM(Throwable)} and, if that call returns,
     * wrapped in a {@link FlinkException} naming the hook.
     *
     * @param hooks the hooks to reset
     * @param log a logger; not used by this method
     * @throws FlinkException if a hook's {@code reset()} call throws
     */
    public static void reset(Collection<MasterTriggerRestoreHook<?>> hooks, Logger log) throws FlinkException {
        for (final MasterTriggerRestoreHook<?> masterHook : hooks) {
            // Resolved before the try block so the identifier is available for the error message.
            final String hookId = masterHook.getIdentifier();
            try {
                masterHook.reset();
            } catch (Throwable failure) {
                ExceptionUtils.rethrowIfFatalErrorOrOOM(failure);
                final String message = "Error while resetting checkpoint master hook '" + hookId + '\'';
                throw new FlinkException(message, failure);
            }
        }
    }

    /**
     * Closes every checkpoint master hook in the given collection on a best-effort basis.
     *
     * <p>Anything thrown by a hook's {@code close()} is logged at WARN level together with the
     * hook's identifier, and the loop then continues with the next hook.
     *
     * @param hooks the hooks to close
     * @param log the logger that receives a warning for each hook that fails to close
     * @throws FlinkException declared as part of the signature; no statement in this body
     *     throws it
     */
    public static void close(Collection<MasterTriggerRestoreHook<?>> hooks, Logger log) throws FlinkException {
        for (final MasterTriggerRestoreHook<?> masterHook : hooks) {
            try {
                masterHook.close();
            } catch (Throwable failure) {
                final String warning =
                        "Failed to cleanly close a checkpoint master hook (" + masterHook.getIdentifier() + ")";
                log.warn(warning, failure);
            }
        }
    }

    /**
     * Triggers the given master hook for a checkpoint and turns its result into a
     * {@link MasterState}.
     *
     * <p>The returned future
     * <ul>
     *   <li>is already completed with {@code null} if the hook returns a {@code null} future,
     *   <li>completes with {@code null} if the hook's future completes with {@code null},
     *   <li>completes with a {@link MasterState} holding the serialized result otherwise,
     *   <li>completes exceptionally with a {@link FlinkException} as cause if the hook's future
     *       fails, serialization fails, or the hook produced a result without providing a
     *       serializer,
     *   <li>is already completed exceptionally if {@code triggerCheckpoint} itself throws.
     * </ul>
     *
     * @param hook the hook to trigger
     * @param checkpointId the ID of the checkpoint being triggered
     * @param timestamp the checkpoint timestamp, passed through to the hook
     * @param executor the executor passed through to the hook
     * @param <T> the type of data produced by the hook
     * @return a future with the hook's serialized state, or with {@code null} if there is none
     */
    public static <T> CompletableFuture<MasterState> triggerHook(MasterTriggerRestoreHook<T> hook, long checkpointId, long timestamp, Executor executor) {
        final String id = hook.getIdentifier();
        final SimpleVersionedSerializer<T> serializer = hook.createCheckpointDataSerializer();
        try {
            final CompletableFuture<T> resultFuture = hook.triggerCheckpoint(checkpointId, timestamp, executor);
            if (resultFuture == null) {
                // The hook has nothing to contribute to this checkpoint.
                return CompletableFuture.completedFuture(null);
            }
            return resultFuture
                    .<MasterState>thenApply(result -> {
                        if (result == null) {
                            return null;
                        }
                        if (serializer == null) {
                            throw new CompletionException(new FlinkException("Checkpoint hook '" + id + "' is stateful but creates no serializer"));
                        }
                        try {
                            final int version = serializer.getVersion();
                            final byte[] bytes = serializer.serialize(result);
                            return new MasterState(id, bytes, version);
                        } catch (Throwable t) {
                            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
                            throw new CompletionException(new FlinkException("Failed to serialize state of master hook '" + id + '\'', t));
                        }
                    })
                    .exceptionally(throwable -> {
                        // Failures reaching this dependent stage arrive wrapped in a
                        // CompletionException, so the cause is the interesting part.
                        throw new CompletionException(new FlinkException("Checkpoint master hook '" + id + "' produced an exception", throwable.getCause()));
                    });
        } catch (Throwable t) {
            return FutureUtils.completedExceptionally(new FlinkException("Error while triggering checkpoint master hook '" + id + '\'', t));
        }
    }

    /**
     * Restores the given master hooks from the given checkpoint master states.
     *
     * <p>States are matched to hooks by {@code MasterState.name()} against the keys of
     * {@code masterHooks}. All matched states are deserialized first; only then are the hooks
     * invoked. Hooks without a matching state are invoked afterwards with {@code null} state.
     * If either argument collection is {@code null} or empty, nothing is restored and no hook
     * is called.
     *
     * @param masterHooks the hooks to restore, keyed by the name used to match states
     * @param states the master states of the checkpoint; {@code null} entries are skipped
     * @param checkpointId the ID of the checkpoint being restored
     * @param allowUnmatchedState if {@code true}, states without a matching hook are dropped
     *     (and logged); if {@code false}, such a state causes an {@link IllegalStateException}
     * @param log the logger used for progress messages
     * @throws FlinkException if deserializing a state or restoring a hook fails
     * @throws IllegalStateException if a state has no matching hook and
     *     {@code allowUnmatchedState} is {@code false}
     */
    public static void restoreMasterHooks(Map<String, MasterTriggerRestoreHook<?>> masterHooks, Collection<MasterState> states, long checkpointId, boolean allowUnmatchedState, Logger log) throws FlinkException {
        if (states == null || states.isEmpty() || masterHooks == null || masterHooks.isEmpty()) {
            log.info("No master state to restore");
            return;
        }
        log.info("Calling master restore hooks");

        // Working copy: matched hooks are removed, so what remains afterwards has no state.
        final LinkedHashMap<String, MasterTriggerRestoreHook<?>> allHooks = new LinkedHashMap<>(masterHooks);

        // First pass: deserialize everything before touching any hook.
        final ArrayList<Tuple2<MasterTriggerRestoreHook<?>, Object>> hooksAndStates = new ArrayList<>();
        for (MasterState state : states) {
            if (state == null) {
                continue;
            }
            final String name = state.name();
            final MasterTriggerRestoreHook<?> hook = allHooks.remove(name);
            if (hook != null) {
                log.debug("Found state to restore for hook '{}'", name);
                final Object deserializedState = MasterHooks.deserializeState(state, hook);
                hooksAndStates.add(new Tuple2<>(hook, deserializedState));
            } else if (!allowUnmatchedState) {
                throw new IllegalStateException("Found state '" + state.name() + "' which is not resumed by any hook.");
            } else {
                log.info("Dropping unmatched state from '{}'", name);
            }
        }

        // Second pass: hooks that have checkpointed state.
        for (Tuple2<MasterTriggerRestoreHook<?>, Object> hookAndState : hooksAndStates) {
            MasterHooks.restoreHook(hookAndState.f1, hookAndState.f0, checkpointId);
        }

        // Finally: the remaining hooks, restored without state.
        for (MasterTriggerRestoreHook<?> hook : allHooks.values()) {
            MasterHooks.restoreHook(null, hook, checkpointId);
        }
    }

    /**
     * Deserializes a checkpoint master state with the serializer created by the given hook.
     *
     * <p>Any failure, including the hook returning a {@code null} serializer, is reported as
     * a {@link FlinkException} naming the hook. Before wrapping, the failure is handed to
     * {@link ExceptionUtils#rethrowIfFatalErrorOrOOM(Throwable)}, matching what the
     * serialization path in {@code triggerHook} does.
     *
     * @param state the master state to deserialize
     * @param hook the hook whose serializer is used
     * @param <T> the type the caller expects; the cast is unchecked
     * @return the deserialized state
     * @throws FlinkException if no serializer is available or deserialization fails
     */
    private static <T> T deserializeState(MasterState state, MasterTriggerRestoreHook<?> hook) throws FlinkException {
        // The caller only has a wildcard-typed hook; the state type is not statically known here.
        @SuppressWarnings("unchecked")
        final MasterTriggerRestoreHook<T> typedHook = (MasterTriggerRestoreHook<T>) hook;
        final String id = hook.getIdentifier();
        try {
            final SimpleVersionedSerializer<T> deserializer = typedHook.createCheckpointDataSerializer();
            if (deserializer == null) {
                throw new FlinkException("null serializer for state of hook " + id);
            }
            return deserializer.deserialize(state.version(), state.bytes());
        } catch (Throwable t) {
            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
            throw new FlinkException("Cannot deserialize state for master hook '" + id + '\'', t);
        }
    }

    /**
     * Calls {@code restoreCheckpoint} on a single hook with the given (possibly {@code null})
     * state.
     *
     * <p>A {@link FlinkException} thrown by the hook is propagated unchanged. Any other
     * throwable is first handed to {@link ExceptionUtils#rethrowIfFatalError(Throwable)} and,
     * if that call returns, wrapped in a {@link FlinkException} naming the hook.
     *
     * @param state the state to restore, or {@code null} if the hook has no checkpointed state
     * @param hook the hook to restore
     * @param checkpointId the ID of the checkpoint being restored
     * @param <T> the hook's state type; both casts below are unchecked
     * @throws FlinkException if the hook fails to restore
     */
    private static <T> void restoreHook(Object state, MasterTriggerRestoreHook<?> hook, long checkpointId) throws FlinkException {
        // A wildcard-typed hook cannot accept an Object argument, so state and hook are both
        // cast to the same type variable.
        @SuppressWarnings("unchecked")
        final T typedState = (T) state;
        @SuppressWarnings("unchecked")
        final MasterTriggerRestoreHook<T> typedHook = (MasterTriggerRestoreHook<T>) hook;
        try {
            typedHook.restoreCheckpoint(checkpointId, typedState);
        } catch (FlinkException e) {
            throw e;
        } catch (Throwable t) {
            ExceptionUtils.rethrowIfFatalError(t);
            throw new FlinkException("Error while calling restoreCheckpoint on checkpoint hook '" + hook.getIdentifier() + '\'', t);
        }
    }

    /**
     * Wraps a hook so that its methods are invoked through a {@code WrappedMasterHook} bound
     * to the given user class loader.
     *
     * @param hook the hook to wrap; must not be {@code null}
     * @param userClassLoader the class loader to bind to the wrapper; must not be {@code null}
     * @param <T> the type of data produced by the hook
     * @return the wrapping hook
     */
    public static <T> MasterTriggerRestoreHook<T> wrapHook(MasterTriggerRestoreHook<T> hook, ClassLoader userClassLoader) {
        final MasterTriggerRestoreHook<T> wrapped = new WrappedMasterHook<>(hook, userClassLoader);
        return wrapped;
    }

    /** Private constructor: this class only exposes static members and is never instantiated. */
    private MasterHooks() {}

    /**
     * A {@link MasterTriggerRestoreHook} that delegates every call to another hook, routing each
     * call through {@code LambdaUtil.withContextClassLoader} with the user class loader given
     * at construction time.
     *
     * @param <T> the type of data produced by the wrapped hook
     */
    private static class WrappedMasterHook<T> implements MasterTriggerRestoreHook<T> {

        /** The hook that does the actual work. */
        private final MasterTriggerRestoreHook<T> hook;

        /** The class loader passed to every {@code withContextClassLoader} call. */
        private final ClassLoader userClassLoader;

        WrappedMasterHook(MasterTriggerRestoreHook<T> hook, ClassLoader userClassLoader) {
            this.hook = Preconditions.checkNotNull(hook);
            this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
        }

        @Override
        public String getIdentifier() {
            return LambdaUtil.withContextClassLoader(userClassLoader, hook::getIdentifier);
        }

        @Override
        public void reset() throws Exception {
            LambdaUtil.withContextClassLoader(userClassLoader, hook::reset);
        }

        @Override
        public void close() throws Exception {
            LambdaUtil.withContextClassLoader(userClassLoader, hook::close);
        }

        @Override
        @Nullable
        public CompletableFuture<T> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
            // Commands the hook submits to the executor are wrapped as well, so they also run
            // through withContextClassLoader on whichever thread executes them.
            final Executor wrappedExecutor =
                    command -> executor.execute(new WrappedCommand(userClassLoader, command));
            return LambdaUtil.withContextClassLoader(
                    userClassLoader,
                    () -> hook.triggerCheckpoint(checkpointId, timestamp, wrappedExecutor));
        }

        @Override
        public void restoreCheckpoint(long checkpointId, @Nullable T checkpointData) throws Exception {
            LambdaUtil.withContextClassLoader(
                    userClassLoader,
                    () -> hook.restoreCheckpoint(checkpointId, checkpointData));
        }

        @Override
        @Nullable
        public SimpleVersionedSerializer<T> createCheckpointDataSerializer() {
            return LambdaUtil.withContextClassLoader(userClassLoader, hook::createCheckpointDataSerializer);
        }

        /** A {@link Runnable} that runs another runnable through {@code withContextClassLoader}. */
        private static class WrappedCommand implements Runnable {

            private final ClassLoader userClassLoader;
            private final Runnable command;

            WrappedCommand(ClassLoader userClassLoader, Runnable command) {
                this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
                this.command = Preconditions.checkNotNull(command);
            }

            @Override
            public void run() {
                LambdaUtil.withContextClassLoader(userClassLoader, command::run);
            }
        }
    }
}

