package org.apache.doris.flink.source;

import java.util.ArrayList;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.deserialization.DorisDeserializationSchema;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.source.assigners.SimpleSplitAssigner;
import org.apache.doris.flink.source.enumerator.DorisSourceEnumerator;
import org.apache.doris.flink.source.enumerator.PendingSplitsCheckpoint;
import org.apache.doris.flink.source.enumerator.PendingSplitsCheckpointSerializer;
import org.apache.doris.flink.source.reader.DorisRecordEmitter;
import org.apache.doris.flink.source.reader.DorisSourceReader;
import org.apache.doris.flink.source.split.DorisSourceSplit;
import org.apache.doris.flink.source.split.DorisSourceSplitSerializer;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/doris/flink/source/DorisSource.class */
public class DorisSource<OUT> implements Source<OUT, DorisSourceSplit, PendingSplitsCheckpoint>, ResultTypeQueryable<OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(DorisSource.class);
    private final DorisOptions options;
    private final DorisReadOptions readOptions;
    private final Boundedness boundedness;
    private final DorisDeserializationSchema<OUT> deserializer;

    public DorisSource(DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, Boundedness boundedness, DorisDeserializationSchema<OUT> dorisDeserializationSchema) {
        this.options = dorisOptions;
        this.readOptions = dorisReadOptions;
        this.boundedness = boundedness;
        this.deserializer = dorisDeserializationSchema;
    }

    public Boundedness getBoundedness() {
        return this.boundedness;
    }

    public SourceReader<OUT, DorisSourceSplit> createReader(SourceReaderContext sourceReaderContext) throws Exception {
        return new DorisSourceReader(this.options, this.readOptions, new DorisRecordEmitter(this.deserializer), sourceReaderContext, sourceReaderContext.getConfiguration());
    }

    public SplitEnumerator<DorisSourceSplit, PendingSplitsCheckpoint> createEnumerator(SplitEnumeratorContext<DorisSourceSplit> splitEnumeratorContext) throws Exception {
        ArrayList arrayList = new ArrayList();
        RestService.findPartitions(this.options, this.readOptions, LOG).forEach(partitionDefinition -> {
            arrayList.add(new DorisSourceSplit(partitionDefinition));
        });
        return new DorisSourceEnumerator(splitEnumeratorContext, new SimpleSplitAssigner(arrayList));
    }

    public SplitEnumerator<DorisSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(SplitEnumeratorContext<DorisSourceSplit> splitEnumeratorContext, PendingSplitsCheckpoint pendingSplitsCheckpoint) throws Exception {
        return new DorisSourceEnumerator(splitEnumeratorContext, new SimpleSplitAssigner(pendingSplitsCheckpoint.getSplits()));
    }

    public SimpleVersionedSerializer<DorisSourceSplit> getSplitSerializer() {
        return DorisSourceSplitSerializer.INSTANCE;
    }

    public SimpleVersionedSerializer<PendingSplitsCheckpoint> getEnumeratorCheckpointSerializer() {
        return new PendingSplitsCheckpointSerializer(getSplitSerializer());
    }

    public TypeInformation<OUT> getProducedType() {
        return this.deserializer.getProducedType();
    }

    public /* bridge */ /* synthetic */ SplitEnumerator restoreEnumerator(SplitEnumeratorContext splitEnumeratorContext, Object obj) throws Exception {
        return restoreEnumerator((SplitEnumeratorContext<DorisSourceSplit>) splitEnumeratorContext, (PendingSplitsCheckpoint) obj);
    }
}
