package org.apache.paimon.flink.sink;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.UUID;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SerializableFunction;

/* loaded from: input_file:org/apache/paimon/flink/sink/FlinkSink.class */
public abstract class FlinkSink<T> implements Serializable {
    private static final long serialVersionUID = 1;
    private static final String WRITER_NAME = "Writer";
    private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
    protected final FileStoreTable table;
    private final boolean isOverwrite;

    public FlinkSink(FileStoreTable fileStoreTable, boolean z) {
        this.table = fileStoreTable;
        this.isOverwrite = z;
    }

    private StoreSinkWrite.Provider createWriteProvider(CheckpointConfig checkpointConfig) {
        boolean z;
        if (this.table.coreOptions().writeOnly()) {
            z = false;
        } else {
            Options configuration = this.table.coreOptions().toConfiguration();
            CoreOptions.ChangelogProducer changelogProducer = this.table.coreOptions().changelogProducer();
            z = changelogProducer == CoreOptions.ChangelogProducer.LOOKUP && ((Boolean) configuration.get(FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT)).booleanValue();
            int i = -1;
            if (configuration.contains(CoreOptions.FULL_COMPACTION_DELTA_COMMITS)) {
                i = ((Integer) configuration.get(CoreOptions.FULL_COMPACTION_DELTA_COMMITS)).intValue();
            } else if (configuration.contains(FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) {
                i = (int) (((Duration) configuration.get(FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)).toMillis() / checkpointConfig.getCheckpointInterval());
            }
            if (changelogProducer == CoreOptions.ChangelogProducer.FULL_COMPACTION || i >= 0) {
                int max = Math.max(i, 1);
                return (fileStoreTable, str, storeSinkWriteState, iOManager) -> {
                    return new GlobalFullCompactionSinkWrite(fileStoreTable, str, storeSinkWriteState, iOManager, this.isOverwrite, z, max);
                };
            }
        }
        boolean z2 = z;
        return (fileStoreTable2, str2, storeSinkWriteState2, iOManager2) -> {
            return new StoreSinkWriteImpl(fileStoreTable2, str2, storeSinkWriteState2, iOManager2, this.isOverwrite, z2);
        };
    }

    public DataStreamSink<?> sinkFrom(DataStream<T> dataStream) {
        return sinkFrom(dataStream, UUID.randomUUID().toString(), createWriteProvider(dataStream.getExecutionEnvironment().getCheckpointConfig()));
    }

    public DataStreamSink<?> sinkFrom(DataStream<T> dataStream, String str, StoreSinkWrite.Provider provider) {
        StreamExecutionEnvironment executionEnvironment = dataStream.getExecutionEnvironment();
        ReadableConfig configuration = StreamExecutionEnvironmentUtils.getConfiguration(executionEnvironment);
        CheckpointConfig checkpointConfig = executionEnvironment.getCheckpointConfig();
        boolean z = configuration.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
        boolean z2 = z && checkpointConfig.isCheckpointingEnabled();
        if (z2) {
            assertCheckpointConfiguration(executionEnvironment);
        }
        CommittableTypeInfo committableTypeInfo = new CommittableTypeInfo();
        return dataStream.transform("Writer -> " + this.table.name(), committableTypeInfo, createWriteOperator(provider, z, str)).setParallelism(dataStream.getParallelism()).transform("Global Committer -> " + this.table.name(), committableTypeInfo, new CommitterOperator(z2, str, createCommitterFactory(z2), createCommittableStateManager())).setParallelism(1).setMaxParallelism(1).addSink(new DiscardingSink()).name("end").setParallelism(1);
    }

    private void assertCheckpointConfiguration(StreamExecutionEnvironment streamExecutionEnvironment) {
        Preconditions.checkArgument(!streamExecutionEnvironment.getCheckpointConfig().isUnalignedCheckpointsEnabled(), "Paimon sink currently does not support unaligned checkpoints. Please set " + ExecutionCheckpointingOptions.ENABLE_UNALIGNED.key() + " to false.");
        Preconditions.checkArgument(streamExecutionEnvironment.getCheckpointConfig().getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE, "Paimon sink currently only supports EXACTLY_ONCE checkpoint mode. Please set " + ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key() + " to exactly-once");
    }

    protected abstract OneInputStreamOperator<T, Committable> createWriteOperator(StoreSinkWrite.Provider provider, boolean z, String str);

    protected abstract SerializableFunction<String, Committer> createCommitterFactory(boolean z);

    protected abstract CommittableStateManager createCommittableStateManager();

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -762036701:
                if (implMethodName.equals("lambda$createWriteProvider$1d9cfde3$1")) {
                    z = true;
                    break;
                }
                break;
            case -310984110:
                if (implMethodName.equals("lambda$createWriteProvider$ee07332c$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/StoreSinkWrite$Provider") && serializedLambda.getFunctionalInterfaceMethodName().equals("provide") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/paimon/table/FileStoreTable;Ljava/lang/String;Lorg/apache/paimon/flink/sink/StoreSinkWriteState;Lorg/apache/flink/runtime/io/disk/iomanager/IOManager;)Lorg/apache/paimon/flink/sink/StoreSinkWrite;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/FlinkSink") && serializedLambda.getImplMethodSignature().equals("(ZILorg/apache/paimon/table/FileStoreTable;Ljava/lang/String;Lorg/apache/paimon/flink/sink/StoreSinkWriteState;Lorg/apache/flink/runtime/io/disk/iomanager/IOManager;)Lorg/apache/paimon/flink/sink/StoreSinkWrite;")) {
                    FlinkSink flinkSink = (FlinkSink) serializedLambda.getCapturedArg(0);
                    boolean booleanValue = ((Boolean) serializedLambda.getCapturedArg(1)).booleanValue();
                    int intValue = ((Integer) serializedLambda.getCapturedArg(2)).intValue();
                    return (fileStoreTable, str, storeSinkWriteState, iOManager) -> {
                        return new GlobalFullCompactionSinkWrite(fileStoreTable, str, storeSinkWriteState, iOManager, this.isOverwrite, booleanValue, intValue);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/StoreSinkWrite$Provider") && serializedLambda.getFunctionalInterfaceMethodName().equals("provide") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/paimon/table/FileStoreTable;Ljava/lang/String;Lorg/apache/paimon/flink/sink/StoreSinkWriteState;Lorg/apache/flink/runtime/io/disk/iomanager/IOManager;)Lorg/apache/paimon/flink/sink/StoreSinkWrite;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/FlinkSink") && serializedLambda.getImplMethodSignature().equals("(ZLorg/apache/paimon/table/FileStoreTable;Ljava/lang/String;Lorg/apache/paimon/flink/sink/StoreSinkWriteState;Lorg/apache/flink/runtime/io/disk/iomanager/IOManager;)Lorg/apache/paimon/flink/sink/StoreSinkWrite;")) {
                    FlinkSink flinkSink2 = (FlinkSink) serializedLambda.getCapturedArg(0);
                    boolean booleanValue2 = ((Boolean) serializedLambda.getCapturedArg(1)).booleanValue();
                    return (fileStoreTable2, str2, storeSinkWriteState2, iOManager2) -> {
                        return new StoreSinkWriteImpl(fileStoreTable2, str2, storeSinkWriteState2, iOManager2, this.isOverwrite, booleanValue2);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
