package org.apache.doris.flink.sink.committer;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.sink.DorisCommittable;
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.HttpUtil;
import org.apache.doris.flink.sink.LoadStatus;
import org.apache.doris.flink.sink.ResponseUtil;
import org.apache.doris.shaded.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/doris/flink/sink/committer/DorisCommitter.class */
public class DorisCommitter implements Committer<DorisCommittable> {
    private static final Logger LOG = LoggerFactory.getLogger(DorisCommitter.class);
    private static final String commitPattern = "http://%s/api/%s/_stream_load_2pc";
    private final CloseableHttpClient httpClient;
    private final DorisOptions dorisOptions;
    private final DorisReadOptions dorisReadOptions;
    int maxRetry;

    public DorisCommitter(DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, int i) {
        this(dorisOptions, dorisReadOptions, i, new HttpUtil().getHttpClient());
    }

    public DorisCommitter(DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, int i, CloseableHttpClient closeableHttpClient) {
        this.dorisOptions = dorisOptions;
        this.dorisReadOptions = dorisReadOptions;
        this.maxRetry = i;
        this.httpClient = closeableHttpClient;
    }

    public List<DorisCommittable> commit(List<DorisCommittable> list) throws IOException, InterruptedException {
        Iterator<DorisCommittable> it = list.iterator();
        while (it.hasNext()) {
            commitTransaction(it.next());
        }
        return Collections.emptyList();
    }

    private void commitTransaction(DorisCommittable dorisCommittable) throws IOException {
        int i = -1;
        String str = null;
        int i2 = 0;
        String hostPort = dorisCommittable.getHostPort();
        CloseableHttpResponse closeableHttpResponse = null;
        while (true) {
            int i3 = i2;
            i2++;
            if (i3 > this.maxRetry) {
                break;
            }
            HttpPutBuilder httpPutBuilder = new HttpPutBuilder();
            httpPutBuilder.setUrl(String.format(commitPattern, hostPort, dorisCommittable.getDb())).baseAuth(this.dorisOptions.getUsername(), this.dorisOptions.getPassword()).addCommonHeader().addTxnId(dorisCommittable.getTxnID()).setEmptyEntity().commit();
            try {
                closeableHttpResponse = this.httpClient.execute((HttpUriRequest) httpPutBuilder.build());
                i = closeableHttpResponse.getStatusLine().getStatusCode();
                str = closeableHttpResponse.getStatusLine().getReasonPhrase();
            } catch (IOException e) {
                hostPort = RestService.getBackend(this.dorisOptions, this.dorisReadOptions, LOG);
            }
            if (i == 200) {
                break;
            }
            LOG.warn("commit failed with {}, reason {}", hostPort, str);
            hostPort = RestService.getBackend(this.dorisOptions, this.dorisReadOptions, LOG);
        }
        if (i != 200) {
            throw new DorisRuntimeException("stream load error: " + str);
        }
        ObjectMapper objectMapper = new ObjectMapper();
        if (closeableHttpResponse.getEntity() != null) {
            String entityUtils = EntityUtils.toString(closeableHttpResponse.getEntity());
            Map map = (Map) objectMapper.readValue(entityUtils, new TypeReference<HashMap<String, String>>() { // from class: org.apache.doris.flink.sink.committer.DorisCommitter.1
            });
            if (((String) map.get("status")).equals(LoadStatus.FAIL) && !ResponseUtil.isCommitted((String) map.get("msg"))) {
                throw new DorisRuntimeException("Commit failed " + entityUtils);
            }
            LOG.info("load result {}", entityUtils);
        }
    }

    public void close() throws Exception {
        if (this.httpClient != null) {
            this.httpClient.close();
        }
    }
}
