/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.ttl;

import java.io.IOException;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.ttl.TtlUtils;
import org.apache.flink.runtime.state.ttl.TtlValue;
import org.apache.flink.util.FlinkRuntimeException;

/**
 * Base class for snapshot transformers that drop state entries whose TTL has expired.
 *
 * <p>Expiration is decided by {@link TtlUtils#expired(long, long, TtlTimeProvider)} using the
 * {@code ttl} and {@link TtlTimeProvider} supplied at construction time.
 *
 * <p>NOTE(review): each instance owns a single reusable {@link DataInputDeserializer}, so one
 * instance should presumably not be shared between threads — confirm against callers.
 *
 * @param <T> type of the state value handed to {@code filterOrTransform}
 */
abstract class TtlStateSnapshotTransformer<T>
implements StateSnapshotTransformer.CollectionStateSnapshotTransformer<T> {
    private final TtlTimeProvider ttlTimeProvider;
    final long ttl;
    /** Reusable input view used to read the timestamp out of serialized values. */
    private final DataInputDeserializer div;

    /**
     * @param ttlTimeProvider source of the current time used for expiration checks
     * @param ttl time-to-live passed through to {@link TtlUtils#expired(long, long, TtlTimeProvider)}
     */
    TtlStateSnapshotTransformer(@Nonnull TtlTimeProvider ttlTimeProvider, long ttl) {
        this.ttlTimeProvider = ttlTimeProvider;
        this.ttl = ttl;
        this.div = new DataInputDeserializer();
    }

    /**
     * Filters out an expired TTL value.
     *
     * @param value the value to check, may be {@code null}
     * @return {@code null} if {@code value} is {@code null} or expired, otherwise {@code value}
     */
    @Nullable
    <V> TtlValue<V> filterTtlValue(@Nullable TtlValue<V> value) {
        // Callers forward @Nullable values here; treat "no value" as "nothing to keep"
        // instead of failing with a NullPointerException on the timestamp access.
        if (value == null) {
            return null;
        }
        return this.expired(value) ? null : value;
    }

    /** Checks expiration based on the last access timestamp stored in the given value. */
    private boolean expired(TtlValue<?> ttlValue) {
        return this.expired(ttlValue.getLastAccessTimestamp());
    }

    /**
     * Checks whether the given timestamp is expired with respect to this transformer's TTL.
     *
     * @param ts timestamp to check
     * @return result of {@link TtlUtils#expired(long, long, TtlTimeProvider)}
     */
    boolean expired(long ts) {
        return TtlUtils.expired(ts, this.ttl, this.ttlTimeProvider);
    }

    /**
     * Reads a {@code long} timestamp from the first 8 bytes of the given array.
     *
     * <p>NOTE(review): presumably the serialized TTL value starts with its timestamp — confirm
     * against the TTL value serializer.
     *
     * @param value serialized bytes; only the range {@code [0, 8)} is read
     * @return the deserialized timestamp
     * @throws IOException if the timestamp cannot be deserialized
     */
    long deserializeTs(byte[] value) throws IOException {
        this.div.setBuffer(value, 0, 8);
        return LongSerializer.INSTANCE.deserialize(this.div);
    }

    /** Always returns {@code STOP_ON_FIRST_INCLUDED}. */
    @Override
    public StateSnapshotTransformer.CollectionStateSnapshotTransformer.TransformStrategy getFilterStrategy() {
        return StateSnapshotTransformer.CollectionStateSnapshotTransformer.TransformStrategy.STOP_ON_FIRST_INCLUDED;
    }

    /**
     * Factory creating TTL snapshot transformers for both deserialized ({@link TtlValue}) and
     * serialized ({@code byte[]}) state representations.
     *
     * @param <T> type of the user value wrapped in {@link TtlValue}
     */
    static class Factory<T>
    implements StateSnapshotTransformer.StateSnapshotTransformFactory<TtlValue<T>> {
        private final TtlTimeProvider ttlTimeProvider;
        private final long ttl;

        /**
         * @param ttlTimeProvider time provider handed to every created transformer
         * @param ttl time-to-live handed to every created transformer
         */
        Factory(@Nonnull TtlTimeProvider ttlTimeProvider, long ttl) {
            this.ttlTimeProvider = ttlTimeProvider;
            this.ttl = ttl;
        }

        /** Creates a new transformer operating on deserialized {@link TtlValue} objects. */
        @Override
        public Optional<StateSnapshotTransformer<TtlValue<T>>> createForDeserializedState() {
            return Optional.of(new TtlDeserializedValueStateSnapshotTransformer<>(this.ttlTimeProvider, this.ttl));
        }

        /** Creates a new transformer operating on serialized {@code byte[]} values. */
        @Override
        public Optional<StateSnapshotTransformer<byte[]>> createForSerializedState() {
            return Optional.of(new TtlSerializedValueStateSnapshotTransformer(this.ttlTimeProvider, this.ttl));
        }
    }

    /** Transformer that filters serialized values by the timestamp read from their bytes. */
    static class TtlSerializedValueStateSnapshotTransformer
    extends TtlStateSnapshotTransformer<byte[]> {
        TtlSerializedValueStateSnapshotTransformer(TtlTimeProvider ttlTimeProvider, long ttl) {
            super(ttlTimeProvider, ttl);
        }

        /**
         * Drops the serialized value if its timestamp is expired.
         *
         * @param value serialized value, may be {@code null}
         * @return {@code null} if {@code value} is {@code null} or expired, otherwise the
         *     unchanged {@code value}
         * @throws FlinkRuntimeException if the timestamp cannot be deserialized
         */
        @Override
        @Nullable
        public byte[] filterOrTransform(@Nullable byte[] value) {
            if (value == null) {
                return null;
            }
            long ts;
            try {
                ts = this.deserializeTs(value);
            }
            catch (IOException e) {
                // Keep the original IOException as the cause so the failure stays diagnosable.
                throw new FlinkRuntimeException("Unexpected timestamp deserialization failure", e);
            }
            return this.expired(ts) ? null : value;
        }
    }

    /**
     * Transformer that filters deserialized {@link TtlValue} objects by their last access
     * timestamp.
     *
     * @param <T> type of the user value wrapped in {@link TtlValue}
     */
    static class TtlDeserializedValueStateSnapshotTransformer<T>
    extends TtlStateSnapshotTransformer<TtlValue<T>> {
        TtlDeserializedValueStateSnapshotTransformer(TtlTimeProvider ttlTimeProvider, long ttl) {
            super(ttlTimeProvider, ttl);
        }

        /**
         * Drops the value if it is expired.
         *
         * @param value value to check, may be {@code null}
         * @return {@code null} if {@code value} is {@code null} or expired, otherwise
         *     {@code value}
         */
        @Override
        @Nullable
        public TtlValue<T> filterOrTransform(@Nullable TtlValue<T> value) {
            return this.filterTtlValue(value);
        }
    }
}

