package org.apache.doris.flink.datastream;

import java.util.Iterator;
import java.util.List;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.cfg.DorisStreamOptions;
import org.apache.doris.flink.deserialization.DorisDeserializationSchema;
import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.rest.PartitionDefinition;
import org.apache.doris.flink.rest.RestService;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/doris/flink/datastream/DorisSourceFunction.class */
public class DorisSourceFunction extends RichParallelSourceFunction<List<?>> implements ResultTypeQueryable<List<?>> {
    private static final Logger logger = LoggerFactory.getLogger(DorisSourceFunction.class);
    private final DorisDeserializationSchema<List<?>> deserializer;
    private final DorisOptions options;
    private final DorisReadOptions readOptions;
    private volatile transient boolean isRunning;
    private List<PartitionDefinition> dorisPartitions;
    private List<PartitionDefinition> taskDorisPartitions = Lists.newArrayList();

    public DorisSourceFunction(DorisStreamOptions dorisStreamOptions, DorisDeserializationSchema<List<?>> dorisDeserializationSchema) {
        this.deserializer = dorisDeserializationSchema;
        this.options = dorisStreamOptions.getOptions();
        this.readOptions = dorisStreamOptions.getReadOptions();
        try {
            this.dorisPartitions = RestService.findPartitions(this.options, this.readOptions, logger);
            logger.info("Doris partitions size {}", Integer.valueOf(this.dorisPartitions.size()));
        } catch (DorisException e) {
            throw new RuntimeException("Failed fetch doris partitions");
        }
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.isRunning = true;
        assignTaskPartitions();
    }

    private void assignTaskPartitions() {
        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
        for (int i = 0; i < this.dorisPartitions.size(); i++) {
            if (i % numberOfParallelSubtasks == indexOfThisSubtask) {
                this.taskDorisPartitions.add(this.dorisPartitions.get(i));
            }
        }
        logger.info("subtask {} process {} partitions ", Integer.valueOf(indexOfThisSubtask), Integer.valueOf(this.taskDorisPartitions.size()));
    }

    public void run(SourceFunction.SourceContext<List<?>> sourceContext) {
        Iterator<PartitionDefinition> it = this.taskDorisPartitions.iterator();
        while (it.hasNext()) {
            ScalaValueReader scalaValueReader = new ScalaValueReader(it.next(), this.options, this.readOptions);
            Throwable th = null;
            while (this.isRunning && scalaValueReader.hasNext()) {
                try {
                    try {
                        sourceContext.collect(scalaValueReader.next());
                    } catch (Throwable th2) {
                        if (scalaValueReader != null) {
                            if (th != null) {
                                try {
                                    scalaValueReader.close();
                                } catch (Throwable th3) {
                                    th.addSuppressed(th3);
                                }
                            } else {
                                scalaValueReader.close();
                            }
                        }
                        throw th2;
                    }
                } finally {
                }
            }
            if (scalaValueReader != null) {
                if (0 != 0) {
                    try {
                        scalaValueReader.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    scalaValueReader.close();
                }
            }
        }
    }

    public void cancel() {
        this.isRunning = false;
    }

    public void close() throws Exception {
        super.close();
        this.isRunning = false;
    }

    public TypeInformation<List<?>> getProducedType() {
        return this.deserializer.getProducedType();
    }
}
