package org.apache.paimon.flink.sink;

import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.log.LogWriteCallback;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.SinkRecord;

/* loaded from: input_file:org/apache/paimon/flink/sink/RowDataStoreWriteOperator.class */
public class RowDataStoreWriteOperator extends PrepareCommitOperator<RowData> {
    private static final long serialVersionUID = 3;
    private final FileStoreTable table;

    @Nullable
    private final LogSinkFunction logSinkFunction;
    private final StoreSinkWrite.Provider storeSinkWriteProvider;
    private final String initialCommitUser;
    private transient StoreSinkWriteState state;
    private transient StoreSinkWrite write;
    private transient SimpleContext sinkContext;

    @Nullable
    private transient LogWriteCallback logCallback;
    private long currentWatermark = Long.MIN_VALUE;

    /* loaded from: input_file:org/apache/paimon/flink/sink/RowDataStoreWriteOperator$SimpleContext.class */
    private class SimpleContext implements SinkFunction.Context {

        @Nullable
        private Long timestamp;
        private final ProcessingTimeService processingTimeService;

        public SimpleContext(ProcessingTimeService processingTimeService) {
            this.processingTimeService = processingTimeService;
        }

        public long currentProcessingTime() {
            return this.processingTimeService.getCurrentProcessingTime();
        }

        public long currentWatermark() {
            return RowDataStoreWriteOperator.this.currentWatermark;
        }

        public Long timestamp() {
            return this.timestamp;
        }
    }

    public RowDataStoreWriteOperator(FileStoreTable fileStoreTable, @Nullable LogSinkFunction logSinkFunction, StoreSinkWrite.Provider provider, String str) {
        this.table = fileStoreTable;
        this.logSinkFunction = logSinkFunction;
        this.storeSinkWriteProvider = provider;
        this.initialCommitUser = str;
    }

    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<Committable>> output) {
        super.setup(streamTask, streamConfig, output);
        if (this.logSinkFunction != null) {
            FunctionUtils.setFunctionRuntimeContext(this.logSinkFunction, getRuntimeContext());
        }
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        String str = (String) StateUtils.getSingleValueFromState(stateInitializationContext, "commit_user_state", String.class, this.initialCommitUser);
        RowDataChannelComputer rowDataChannelComputer = new RowDataChannelComputer(this.table.schema(), this.logSinkFunction != null);
        rowDataChannelComputer.setup(getRuntimeContext().getNumberOfParallelSubtasks());
        this.state = new StoreSinkWriteState(stateInitializationContext, (str2, binaryRow, i) -> {
            return rowDataChannelComputer.channel(binaryRow, i) == getRuntimeContext().getIndexOfThisSubtask();
        });
        this.write = this.storeSinkWriteProvider.provide(this.table, str, this.state, getContainingTask().getEnvironment().getIOManager());
        if (this.logSinkFunction != null) {
            StreamingFunctionUtils.restoreFunctionState(stateInitializationContext, this.logSinkFunction);
        }
    }

    public void open() throws Exception {
        super.open();
        this.sinkContext = new SimpleContext(getProcessingTimeService());
        if (this.logSinkFunction != null) {
            FunctionUtils.openFunction(this.logSinkFunction, new Configuration());
            this.logCallback = new LogWriteCallback();
            this.logSinkFunction.setWriteCallback(this.logCallback);
        }
    }

    public void processWatermark(Watermark watermark) throws Exception {
        super.processWatermark(watermark);
        this.currentWatermark = watermark.getTimestamp();
        if (this.logSinkFunction != null) {
            this.logSinkFunction.writeWatermark(new org.apache.flink.api.common.eventtime.Watermark(watermark.getTimestamp()));
        }
    }

    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        this.sinkContext.timestamp = streamRecord.hasTimestamp() ? Long.valueOf(streamRecord.getTimestamp()) : null;
        try {
            SinkRecord write = this.write.write(new FlinkRowWrapper((RowData) streamRecord.getValue()));
            if (this.logSinkFunction != null) {
                this.logSinkFunction.invoke(this.write.toLogRecord(write), this.sinkContext);
            }
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        this.write.snapshotState();
        this.state.snapshotState();
        if (this.logSinkFunction != null) {
            StreamingFunctionUtils.snapshotFunctionState(stateSnapshotContext, getOperatorStateBackend(), this.logSinkFunction);
        }
    }

    public void finish() throws Exception {
        super.finish();
        if (this.logSinkFunction != null) {
            this.logSinkFunction.finish();
        }
    }

    public void close() throws Exception {
        super.close();
        this.write.close();
        if (this.logSinkFunction != null) {
            FunctionUtils.closeFunction(this.logSinkFunction);
        }
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        super.notifyCheckpointComplete(j);
        if (this.logSinkFunction instanceof CheckpointListener) {
            this.logSinkFunction.notifyCheckpointComplete(j);
        }
    }

    public void notifyCheckpointAborted(long j) throws Exception {
        super.notifyCheckpointAborted(j);
        if (this.logSinkFunction instanceof CheckpointListener) {
            this.logSinkFunction.notifyCheckpointAborted(j);
        }
    }

    @Override // org.apache.paimon.flink.sink.PrepareCommitOperator
    protected List<Committable> prepareCommit(boolean z, long j) throws IOException {
        List<Committable> prepareCommit = this.write.prepareCommit(z, j);
        if (this.logCallback != null) {
            try {
                this.logSinkFunction.flush();
                this.logCallback.offsets().forEach((num, l) -> {
                    prepareCommit.add(new Committable(j, Committable.Kind.LOG_OFFSET, new LogOffsetCommittable(num.intValue(), l.longValue())));
                });
            } catch (Exception e) {
                throw new IOException(e);
            }
        }
        return prepareCommit;
    }
}
