package org.apache.doris.flink.sink.writer;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.rest.models.BackendV2;
import org.apache.doris.flink.rest.models.RespContent;
import org.apache.doris.flink.sink.DorisCommittable;
import org.apache.doris.flink.sink.HttpUtil;
import org.apache.doris.flink.sink.LoadStatus;
import org.apache.doris.shaded.org.apache.arrow.vector.util.DateUtility;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/doris/flink/sink/writer/DorisWriter.class */
public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWriterState> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) DorisWriter.class);
    private static final List<String> DORIS_SUCCESS_STATUS = new ArrayList(Arrays.asList(LoadStatus.SUCCESS, LoadStatus.PUBLISH_TIMEOUT));
    private final long lastCheckpointId;
    private DorisStreamLoad dorisStreamLoad;
    volatile boolean loading;
    private final DorisOptions dorisOptions;
    private final DorisReadOptions dorisReadOptions;
    private final DorisExecutionOptions executionOptions;
    private final String labelPrefix;
    private final LabelGenerator labelGenerator;
    private final int intervalTime;
    private final DorisWriterState dorisWriterState;
    private final DorisRecordSerializer<IN> serializer;
    private final transient ScheduledExecutorService scheduledExecutorService;
    private transient Thread executorThread;
    private volatile transient Exception loadException = null;
    private List<BackendV2.BackendRowV2> backends;
    private long pos;
    private String currentLabel;

    public DorisWriter(Sink.InitContext initContext, List<DorisWriterState> list, DorisRecordSerializer<IN> dorisRecordSerializer, DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, DorisExecutionOptions dorisExecutionOptions) {
        this.lastCheckpointId = initContext.getRestoredCheckpointId().orElse(0L);
        LOG.info("restore checkpointId {}", Long.valueOf(this.lastCheckpointId));
        LOG.info("labelPrefix " + dorisExecutionOptions.getLabelPrefix());
        this.dorisWriterState = new DorisWriterState(dorisExecutionOptions.getLabelPrefix());
        this.labelPrefix = dorisExecutionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId();
        this.labelGenerator = new LabelGenerator(this.labelPrefix, dorisExecutionOptions.enabled2PC().booleanValue());
        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, (ThreadFactory) new ExecutorThreadFactory("stream-load-check"));
        this.serializer = dorisRecordSerializer;
        this.dorisOptions = dorisOptions;
        this.dorisReadOptions = dorisReadOptions;
        this.executionOptions = dorisExecutionOptions;
        this.intervalTime = dorisExecutionOptions.checkInterval().intValue();
        this.loading = false;
        this.pos = 0L;
    }

    public void initializeLoad(List<DorisWriterState> list) throws IOException {
        this.backends = RestService.getBackendsV2(this.dorisOptions, this.dorisReadOptions, LOG);
        try {
            this.dorisStreamLoad = new DorisStreamLoad(getAvailableBackend(), this.dorisOptions, this.executionOptions, this.labelGenerator, new HttpUtil().getHttpClient());
            if (this.executionOptions.enabled2PC().booleanValue()) {
                this.dorisStreamLoad.abortPreCommit(this.labelPrefix, this.lastCheckpointId + 1);
            }
            this.executorThread = Thread.currentThread();
            this.currentLabel = this.labelGenerator.generateLabel(this.lastCheckpointId + 1);
            this.scheduledExecutorService.scheduleWithFixedDelay(this::checkDone, 200L, this.intervalTime, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new DorisRuntimeException(e);
        }
    }

    public void write(IN in, SinkWriter.Context context) throws IOException {
        checkLoadException();
        byte[] serialize = this.serializer.serialize(in);
        if (Objects.isNull(serialize)) {
            return;
        }
        if (!this.loading) {
            this.dorisStreamLoad.startLoad(this.currentLabel);
            this.loading = true;
        }
        this.dorisStreamLoad.writeRecord(serialize);
    }

    public List<DorisCommittable> prepareCommit(boolean z) throws IOException {
        if (!this.loading) {
            return Collections.emptyList();
        }
        this.loading = false;
        Preconditions.checkState(this.dorisStreamLoad != null);
        RespContent stopLoad = this.dorisStreamLoad.stopLoad(this.currentLabel);
        if (DORIS_SUCCESS_STATUS.contains(stopLoad.getStatus())) {
            return !this.executionOptions.enabled2PC().booleanValue() ? Collections.emptyList() : ImmutableList.of(new DorisCommittable(this.dorisStreamLoad.getHostPort(), this.dorisStreamLoad.getDb(), stopLoad.getTxnId()));
        }
        throw new DorisRuntimeException(String.format("stream load error: %s, see more in %s", stopLoad.getMessage(), stopLoad.getErrorURL()));
    }

    public List<DorisWriterState> snapshotState(long j) throws IOException {
        Preconditions.checkState(this.dorisStreamLoad != null);
        this.dorisStreamLoad.setHostPort(getAvailableBackend());
        this.currentLabel = this.labelGenerator.generateLabel(j + 1);
        return Collections.singletonList(this.dorisWriterState);
    }

    private void checkDone() {
        String message;
        LOG.debug("start timer checker, interval {} ms", Integer.valueOf(this.intervalTime));
        if (this.dorisStreamLoad.getPendingLoadFuture() == null || !this.dorisStreamLoad.getPendingLoadFuture().isDone()) {
            return;
        }
        if (!this.loading) {
            LOG.debug("not loading, skip timer checker");
            return;
        }
        if (this.dorisStreamLoad.getPendingLoadFuture() == null || !this.dorisStreamLoad.getPendingLoadFuture().isDone()) {
            return;
        }
        try {
            message = this.dorisStreamLoad.handlePreCommitResponse(this.dorisStreamLoad.getPendingLoadFuture().get()).getMessage();
        } catch (Exception e) {
            message = e.getMessage();
        }
        this.loadException = new StreamLoadException(message);
        LOG.error("stream load finished unexpectedly, interrupt worker thread! {}", message);
        this.executorThread.interrupt();
    }

    private void checkLoadException() {
        if (this.loadException != null) {
            throw new RuntimeException("error while loading data.", this.loadException);
        }
    }

    @VisibleForTesting
    public boolean isLoading() {
        return this.loading;
    }

    @VisibleForTesting
    public void setDorisStreamLoad(DorisStreamLoad dorisStreamLoad) {
        this.dorisStreamLoad = dorisStreamLoad;
    }

    @VisibleForTesting
    public void setBackends(List<BackendV2.BackendRowV2> list) {
        this.backends = list;
    }

    public void close() throws Exception {
        if (this.scheduledExecutorService != null) {
            this.scheduledExecutorService.shutdownNow();
        }
        if (this.dorisStreamLoad != null) {
            this.dorisStreamLoad.close();
        }
    }

    @VisibleForTesting
    public String getAvailableBackend() {
        long size = this.pos + this.backends.size();
        while (this.pos < size) {
            String backendString = this.backends.get((int) (this.pos % this.backends.size())).toBackendString();
            if (tryHttpConnection(backendString)) {
                this.pos++;
                return backendString;
            }
        }
        throw new DorisRuntimeException("no available backend.");
    }

    public boolean tryHttpConnection(String str) {
        try {
            str = "http://" + str;
            HttpURLConnection httpURLConnection = (HttpURLConnection) new URL(str).openConnection();
            httpURLConnection.setConnectTimeout(DateUtility.minutesToMillis);
            httpURLConnection.connect();
            httpURLConnection.disconnect();
            return true;
        } catch (Exception e) {
            LOG.warn("Failed to connect to backend:{}", str, e);
            this.pos++;
            return false;
        }
    }
}
