/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.metadata;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.metadata.ChannelStateHandleSerializer;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.checkpoint.metadata.MetadataSerializer;
import org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.function.BiConsumerWithException;

@Internal
public class MetadataV3Serializer
extends MetadataV2V3SerializerBase
implements MetadataSerializer {
    public static final int VERSION = 3;
    public static final MetadataV3Serializer INSTANCE = new MetadataV3Serializer();
    private final ChannelStateHandleSerializer channelStateHandleSerializer = new ChannelStateHandleSerializer();

    private MetadataV3Serializer() {
    }

    public int getVersion() {
        return 3;
    }

    public static void serialize(CheckpointMetadata checkpointMetadata, DataOutputStream dos) throws IOException {
        INSTANCE.serializeMetadata(checkpointMetadata, dos);
    }

    @Override
    public CheckpointMetadata deserialize(DataInputStream dis, ClassLoader classLoader, String externalPointer) throws IOException {
        return this.deserializeMetadata(dis, externalPointer);
    }

    @Override
    protected void serializeOperatorState(OperatorState operatorState, DataOutputStream dos) throws IOException {
        dos.writeLong(operatorState.getOperatorID().getLowerPart());
        dos.writeLong(operatorState.getOperatorID().getUpperPart());
        dos.writeInt(operatorState.getParallelism());
        dos.writeInt(operatorState.getMaxParallelism());
        MetadataV3Serializer.serializeStreamStateHandle(operatorState.getCoordinatorState(), dos);
        Map<Integer, OperatorSubtaskState> subtaskStateMap = operatorState.getSubtaskStates();
        dos.writeInt(subtaskStateMap.size());
        for (Map.Entry<Integer, OperatorSubtaskState> entry : subtaskStateMap.entrySet()) {
            dos.writeInt(entry.getKey());
            this.serializeSubtaskState(entry.getValue(), dos);
        }
    }

    @Override
    protected void serializeSubtaskState(OperatorSubtaskState subtaskState, DataOutputStream dos) throws IOException {
        super.serializeSubtaskState(subtaskState, dos);
        this.serializeCollection(subtaskState.getInputChannelState(), dos, this::serializeInputChannelStateHandle);
        this.serializeCollection(subtaskState.getResultSubpartitionState(), dos, this::serializeResultSubpartitionStateHandle);
    }

    @Override
    protected OperatorState deserializeOperatorState(DataInputStream dis, @Nullable MetadataV2V3SerializerBase.DeserializationContext context) throws IOException {
        OperatorID jobVertexId = new OperatorID(dis.readLong(), dis.readLong());
        int parallelism = dis.readInt();
        int maxParallelism = dis.readInt();
        OperatorState operatorState = new OperatorState(jobVertexId, parallelism, maxParallelism);
        operatorState.setCoordinatorState(MetadataV3Serializer.deserializeAndCheckByteStreamStateHandle(dis, context));
        int numSubTaskStates = dis.readInt();
        for (int j = 0; j < numSubTaskStates; ++j) {
            int subtaskIndex = dis.readInt();
            OperatorSubtaskState subtaskState = this.deserializeSubtaskState(dis, context);
            operatorState.putState(subtaskIndex, subtaskState);
        }
        return operatorState;
    }

    @Override
    @VisibleForTesting
    public void serializeResultSubpartitionStateHandle(ResultSubpartitionStateHandle handle, DataOutputStream dos) throws IOException {
        this.channelStateHandleSerializer.serialize(handle, dos);
    }

    @Override
    @VisibleForTesting
    public StateObjectCollection<ResultSubpartitionStateHandle> deserializeResultSubpartitionStateHandle(DataInputStream dis, @Nullable MetadataV2V3SerializerBase.DeserializationContext context) throws IOException {
        return MetadataV3Serializer.deserializeCollection(dis, context, this.channelStateHandleSerializer::deserializeResultSubpartitionStateHandle);
    }

    @Override
    @VisibleForTesting
    public void serializeInputChannelStateHandle(InputChannelStateHandle handle, DataOutputStream dos) throws IOException {
        this.channelStateHandleSerializer.serialize(handle, dos);
    }

    @Override
    @VisibleForTesting
    public StateObjectCollection<InputChannelStateHandle> deserializeInputChannelStateHandle(DataInputStream dis, @Nullable MetadataV2V3SerializerBase.DeserializationContext context) throws IOException {
        return MetadataV3Serializer.deserializeCollection(dis, context, this.channelStateHandleSerializer::deserializeInputChannelStateHandle);
    }

    private <T extends StateObject> void serializeCollection(StateObjectCollection<T> stateObjectCollection, DataOutputStream dos, BiConsumerWithException<T, DataOutputStream, IOException> cons) throws IOException {
        if (stateObjectCollection == null) {
            dos.writeInt(0);
        } else {
            dos.writeInt(stateObjectCollection.size());
            for (StateObject stateObject : stateObjectCollection) {
                cons.accept((Object)stateObject, (Object)dos);
            }
        }
    }

    @VisibleForTesting
    public static void serializeStreamStateHandle(StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
        MetadataV2V3SerializerBase.serializeStreamStateHandle(stateHandle, dos);
    }

    @VisibleForTesting
    public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
        return MetadataV2V3SerializerBase.deserializeStreamStateHandle(dis, null);
    }

    @VisibleForTesting
    public static void serializeOperatorStateHandleUtil(OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
        INSTANCE.serializeOperatorStateHandle(stateHandle, dos);
    }

    @VisibleForTesting
    public static OperatorStateHandle deserializeOperatorStateHandleUtil(DataInputStream dis) throws IOException {
        return INSTANCE.deserializeOperatorStateHandle(dis, null);
    }

    @VisibleForTesting
    public static void serializeKeyedStateHandleUtil(KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException {
        INSTANCE.serializeKeyedStateHandle(stateHandle, dos);
    }

    @VisibleForTesting
    public static KeyedStateHandle deserializeKeyedStateHandleUtil(DataInputStream dis) throws IOException {
        return INSTANCE.deserializeKeyedStateHandle(dis, null);
    }

    @VisibleForTesting
    public static StateObjectCollection<InputChannelStateHandle> deserializeInputChannelStateHandle(DataInputStream dis) throws IOException {
        return INSTANCE.deserializeInputChannelStateHandle(dis, null);
    }

    @VisibleForTesting
    public StateObjectCollection<ResultSubpartitionStateHandle> deserializeResultSubpartitionStateHandle(DataInputStream dis) throws IOException {
        return INSTANCE.deserializeResultSubpartitionStateHandle(dis, null);
    }
}

