/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.append;

import java.util.Collections;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AppendWriteFunction<I>
extends AbstractStreamWriteFunction<I> {
    private static final Logger LOG = LoggerFactory.getLogger(AppendWriteFunction.class);
    private static final long serialVersionUID = 1L;
    private transient BulkInsertWriterHelper writerHelper;
    private final RowType rowType;

    public AppendWriteFunction(Configuration config, RowType rowType) {
        super(config);
        this.rowType = rowType;
    }

    @Override
    public void snapshotState() {
        this.flushData(false);
    }

    public void processElement(I value, ProcessFunction.Context ctx, Collector<Object> out) throws Exception {
        if (this.writerHelper == null) {
            this.initWriterHelper();
        }
        this.writerHelper.write((RowData)value);
    }

    @Override
    public void endInput() {
        this.flushData(true);
        this.writeStatuses.clear();
    }

    @VisibleForTesting
    public BulkInsertWriterHelper getWriterHelper() {
        return this.writerHelper;
    }

    private void initWriterHelper() {
        this.currentInstant = this.instantToWrite(true);
        if (this.currentInstant == null) {
            throw new HoodieException("No inflight instant when flushing data!");
        }
        this.writerHelper = new BulkInsertWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(), this.currentInstant, this.taskID, this.getRuntimeContext().getNumberOfParallelSubtasks(), this.getRuntimeContext().getAttemptNumber(), this.rowType);
    }

    private void flushData(boolean endInput) {
        String instant;
        List<WriteStatus> writeStatus;
        if (this.writerHelper != null) {
            writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
            instant = this.writerHelper.getInstantTime();
        } else {
            writeStatus = Collections.emptyList();
            instant = this.instantToWrite(false);
            LOG.info("No data to write in subtask [{}] for instant [{}]", (Object)this.taskID, (Object)instant);
        }
        WriteMetadataEvent event = WriteMetadataEvent.builder().taskID(this.taskID).instantTime(instant).writeStatus(writeStatus).lastBatch(true).endInput(endInput).build();
        this.eventGateway.sendEventToCoordinator((OperatorEvent)event);
        this.writerHelper = null;
        this.writeStatuses.addAll(writeStatus);
        this.confirming = true;
    }
}

