/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.changelog;

import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointBoundKeyedStateHandle;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SavepointKeyedStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.runtime.state.delegate.DelegatingStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.taskmanager.AsynchronousException;
import org.apache.flink.state.changelog.ChangelogKeyedStateBackend;
import org.apache.flink.state.changelog.PeriodicMaterializationManager;
import org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class ChangelogStateBackend
implements DelegatingStateBackend,
ConfigurableStateBackend {
    private static final long serialVersionUID = 1000L;
    private static final Logger LOG = LoggerFactory.getLogger(ChangelogStateBackend.class);
    private final StateBackend delegatedStateBackend;

    ChangelogStateBackend(StateBackend stateBackend) {
        this.delegatedStateBackend = (StateBackend)Preconditions.checkNotNull((Object)stateBackend);
        Preconditions.checkArgument((!(stateBackend instanceof DelegatingStateBackend) ? 1 : 0) != 0, (Object)"Recursive Delegation is not supported.");
        LOG.info("ChangelogStateBackend is used, delegating {}.", (Object)this.delegatedStateBackend.getClass().getSimpleName());
    }

    public <K> ChangelogKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception {
        return this.restore(env, operatorIdentifier, keyGroupRange, ttlTimeProvider, stateHandles, baseHandles -> (AbstractKeyedStateBackend)this.delegatedStateBackend.createKeyedStateBackend(env, jobID, operatorIdentifier, keySerializer, numberOfKeyGroups, keyGroupRange, kvStateRegistry, ttlTimeProvider, metricGroup, baseHandles, cancelStreamRegistry));
    }

    public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry, double managedMemoryFraction) throws Exception {
        return this.restore(env, operatorIdentifier, keyGroupRange, ttlTimeProvider, stateHandles, baseHandles -> (AbstractKeyedStateBackend)this.delegatedStateBackend.createKeyedStateBackend(env, jobID, operatorIdentifier, keySerializer, numberOfKeyGroups, keyGroupRange, kvStateRegistry, ttlTimeProvider, metricGroup, baseHandles, cancelStreamRegistry, managedMemoryFraction));
    }

    public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier, @Nonnull Collection<OperatorStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception {
        return this.delegatedStateBackend.createOperatorStateBackend(env, operatorIdentifier, stateHandles, cancelStreamRegistry);
    }

    public boolean useManagedMemory() {
        return this.delegatedStateBackend.useManagedMemory();
    }

    public StateBackend getDelegatedStateBackend() {
        return this.delegatedStateBackend;
    }

    public StateBackend configure(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException {
        if (this.delegatedStateBackend instanceof ConfigurableStateBackend) {
            return new ChangelogStateBackend(((ConfigurableStateBackend)this.delegatedStateBackend).configure(config, classLoader));
        }
        return this;
    }

    private <K> ChangelogKeyedStateBackend<K> restore(Environment env, String operatorIdentifier, KeyGroupRange keyGroupRange, TtlTimeProvider ttlTimeProvider, Collection<KeyedStateHandle> stateHandles, ChangelogBackendRestoreOperation.BaseBackendBuilder<K> baseBackendBuilder) throws Exception {
        StateChangelogStorage changelogStorage = (StateChangelogStorage)Preconditions.checkNotNull((Object)env.getTaskStateManager().getStateChangelogStorage(), (String)"Changelog storage is null when creating and restoring the ChangelogKeyedStateBackend.");
        String subtaskName = env.getTaskInfo().getTaskNameWithSubtasks();
        ExecutionConfig executionConfig = env.getExecutionConfig();
        Collection<ChangelogStateBackendHandle> stateBackendHandles = this.castHandles(stateHandles);
        ChangelogKeyedStateBackend<K> keyedStateBackend = ChangelogBackendRestoreOperation.restore(changelogStorage.createReader(), env.getUserCodeClassLoader().asClassLoader(), stateBackendHandles, baseBackendBuilder, (baseBackend, baseState) -> new ChangelogKeyedStateBackend(baseBackend, subtaskName, executionConfig, ttlTimeProvider, (StateChangelogWriter<? extends ChangelogStateHandle>)((StateChangelogWriter<ChangelogStateHandle>)((StateChangelogWriter<? extends ChangelogStateHandle>)changelogStorage.createWriter(operatorIdentifier, keyGroupRange, env.getMainMailboxExecutor()))), (Collection<ChangelogStateBackendHandle>)baseState, (CheckpointStorageWorkerView)env.getCheckpointStorageAccess()));
        PeriodicMaterializationManager periodicMaterializationManager = new PeriodicMaterializationManager((MailboxExecutor)Preconditions.checkNotNull((Object)env.getMainMailboxExecutor()), (ExecutorService)Preconditions.checkNotNull((Object)env.getAsyncOperationsThreadPool()), subtaskName, (message, exception) -> env.failExternally((Throwable)new AsynchronousException(message, exception)), keyedStateBackend, executionConfig.getPeriodicMaterializeIntervalMillis(), executionConfig.getMaterializationMaxAllowedFailures(), operatorIdentifier);
        keyedStateBackend.registerCloseable(periodicMaterializationManager);
        periodicMaterializationManager.start();
        return keyedStateBackend;
    }

    private Collection<ChangelogStateBackendHandle> castHandles(Collection<KeyedStateHandle> stateHandles) {
        if (stateHandles.stream().anyMatch(h -> !(h instanceof ChangelogStateBackendHandle))) {
            LOG.warn("Some state handles do not contain changelog: {} (ok if recovery from a savepoint)", stateHandles);
        }
        return stateHandles.stream().filter(Objects::nonNull).map(this::getChangelogStateBackendHandle).collect(Collectors.toList());
    }

    private ChangelogStateBackendHandle getChangelogStateBackendHandle(KeyedStateHandle keyedStateHandle) {
        if (keyedStateHandle instanceof ChangelogStateBackendHandle) {
            return (ChangelogStateBackendHandle)keyedStateHandle;
        }
        if (keyedStateHandle instanceof SavepointKeyedStateHandle) {
            return new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(Collections.singletonList(keyedStateHandle), Collections.emptyList(), keyedStateHandle.getKeyGroupRange(), this.getMaterializationID(keyedStateHandle), 0L);
        }
        throw new IllegalStateException(String.format("Recovery not supported from %s with Changelog enabled. Consider taking a savepoint in %s format.", keyedStateHandle.getClass(), SavepointFormatType.CANONICAL));
    }

    private long getMaterializationID(KeyedStateHandle keyedStateHandle) {
        if (keyedStateHandle instanceof CheckpointBoundKeyedStateHandle) {
            return ((CheckpointBoundKeyedStateHandle)keyedStateHandle).getCheckpointId();
        }
        return 0L;
    }
}

