/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.connectors.base.source.meta.split;

import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.offset.OffsetDeserializerSerializer;
import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;
import com.ververica.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.utils.SerializerUtils;
import com.ververica.cdc.debezium.history.FlinkJsonTableChangeSerializer;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeParser;

public abstract class SourceSplitSerializer
implements SimpleVersionedSerializer<SourceSplitBase>,
OffsetDeserializerSerializer {
    private static final int VERSION = 3;
    private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE = ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
    private static final int SNAPSHOT_SPLIT_FLAG = 1;
    private static final int STREAM_SPLIT_FLAG = 2;

    public int getVersion() {
        return 3;
    }

    public byte[] serialize(SourceSplitBase split) throws IOException {
        if (split.isSnapshotSplit()) {
            SnapshotSplit snapshotSplit = split.asSnapshotSplit();
            if (snapshotSplit.serializedFormCache != null) {
                return snapshotSplit.serializedFormCache;
            }
            DataOutputSerializer out = SERIALIZER_CACHE.get();
            out.writeInt(1);
            out.writeUTF(snapshotSplit.getTableId().toString());
            out.writeUTF(snapshotSplit.splitId());
            out.writeUTF(snapshotSplit.getSplitKeyType().asSerializableString());
            Object[] splitStart = snapshotSplit.getSplitStart();
            Object[] splitEnd = snapshotSplit.getSplitEnd();
            out.writeUTF(SerializerUtils.rowToSerializedString(splitStart));
            out.writeUTF(SerializerUtils.rowToSerializedString(splitEnd));
            this.writeOffsetPosition(snapshotSplit.getHighWatermark(), out);
            SourceSplitSerializer.writeTableSchemas(snapshotSplit.getTableSchemas(), out);
            byte[] result = out.getCopyOfBuffer();
            out.clear();
            snapshotSplit.serializedFormCache = result;
            return result;
        }
        StreamSplit streamSplit = split.asStreamSplit();
        if (streamSplit.serializedFormCache != null) {
            return streamSplit.serializedFormCache;
        }
        DataOutputSerializer out = SERIALIZER_CACHE.get();
        out.writeInt(2);
        out.writeUTF(streamSplit.splitId());
        out.writeUTF("");
        this.writeOffsetPosition(streamSplit.getStartingOffset(), out);
        this.writeOffsetPosition(streamSplit.getEndingOffset(), out);
        this.writeFinishedSplitsInfo(streamSplit.getFinishedSnapshotSplitInfos(), out);
        SourceSplitSerializer.writeTableSchemas(streamSplit.getTableSchemas(), out);
        out.writeInt(streamSplit.getTotalFinishedSplitSize());
        byte[] result = out.getCopyOfBuffer();
        out.clear();
        streamSplit.serializedFormCache = result;
        return result;
    }

    public SourceSplitBase deserialize(int version, byte[] serialized) throws IOException {
        switch (version) {
            case 1: 
            case 2: 
            case 3: {
                return this.deserializeSplit(version, serialized);
            }
        }
        throw new IOException("Unknown version: " + version);
    }

    public SourceSplitBase deserializeSplit(int version, byte[] serialized) throws IOException {
        DataInputDeserializer in = new DataInputDeserializer(serialized);
        int splitKind = in.readInt();
        if (splitKind == 1) {
            TableId tableId = TableId.parse(in.readUTF());
            String splitId = in.readUTF();
            RowType splitKeyType = (RowType)LogicalTypeParser.parse((String)in.readUTF());
            Object[] splitBoundaryStart = SerializerUtils.serializedStringToRow(in.readUTF());
            Object[] splitBoundaryEnd = SerializerUtils.serializedStringToRow(in.readUTF());
            Offset highWatermark = this.readOffsetPosition(version, in);
            Map<TableId, TableChanges.TableChange> tableSchemas = SourceSplitSerializer.readTableSchemas(version, in);
            return new SnapshotSplit(tableId, splitId, splitKeyType, splitBoundaryStart, splitBoundaryEnd, highWatermark, tableSchemas);
        }
        if (splitKind == 2) {
            String splitId = in.readUTF();
            in.readUTF();
            Offset startingOffset = this.readOffsetPosition(version, in);
            Offset endingOffset = this.readOffsetPosition(version, in);
            List<FinishedSnapshotSplitInfo> finishedSplitsInfo = this.readFinishedSplitsInfo(version, in);
            Map<TableId, TableChanges.TableChange> tableChangeMap = SourceSplitSerializer.readTableSchemas(version, in);
            int totalFinishedSplitSize = finishedSplitsInfo.size();
            if (version == 3) {
                totalFinishedSplitSize = in.readInt();
            }
            in.releaseArrays();
            return new StreamSplit(splitId, startingOffset, endingOffset, finishedSplitsInfo, tableChangeMap, totalFinishedSplitSize);
        }
        throw new IOException("Unknown split kind: " + splitKind);
    }

    public static void writeTableSchemas(Map<TableId, TableChanges.TableChange> tableSchemas, DataOutputSerializer out) throws IOException {
        FlinkJsonTableChangeSerializer jsonSerializer = new FlinkJsonTableChangeSerializer();
        DocumentWriter documentWriter = DocumentWriter.defaultWriter();
        int size = tableSchemas.size();
        out.writeInt(size);
        for (Map.Entry<TableId, TableChanges.TableChange> entry : tableSchemas.entrySet()) {
            out.writeUTF(entry.getKey().toString());
            String tableChangeStr = documentWriter.write(jsonSerializer.toDocument(entry.getValue()));
            byte[] tableChangeBytes = tableChangeStr.getBytes(StandardCharsets.UTF_8);
            out.writeInt(tableChangeBytes.length);
            out.write(tableChangeBytes);
        }
    }

    public static Map<TableId, TableChanges.TableChange> readTableSchemas(int version, DataInputDeserializer in) throws IOException {
        DocumentReader documentReader = DocumentReader.defaultReader();
        HashMap<TableId, TableChanges.TableChange> tableSchemas = new HashMap<TableId, TableChanges.TableChange>();
        int size = in.readInt();
        for (int i2 = 0; i2 < size; ++i2) {
            String tableChangeStr;
            TableId tableId = TableId.parse(in.readUTF());
            switch (version) {
                case 1: {
                    tableChangeStr = in.readUTF();
                    break;
                }
                case 2: 
                case 3: {
                    int len = in.readInt();
                    byte[] bytes = new byte[len];
                    in.read(bytes);
                    tableChangeStr = new String(bytes, StandardCharsets.UTF_8);
                    break;
                }
                default: {
                    throw new IOException("Unknown version: " + version);
                }
            }
            Document document = documentReader.read(tableChangeStr);
            TableChanges.TableChange tableChange = FlinkJsonTableChangeSerializer.fromDocument(document, true);
            tableSchemas.put(tableId, tableChange);
        }
        return tableSchemas;
    }

    private void writeFinishedSplitsInfo(List<FinishedSnapshotSplitInfo> finishedSplitsInfo, DataOutputSerializer out) throws IOException {
        int size = finishedSplitsInfo.size();
        out.writeInt(size);
        for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo) {
            splitInfo.serialize(out);
        }
    }

    private List<FinishedSnapshotSplitInfo> readFinishedSplitsInfo(int version, DataInputDeserializer in) throws IOException {
        ArrayList<FinishedSnapshotSplitInfo> finishedSplitsInfo = new ArrayList<FinishedSnapshotSplitInfo>();
        int size = in.readInt();
        for (int i2 = 0; i2 < size; ++i2) {
            TableId tableId = TableId.parse(in.readUTF());
            String splitId = in.readUTF();
            Object[] splitStart = SerializerUtils.serializedStringToRow(in.readUTF());
            Object[] splitEnd = SerializerUtils.serializedStringToRow(in.readUTF());
            OffsetFactory offsetFactory = (OffsetFactory)SerializerUtils.serializedStringToObject(in.readUTF());
            Offset highWatermark = this.readOffsetPosition(version, in);
            finishedSplitsInfo.add(new FinishedSnapshotSplitInfo(tableId, splitId, splitStart, splitEnd, highWatermark, offsetFactory));
        }
        return finishedSplitsInfo;
    }
}

