package org.apache.paimon.flink.log;

import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.format.Format;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.types.RowKind;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.factories.FlinkFactoryUtil;
import org.apache.paimon.flink.factories.LogStoreFactoryUtil;

/* loaded from: input_file:org/apache/paimon/flink/log/LogStoreTableFactory.class */
public interface LogStoreTableFactory {
    String factoryIdentifier();

    LogSourceProvider createSourceProvider(DynamicTableFactory.Context context, DynamicTableSource.Context context2, @Nullable int[][] iArr);

    LogSinkProvider createSinkProvider(DynamicTableFactory.Context context, DynamicTableSink.Context context2);

    static ConfigOption<String> logKeyFormat() {
        return ConfigOptions.key(CoreOptions.LOG_KEY_FORMAT.key()).stringType().defaultValue(CoreOptions.LOG_KEY_FORMAT.defaultValue());
    }

    static ConfigOption<String> logFormat() {
        return ConfigOptions.key(CoreOptions.LOG_FORMAT.key()).stringType().defaultValue(CoreOptions.LOG_FORMAT.defaultValue());
    }

    static LogStoreTableFactory discoverLogStoreFactory(ClassLoader classLoader, String str) {
        return LogStoreFactoryUtil.discoverLogStoreFactory(classLoader, LogStoreTableFactory.class, str);
    }

    static DecodingFormat<DeserializationSchema<RowData>> getKeyDecodingFormat(FlinkFactoryUtil.FlinkTableFactoryHelper flinkTableFactoryHelper) {
        DecodingFormat<DeserializationSchema<RowData>> discoverDecodingFormat = flinkTableFactoryHelper.discoverDecodingFormat(DeserializationFormatFactory.class, logKeyFormat());
        validateKeyFormat(discoverDecodingFormat, (String) flinkTableFactoryHelper.getOptions().get(logKeyFormat()));
        return discoverDecodingFormat;
    }

    static EncodingFormat<SerializationSchema<RowData>> getKeyEncodingFormat(FlinkFactoryUtil.FlinkTableFactoryHelper flinkTableFactoryHelper) {
        EncodingFormat<SerializationSchema<RowData>> discoverEncodingFormat = flinkTableFactoryHelper.discoverEncodingFormat(SerializationFormatFactory.class, logKeyFormat());
        validateKeyFormat(discoverEncodingFormat, (String) flinkTableFactoryHelper.getOptions().get(logKeyFormat()));
        return discoverEncodingFormat;
    }

    static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat(FlinkFactoryUtil.FlinkTableFactoryHelper flinkTableFactoryHelper) {
        DecodingFormat<DeserializationSchema<RowData>> discoverDecodingFormat = flinkTableFactoryHelper.discoverDecodingFormat(DeserializationFormatFactory.class, logFormat());
        validateValueFormat(discoverDecodingFormat, (String) flinkTableFactoryHelper.getOptions().get(logFormat()));
        return discoverDecodingFormat;
    }

    static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(FlinkFactoryUtil.FlinkTableFactoryHelper flinkTableFactoryHelper) {
        EncodingFormat<SerializationSchema<RowData>> discoverEncodingFormat = flinkTableFactoryHelper.discoverEncodingFormat(SerializationFormatFactory.class, logFormat());
        validateValueFormat(discoverEncodingFormat, (String) flinkTableFactoryHelper.getOptions().get(logFormat()));
        return discoverEncodingFormat;
    }

    static void validateKeyFormat(Format format, String str) {
        if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
            throw new ValidationException(String.format("A key format should only deal with INSERT-only records. But %s has a changelog mode of %s.", str, format.getChangelogMode()));
        }
    }

    static void validateValueFormat(Format format, String str) {
        if (!format.getChangelogMode().equals(ChangelogMode.all())) {
            throw new ValidationException(String.format("A value format should deal with all records. But %s has a changelog mode of %s.", str, format.getChangelogMode()));
        }
    }
}
