package org.apache.doris.flink.backend;

import org.apache.doris.flink.cfg.ConfigurationOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.ConnectedFailedException;
import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.exception.DorisInternalException;
import org.apache.doris.flink.serialization.Routing;
import org.apache.doris.flink.util.ErrorMessages;
import org.apache.doris.shaded.org.apache.thrift.TException;
import org.apache.doris.shaded.org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.doris.shaded.org.apache.thrift.transport.TSocket;
import org.apache.doris.shaded.org.apache.thrift.transport.TTransport;
import org.apache.doris.shaded.org.apache.thrift.transport.TTransportException;
import org.apache.doris.thrift.TDorisExternalService;
import org.apache.doris.thrift.TScanBatchResult;
import org.apache.doris.thrift.TScanCloseParams;
import org.apache.doris.thrift.TScanCloseResult;
import org.apache.doris.thrift.TScanNextBatchParams;
import org.apache.doris.thrift.TScanOpenParams;
import org.apache.doris.thrift.TScanOpenResult;
import org.apache.doris.thrift.TStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/doris/flink/backend/BackendClient.class */
public class BackendClient {
    private static Logger logger = LoggerFactory.getLogger(BackendClient.class);
    private Routing routing;
    private TDorisExternalService.Client client;
    private TTransport transport;
    private boolean isConnected = false;
    private final int retries;
    private final int socketTimeout;
    private final int connectTimeout;

    public BackendClient(Routing routing, DorisReadOptions dorisReadOptions) throws ConnectedFailedException {
        this.routing = routing;
        this.connectTimeout = (dorisReadOptions.getRequestConnectTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT : dorisReadOptions.getRequestConnectTimeoutMs()).intValue();
        this.socketTimeout = (dorisReadOptions.getRequestReadTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT : dorisReadOptions.getRequestReadTimeoutMs()).intValue();
        this.retries = (dorisReadOptions.getRequestRetries() == null ? ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT : dorisReadOptions.getRequestRetries()).intValue();
        logger.trace("connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.", new Object[]{Integer.valueOf(this.connectTimeout), Integer.valueOf(this.socketTimeout), Integer.valueOf(this.retries)});
        open();
    }

    private void open() throws ConnectedFailedException {
        logger.debug("Open client to Doris BE '{}'.", this.routing);
        TTransportException tTransportException = null;
        for (int i = 0; !this.isConnected && i < this.retries; i++) {
            logger.debug("Attempt {} to connect {}.", Integer.valueOf(i), this.routing);
            TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
            this.transport = new TSocket(this.routing.getHost(), this.routing.getPort(), this.socketTimeout, this.connectTimeout);
            this.client = new TDorisExternalService.Client(factory.getProtocol(this.transport));
            if (this.isConnected) {
                logger.info("Success connect to {}.", this.routing);
                return;
            }
            try {
                logger.trace("Connect status before open transport to {} is '{}'.", this.routing, Boolean.valueOf(this.isConnected));
                if (!this.transport.isOpen()) {
                    this.transport.open();
                    this.isConnected = true;
                }
            } catch (TTransportException e) {
                logger.warn(ErrorMessages.CONNECT_FAILED_MESSAGE, this.routing, e);
                tTransportException = e;
            }
        }
        if (this.isConnected) {
            return;
        }
        logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, this.routing);
        throw new ConnectedFailedException(this.routing.toString(), tTransportException);
    }

    private void close() {
        logger.trace("Connect status before close with '{}' is '{}'.", this.routing, Boolean.valueOf(this.isConnected));
        this.isConnected = false;
        if (this.transport != null && this.transport.isOpen()) {
            this.transport.close();
            logger.info("Closed a connection to {}.", this.routing);
        }
        if (null != this.client) {
            this.client = null;
        }
    }

    public TScanOpenResult openScanner(TScanOpenParams tScanOpenParams) throws ConnectedFailedException {
        logger.debug("OpenScanner to '{}', parameter is '{}'.", this.routing, tScanOpenParams);
        if (!this.isConnected) {
            open();
        }
        TException tException = null;
        for (int i = 0; i < this.retries; i++) {
            logger.debug("Attempt {} to openScanner {}.", Integer.valueOf(i), this.routing);
            try {
                TScanOpenResult openScanner = this.client.openScanner(tScanOpenParams);
                if (openScanner == null) {
                    logger.warn("Open scanner result from {} is null.", this.routing);
                } else {
                    if (TStatusCode.OK.equals(openScanner.getStatus().getStatusCode())) {
                        return openScanner;
                    }
                    logger.warn("The status of open scanner result from {} is '{}', error message is: {}.", new Object[]{this.routing, openScanner.getStatus().getStatusCode(), openScanner.getStatus().getErrorMsgs()});
                }
            } catch (TException e) {
                logger.warn("Open scanner from {} failed.", this.routing, e);
                tException = e;
            }
        }
        logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, this.routing);
        throw new ConnectedFailedException(this.routing.toString(), tException);
    }

    public TScanBatchResult getNext(TScanNextBatchParams tScanNextBatchParams) throws DorisException {
        logger.debug("GetNext to '{}', parameter is '{}'.", this.routing, tScanNextBatchParams);
        if (!this.isConnected) {
            open();
        }
        TException tException = null;
        TScanBatchResult tScanBatchResult = null;
        for (int i = 0; i < this.retries; i++) {
            logger.debug("Attempt {} to getNext {}.", Integer.valueOf(i), this.routing);
            try {
                tScanBatchResult = this.client.getNext(tScanNextBatchParams);
                if (tScanBatchResult == null) {
                    logger.warn("GetNext result from {} is null.", this.routing);
                } else {
                    if (TStatusCode.OK.equals(tScanBatchResult.getStatus().getStatusCode())) {
                        return tScanBatchResult;
                    }
                    logger.warn("The status of get next result from {} is '{}', error message is: {}.", new Object[]{this.routing, tScanBatchResult.getStatus().getStatusCode(), tScanBatchResult.getStatus().getErrorMsgs()});
                }
            } catch (TException e) {
                logger.warn("Get next from {} failed.", this.routing, e);
                tException = e;
            }
        }
        if (tScanBatchResult == null || TStatusCode.OK == tScanBatchResult.getStatus().getStatusCode()) {
            logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, this.routing);
            throw new ConnectedFailedException(this.routing.toString(), tException);
        }
        logger.error(ErrorMessages.DORIS_INTERNAL_FAIL_MESSAGE, new Object[]{this.routing, tScanBatchResult.getStatus().getStatusCode(), tScanBatchResult.getStatus().getErrorMsgs()});
        throw new DorisInternalException(this.routing.toString(), tScanBatchResult.getStatus().getStatusCode(), tScanBatchResult.getStatus().getErrorMsgs());
    }

    public void closeScanner(TScanCloseParams tScanCloseParams) {
        logger.debug("CloseScanner to '{}', parameter is '{}'.", this.routing, tScanCloseParams);
        for (int i = 0; i < this.retries; i++) {
            logger.debug("Attempt {} to closeScanner {}.", Integer.valueOf(i), this.routing);
            try {
                TScanCloseResult closeScanner = this.client.closeScanner(tScanCloseParams);
                if (closeScanner == null) {
                    logger.warn("CloseScanner result from {} is null.", this.routing);
                } else if (TStatusCode.OK.equals(closeScanner.getStatus().getStatusCode())) {
                    break;
                } else {
                    logger.warn("The status of get next result from {} is '{}', error message is: {}.", new Object[]{this.routing, closeScanner.getStatus().getStatusCode(), closeScanner.getStatus().getErrorMsgs()});
                }
            } catch (TException e) {
                logger.warn("Close scanner from {} failed.", this.routing, e);
            }
        }
        logger.info("CloseScanner to Doris BE '{}' success.", this.routing);
        close();
    }
}
