package org.apache.doris.flink.source.reader;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.datastream.ScalaValueReader;
import org.apache.doris.flink.source.split.DorisSourceSplit;
import org.apache.doris.flink.source.split.DorisSplitRecords;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/doris/flink/source/reader/DorisSourceSplitReader.class */
public class DorisSourceSplitReader implements SplitReader<List, DorisSourceSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(DorisSourceSplitReader.class);
    private final Queue<DorisSourceSplit> splits = new ArrayDeque();
    private final DorisOptions options;
    private final DorisReadOptions readOptions;
    private ScalaValueReader scalaValueReader;
    private String currentSplitId;

    public DorisSourceSplitReader(DorisOptions dorisOptions, DorisReadOptions dorisReadOptions) {
        this.options = dorisOptions;
        this.readOptions = dorisReadOptions;
    }

    public RecordsWithSplitIds<List> fetch() throws IOException {
        checkSplitOrStartNext();
        return !this.scalaValueReader.hasNext() ? finishSplit() : DorisSplitRecords.forRecords(this.currentSplitId, this.scalaValueReader);
    }

    private void checkSplitOrStartNext() throws IOException {
        if (this.scalaValueReader != null) {
            return;
        }
        DorisSourceSplit poll = this.splits.poll();
        if (poll == null) {
            throw new IOException("Cannot fetch from another split - no split remaining");
        }
        this.currentSplitId = poll.splitId();
        this.scalaValueReader = new ScalaValueReader(poll.getPartitionDefinition(), this.options, this.readOptions);
    }

    private DorisSplitRecords finishSplit() {
        if (this.scalaValueReader != null) {
            this.scalaValueReader.close();
            this.scalaValueReader = null;
        }
        DorisSplitRecords finishedSplit = DorisSplitRecords.finishedSplit(this.currentSplitId);
        this.currentSplitId = null;
        return finishedSplit;
    }

    public void handleSplitsChanges(SplitsChange<DorisSourceSplit> splitsChange) {
        LOG.debug("Handling split change {}", splitsChange);
        this.splits.addAll(splitsChange.splits());
    }

    public void wakeUp() {
    }

    public void close() throws Exception {
        if (this.scalaValueReader != null) {
            this.scalaValueReader.close();
        }
    }
}
