package org.apache.paimon.flink.sink.cdc;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeChecks;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.class */
public class UpdatedDataFieldsProcessFunction extends ProcessFunction<List<DataField>, Void> {
    private final SchemaManager schemaManager;
    private static final Logger LOG = LoggerFactory.getLogger(UpdatedDataFieldsProcessFunction.class);
    private static final List<DataTypeRoot> STRING_TYPES = Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR);
    private static final List<DataTypeRoot> BINARY_TYPES = Arrays.asList(DataTypeRoot.BINARY, DataTypeRoot.VARBINARY);
    private static final List<DataTypeRoot> INTEGER_TYPES = Arrays.asList(DataTypeRoot.TINYINT, DataTypeRoot.SMALLINT, DataTypeRoot.INTEGER, DataTypeRoot.BIGINT);
    private static final List<DataTypeRoot> FLOATING_POINT_TYPES = Arrays.asList(DataTypeRoot.FLOAT, DataTypeRoot.DOUBLE);

    /* loaded from: input_file:org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction$ConvertAction.class */
    public enum ConvertAction {
        CONVERT,
        IGNORE,
        EXCEPTION
    }

    public UpdatedDataFieldsProcessFunction(SchemaManager schemaManager) {
        this.schemaManager = schemaManager;
    }

    public void processElement(List<DataField> list, ProcessFunction<List<DataField>, Void>.Context context, Collector<Void> collector) throws Exception {
        Iterator<SchemaChange> it = extractSchemaChanges(list).iterator();
        while (it.hasNext()) {
            applySchemaChange(it.next());
        }
    }

    private List<SchemaChange> extractSchemaChanges(List<DataField> list) {
        RowType logicalRowType = this.schemaManager.latest().get().logicalRowType();
        HashMap hashMap = new HashMap();
        for (DataField dataField : logicalRowType.getFields()) {
            hashMap.put(dataField.name(), dataField);
        }
        ArrayList arrayList = new ArrayList();
        for (DataField dataField2 : list) {
            if (hashMap.containsKey(dataField2.name())) {
                DataField dataField3 = (DataField) hashMap.get(dataField2.name());
                if (!dataField3.type().equalsIgnoreNullable(dataField2.type())) {
                    arrayList.add(SchemaChange.updateColumnType(dataField2.name(), dataField2.type()));
                    if (dataField2.description() != null) {
                        arrayList.add(SchemaChange.updateColumnComment(new String[]{dataField2.name()}, dataField2.description()));
                    }
                } else if (!Objects.equals(dataField3.description(), dataField2.description())) {
                    arrayList.add(SchemaChange.updateColumnComment(new String[]{dataField2.name()}, dataField2.description()));
                }
            } else {
                arrayList.add(SchemaChange.addColumn(dataField2.name(), dataField2.type(), dataField2.description(), null));
            }
        }
        return arrayList;
    }

    private void applySchemaChange(SchemaChange schemaChange) throws Exception {
        if (schemaChange instanceof SchemaChange.AddColumn) {
            try {
                this.schemaManager.commitChanges(schemaChange);
                return;
            } catch (IllegalArgumentException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Failed to perform SchemaChange.AddColumn {}, possibly due to duplicated column name", schemaChange, e);
                    return;
                }
                return;
            }
        }
        if (!(schemaChange instanceof SchemaChange.UpdateColumnType)) {
            if (!(schemaChange instanceof SchemaChange.UpdateColumnComment)) {
                throw new UnsupportedOperationException("Unsupported schema change class " + schemaChange.getClass().getName() + ", content " + schemaChange);
            }
            this.schemaManager.commitChanges(schemaChange);
            return;
        }
        SchemaChange.UpdateColumnType updateColumnType = (SchemaChange.UpdateColumnType) schemaChange;
        TableSchema orElseThrow = this.schemaManager.latest().orElseThrow(() -> {
            return new RuntimeException("Table does not exist. This is unexpected.");
        });
        int indexOf = orElseThrow.fieldNames().indexOf(updateColumnType.fieldName());
        Preconditions.checkState(indexOf >= 0, "Field name " + updateColumnType.fieldName() + " does not exist in table. This is unexpected.");
        DataType type = orElseThrow.fields().get(indexOf).type();
        DataType newDataType = updateColumnType.newDataType();
        switch (canConvert(type, newDataType)) {
            case CONVERT:
                this.schemaManager.commitChanges(schemaChange);
                return;
            case EXCEPTION:
                throw new UnsupportedOperationException(String.format("Cannot convert field %s from type %s to %s", updateColumnType.fieldName(), type, newDataType));
            default:
                return;
        }
    }

    public static ConvertAction canConvert(DataType dataType, DataType dataType2) {
        if (dataType.equalsIgnoreNullable(dataType2)) {
            return ConvertAction.CONVERT;
        }
        int indexOf = STRING_TYPES.indexOf(dataType.getTypeRoot());
        int indexOf2 = STRING_TYPES.indexOf(dataType2.getTypeRoot());
        if (indexOf >= 0 && indexOf2 >= 0) {
            return DataTypeChecks.getLength(dataType) <= DataTypeChecks.getLength(dataType2) ? ConvertAction.CONVERT : ConvertAction.IGNORE;
        }
        int indexOf3 = BINARY_TYPES.indexOf(dataType.getTypeRoot());
        int indexOf4 = BINARY_TYPES.indexOf(dataType2.getTypeRoot());
        if (indexOf3 >= 0 && indexOf4 >= 0) {
            return DataTypeChecks.getLength(dataType) <= DataTypeChecks.getLength(dataType2) ? ConvertAction.CONVERT : ConvertAction.IGNORE;
        }
        int indexOf5 = INTEGER_TYPES.indexOf(dataType.getTypeRoot());
        int indexOf6 = INTEGER_TYPES.indexOf(dataType2.getTypeRoot());
        if (indexOf5 >= 0 && indexOf6 >= 0) {
            return indexOf5 <= indexOf6 ? ConvertAction.CONVERT : ConvertAction.IGNORE;
        }
        int indexOf7 = FLOATING_POINT_TYPES.indexOf(dataType.getTypeRoot());
        int indexOf8 = FLOATING_POINT_TYPES.indexOf(dataType2.getTypeRoot());
        return (indexOf7 < 0 || indexOf8 < 0) ? ConvertAction.EXCEPTION : indexOf7 <= indexOf8 ? ConvertAction.CONVERT : ConvertAction.IGNORE;
    }

    public /* bridge */ /* synthetic */ void processElement(Object obj, ProcessFunction.Context context, Collector collector) throws Exception {
        processElement((List<DataField>) obj, (ProcessFunction<List<DataField>, Void>.Context) context, (Collector<Void>) collector);
    }
}
