package org.apache.doris.flink.table;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.StringJoiner;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.shaded.org.apache.arrow.vector.util.DateUtility;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/doris/flink/table/DorisDynamicOutputFormat.class */
/**
 * Flink output format that buffers records and writes them to a Doris table through stream load.
 *
 * <p>Records are either {@link RowData} rows, which are serialized here as delimited text or as
 * JSON objects depending on the {@code format} stream-load property, or pre-serialized
 * {@link String} values, which are buffered unchanged. The buffer is flushed when it reaches the
 * configured batch size, periodically by a background scheduler when a batch interval is
 * configured, and on {@link #close()}.
 *
 * <p>{@link #writeRecord(Object)}, {@link #flush()} and {@link #close()} synchronize on this
 * instance because the scheduled flush runs on a separate thread.
 *
 * @param <T> record type; only {@link RowData} and {@link String} instances are accepted
 */
public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> {
    private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicOutputFormat.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final String COLUMNS_KEY = "columns";
    private static final String FIELD_DELIMITER_KEY = "column_separator";
    private static final String FIELD_DELIMITER_DEFAULT = "\t";
    private static final String LINE_DELIMITER_KEY = "line_delimiter";
    private static final String LINE_DELIMITER_DEFAULT = "\n";
    private static final String FORMAT_KEY = "format";
    private static final String FORMAT_JSON_VALUE = "json";
    private static final String NULL_VALUE = "\\N";
    private static final String ESCAPE_DELIMITERS_KEY = "escape_delimiters";
    private static final String ESCAPE_DELIMITERS_DEFAULT = "false";
    private static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
    private static final String UNIQUE_KEYS_TYPE = "UNIQUE_KEYS";
    /** Matches a {@code \xHH} escape, where {@code HH} is a two-digit hexadecimal character code. */
    private static final Pattern HEX_ESCAPE_PATTERN = Pattern.compile("\\\\x([0-9a-fA-F]{2})");
    /** Back-off step between load retries; attempt {@code i} waits {@code i} times this long. */
    private static final long RETRY_BACKOFF_STEP_MS = 1000L;

    private final String[] fieldNames;
    private final boolean jsonFormat;
    private final RowData.FieldGetter[] fieldGetters;
    private String fieldDelimiter;
    private String lineDelimiter;
    private DorisOptions options;
    private DorisReadOptions readOptions;
    private DorisExecutionOptions executionOptions;
    private DorisStreamLoad dorisStreamLoad;
    private transient ScheduledExecutorService scheduler;
    private transient ScheduledFuture<?> scheduledFuture;
    private volatile transient Exception flushException;
    /** Pending records: {@code String} lines, or JSON field maps built from {@code RowData}. */
    private final List<Object> batch = new ArrayList<>();
    private volatile transient boolean closed = false;
    /** Keys type of the target table; assigned in the constructor once the options are known. */
    private String keysType;

    /** Fluent builder collecting the connection options and row layout for the output format. */
    public static class Builder {
        private DorisOptions.Builder optionsBuilder = DorisOptions.builder();
        private DorisReadOptions readOptions;
        private DorisExecutionOptions executionOptions;
        private DataType[] fieldDataTypes;
        private String[] fieldNames;

        public Builder setFenodes(String str) {
            this.optionsBuilder.setFenodes(str);
            return this;
        }

        public Builder setUsername(String str) {
            this.optionsBuilder.setUsername(str);
            return this;
        }

        public Builder setPassword(String str) {
            this.optionsBuilder.setPassword(str);
            return this;
        }

        public Builder setTableIdentifier(String str) {
            this.optionsBuilder.setTableIdentifier(str);
            return this;
        }

        public Builder setReadOptions(DorisReadOptions dorisReadOptions) {
            this.readOptions = dorisReadOptions;
            return this;
        }

        public Builder setExecutionOptions(DorisExecutionOptions dorisExecutionOptions) {
            this.executionOptions = dorisExecutionOptions;
            return this;
        }

        public Builder setFieldNames(String[] strArr) {
            this.fieldNames = strArr;
            return this;
        }

        public Builder setFieldDataTypes(DataType[] dataTypeArr) {
            this.fieldDataTypes = dataTypeArr;
            return this;
        }

        /**
         * Creates the output format from the collected settings.
         *
         * <p>The raw return type is kept so existing callers continue to compile unchanged.
         *
         * @return a new output format instance
         */
        @SuppressWarnings("rawtypes")
        public DorisDynamicOutputFormat build() {
            LogicalType[] logicalTypes = Arrays.stream(this.fieldDataTypes)
                    .map(DataType::getLogicalType)
                    .toArray(LogicalType[]::new);
            return new DorisDynamicOutputFormat<>(this.optionsBuilder.build(), this.readOptions, this.executionOptions, logicalTypes, this.fieldNames);
        }
    }

    /**
     * Creates the output format and prepares the stream-load properties.
     *
     * @param dorisOptions connection options (table identifier, credentials)
     * @param dorisReadOptions read options passed to the REST calls
     * @param dorisExecutionOptions batching, retry and stream-load settings
     * @param logicalTypeArr logical type of each row field, by position
     * @param strArr field names, by position
     * @throws RuntimeException if the table schema cannot be fetched
     */
    public DorisDynamicOutputFormat(DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, DorisExecutionOptions dorisExecutionOptions, LogicalType[] logicalTypeArr, String[] strArr) {
        this.options = dorisOptions;
        this.readOptions = dorisReadOptions;
        this.executionOptions = dorisExecutionOptions;
        this.fieldNames = strArr;
        this.jsonFormat = FORMAT_JSON_VALUE.equals(dorisExecutionOptions.getStreamLoadProp().getProperty(FORMAT_KEY));
        // The schema lookup needs options/readOptions, so it cannot be a field initializer, and
        // it must run before handleStreamloadProp(), which consults keysType.
        this.keysType = parseKeysType();
        handleStreamloadProp();
        this.fieldGetters = new RowData.FieldGetter[logicalTypeArr.length];
        for (int i = 0; i < logicalTypeArr.length; i++) {
            this.fieldGetters[i] = RowData.createFieldGetter(logicalTypeArr[i], i);
        }
    }

    /**
     * Fetches the keys type of the target table from its schema.
     *
     * @return the keys type reported by the schema
     * @throws RuntimeException wrapping the {@link DorisException} if the schema lookup fails
     */
    private String parseKeysType() {
        try {
            return RestService.getSchema(this.options, this.readOptions, LOG).getKeysType();
        } catch (DorisException e) {
            throw new RuntimeException("Failed fetch doris table schema: " + this.options.getTableIdentifier(), e);
        }
    }

    public static Builder builder() {
        return new Builder();
    }

    /**
     * Resolves the field and line delimiters and fills in the {@code columns} property.
     *
     * <p>When {@code escape_delimiters} is true, {@code \xHH} sequences in the delimiters are
     * decoded and the property is removed from the stream-load properties. The {@code columns}
     * property is generated from the field names only when it is not already set.
     */
    private void handleStreamloadProp() {
        Properties streamLoadProp = this.executionOptions.getStreamLoadProp();
        String rawFieldDelimiter = streamLoadProp.getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT);
        String rawLineDelimiter = streamLoadProp.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT);
        if (Boolean.parseBoolean(streamLoadProp.getProperty(ESCAPE_DELIMITERS_KEY, ESCAPE_DELIMITERS_DEFAULT))) {
            this.fieldDelimiter = escapeString(rawFieldDelimiter);
            this.lineDelimiter = escapeString(rawLineDelimiter);
            // Remove by key: Properties.contains() looks at values, so guarding the removal
            // with it never matched and the property was left in place.
            streamLoadProp.remove(ESCAPE_DELIMITERS_KEY);
        } else {
            this.fieldDelimiter = rawFieldDelimiter;
            this.lineDelimiter = rawLineDelimiter;
        }
        if (streamLoadProp.containsKey(COLUMNS_KEY) || this.fieldNames == null || this.fieldNames.length == 0) {
            return;
        }
        String columns = Arrays.stream(this.fieldNames)
                .map(name -> String.format("`%s`", name.trim().replace("`", "")))
                .collect(Collectors.joining(","));
        if (enableBatchDelete()) {
            columns = String.format("%s,%s", columns, DORIS_DELETE_SIGN);
        }
        streamLoadProp.put(COLUMNS_KEY, columns);
    }

    /**
     * Replaces every {@code \xHH} sequence with the character whose code is the hex value
     * {@code HH}; all other text is copied unchanged.
     *
     * @param str text that may contain {@code \xHH} escapes
     * @return the text with the escapes decoded
     */
    private static String escapeString(String str) {
        Matcher matcher = HEX_ESCAPE_PATTERN.matcher(str);
        StringBuffer decoded = new StringBuffer();
        while (matcher.find()) {
            char character = (char) Integer.parseInt(matcher.group(1), 16);
            // quoteReplacement keeps a decoded '$' or '\' from being read as replacement syntax.
            matcher.appendReplacement(decoded, Matcher.quoteReplacement(String.valueOf(character)));
        }
        matcher.appendTail(decoded);
        return decoded.toString();
    }

    /**
     * Tells whether each record must carry a delete sign.
     *
     * @return true if deletes are enabled in the execution options or the table keys type is
     *     {@code UNIQUE_KEYS}
     */
    private boolean enableBatchDelete() {
        // Boolean.TRUE.equals tolerates an unset (null) option.
        return Boolean.TRUE.equals(this.executionOptions.getEnableDelete()) || UNIQUE_KEYS_TYPE.equals(this.keysType);
    }

    public void configure(Configuration configuration) {
    }

    /**
     * Creates the stream loader for a randomly chosen backend and, when both a batch interval and
     * a batch size other than 1 are configured, starts the periodic background flush.
     *
     * @param i task number (unused)
     * @param i2 number of tasks (unused)
     * @throws IOException if no backend can be obtained
     * @throws IllegalArgumentException if the table identifier is not of the form {@code db.table}
     */
    public void open(int i, int i2) throws IOException {
        String tableIdentifier = this.options.getTableIdentifier();
        String[] dbAndTable = tableIdentifier.split("\\.");
        if (dbAndTable.length < 2) {
            throw new IllegalArgumentException("Table identifier must be of the form 'db.table': " + tableIdentifier);
        }
        this.dorisStreamLoad = new DorisStreamLoad(getBackend(), dbAndTable[0], dbAndTable[1], this.options.getUsername(), this.options.getPassword(), this.executionOptions.getStreamLoadProp());
        LOG.info("Streamload BE:{}", this.dorisStreamLoad.getLoadUrlStr());
        long batchIntervalMs = this.executionOptions.getBatchIntervalMs().longValue();
        if (batchIntervalMs == 0 || this.executionOptions.getBatchSize().intValue() == 1) {
            return;
        }
        this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("doris-streamload-output-format"));
        this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
            synchronized (this) {
                if (!this.closed) {
                    try {
                        flush();
                    } catch (Exception e) {
                        // Remembered here and rethrown on the writer thread by checkFlushException().
                        this.flushException = e;
                    }
                }
            }
        }, batchIntervalMs, batchIntervalMs, TimeUnit.MILLISECONDS);
    }

    /** Rethrows, wrapped in a {@link RuntimeException}, a failure recorded by the background flush. */
    private void checkFlushException() {
        if (this.flushException != null) {
            throw new RuntimeException("Writing records to streamload failed.", this.flushException);
        }
    }

    /**
     * Buffers one record and flushes once the configured batch size is reached.
     *
     * @param t the record; must be a {@link RowData} or a {@link String}
     * @throws IOException if a triggered flush fails
     */
    public synchronized void writeRecord(T t) throws IOException {
        checkFlushException();
        addBatch(t);
        int batchSize = this.executionOptions.getBatchSize().intValue();
        if (batchSize > 0 && this.batch.size() >= batchSize) {
            flush();
        }
    }

    /**
     * Adds a record to the buffer, serializing {@link RowData} rows first.
     *
     * @throws RuntimeException if the record is neither a {@link RowData} nor a {@link String}
     */
    private void addBatch(T t) {
        if (t instanceof RowData) {
            RowData rowData = (RowData) t;
            this.batch.add(this.jsonFormat ? toJsonRecord(rowData) : toDelimitedRecord(rowData));
        } else if (t instanceof String) {
            this.batch.add(t);
        } else {
            throw new RuntimeException("The type of element should be 'RowData' or 'String' only.");
        }
    }

    /** Builds the field-name to value map for one row, plus the delete sign when required. */
    private Map<String, String> toJsonRecord(RowData rowData) {
        Map<String, String> record = new HashMap<>();
        for (int i = 0; i < rowData.getArity() && i < this.fieldGetters.length; i++) {
            Object value = this.fieldGetters[i].getFieldOrNull(rowData);
            record.put(this.fieldNames[i], value != null ? value.toString() : null);
        }
        if (enableBatchDelete()) {
            record.put(DORIS_DELETE_SIGN, parseDeleteSign(rowData.getRowKind()));
        }
        return record;
    }

    /** Joins the row's fields with the field delimiter, plus the delete sign when required. */
    private String toDelimitedRecord(RowData rowData) {
        StringJoiner line = new StringJoiner(this.fieldDelimiter);
        for (int i = 0; i < rowData.getArity() && i < this.fieldGetters.length; i++) {
            Object value = this.fieldGetters[i].getFieldOrNull(rowData);
            line.add(value != null ? value.toString() : NULL_VALUE);
        }
        if (enableBatchDelete()) {
            line.add(parseDeleteSign(rowData.getRowKind()));
        }
        return line.toString();
    }

    /**
     * Maps a row kind to the delete-sign value.
     *
     * @return {@code "0"} for INSERT and UPDATE_AFTER, {@code "1"} for DELETE and UPDATE_BEFORE
     * @throws RuntimeException for any other row kind
     */
    private String parseDeleteSign(RowKind rowKind) {
        if (RowKind.INSERT.equals(rowKind) || RowKind.UPDATE_AFTER.equals(rowKind)) {
            return "0";
        }
        if (RowKind.DELETE.equals(rowKind) || RowKind.UPDATE_BEFORE.equals(rowKind)) {
            return "1";
        }
        throw new RuntimeException("Unrecognized row kind:" + rowKind.toString());
    }

    /**
     * Stops the periodic flush, writes any buffered records and closes the stream loader.
     * Calling it again after the first call only re-checks for a recorded background failure.
     *
     * @throws IOException if closing the stream loader fails
     * @throws RuntimeException if the final flush fails or a background flush had failed
     */
    public synchronized void close() throws IOException {
        if (!this.closed) {
            this.closed = true;
            if (this.scheduledFuture != null) {
                this.scheduledFuture.cancel(false);
                this.scheduler.shutdown();
            }
            try {
                flush();
            } catch (Exception e) {
                LOG.warn("Writing records to doris failed.", e);
                throw new RuntimeException("Writing records to doris failed.", e);
            } finally {
                // Close exactly once, and tolerate open() having failed before creating the loader.
                if (this.dorisStreamLoad != null) {
                    this.dorisStreamLoad.close();
                }
            }
        }
        checkFlushException();
    }

    /**
     * Sends the buffered records in one stream load, retrying up to the configured maximum.
     * Before each retry a new backend is selected and the thread sleeps for a back-off that grows
     * with the attempt number. The buffer is cleared only after a successful load.
     *
     * @throws IOException if the last attempt fails, a new backend cannot be obtained, or the
     *     thread is interrupted while backing off
     */
    public synchronized void flush() throws IOException {
        checkFlushException();
        if (this.batch.isEmpty()) {
            return;
        }
        String payload = serializeBatch();
        int maxRetries = this.executionOptions.getMaxRetries().intValue();
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                this.dorisStreamLoad.load(payload);
                this.batch.clear();
                return;
            } catch (StreamLoadException e) {
                LOG.error("doris sink error, retry times = {}", attempt, e);
                if (attempt >= maxRetries) {
                    throw new IOException(e);
                }
                try {
                    this.dorisStreamLoad.setHostPort(getBackend());
                    LOG.warn("streamload error,switch be: {}", this.dorisStreamLoad.getLoadUrlStr(), e);
                    Thread.sleep(RETRY_BACKOFF_STEP_MS * attempt);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    // The load failure stays the cause; it is what triggered the retry.
                    throw new IOException("unable to flush; interrupted while doing another attempt", e);
                }
            }
        }
    }

    /**
     * Renders the buffered records as a single request body: lines joined by the line delimiter
     * for delimited text, or a JSON array for the JSON format.
     */
    private String serializeBatch() throws IOException {
        if (!this.jsonFormat) {
            return this.batch.stream().map(String::valueOf).collect(Collectors.joining(this.lineDelimiter));
        }
        if (this.batch.get(0) instanceof String) {
            // Pre-serialized JSON strings: the list's "[a, b]" rendering is sent as the array.
            return this.batch.toString();
        }
        return OBJECT_MAPPER.writeValueAsString(this.batch);
    }

    /**
     * Picks a random backend through the REST service.
     *
     * @return the backend returned by the REST service
     * @throws IOException if the lookup fails
     */
    private String getBackend() throws IOException {
        try {
            return RestService.randomBackend(this.options, this.readOptions, LOG);
        } catch (IOException | DorisException e) {
            LOG.error("get backends info fail", e);
            throw new IOException(e);
        }
    }
}
