package com.aliyun.openservices.tablestore.hadoop;

import com.alicloud.openservices.tablestore.SyncClientInterface;
import com.alicloud.openservices.tablestore.TableStoreException;
import com.alicloud.openservices.tablestore.core.utils.Preconditions;
import com.alicloud.openservices.tablestore.model.BatchWriteRowRequest;
import com.alicloud.openservices.tablestore.model.BatchWriteRowResponse;
import com.alicloud.openservices.tablestore.model.Error;
import com.alicloud.openservices.tablestore.model.RowChange;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/openservices/tablestore/hadoop/TableStoreRecordWriter.class */
public class TableStoreRecordWriter extends RecordWriter<Writable, BatchWriteWritable> {
    private SyncClientInterface ots;
    private String outputTable;
    private long rowCounter;
    private Deque<RowChange> waitingRows = new ArrayDeque();
    private int batchSize = BATCH_SIZE_MAX;
    private static final Logger logger = LoggerFactory.getLogger(TableStoreRecordWriter.class);
    private static int BATCH_SIZE_MAX = 200;
    private static int BATCH_SIZE_INCR = 10;
    private static double BATCH_SIZE_BACKOFF = 0.8d;

    public TableStoreRecordWriter(SyncClientInterface syncClientInterface, String str) {
        Preconditions.checkNotNull(syncClientInterface, "ots client must be nonnull.");
        Preconditions.checkNotNull(str, "output table must be nonnull.");
        this.ots = syncClientInterface;
        this.outputTable = str;
        this.rowCounter = 0L;
    }

    public void write(Writable writable, BatchWriteWritable batchWriteWritable) {
        Iterator<RowChange> it = batchWriteWritable.getRowChanges().iterator();
        while (it.hasNext()) {
            this.waitingRows.addLast(it.next());
        }
        while (this.waitingRows.size() >= this.batchSize) {
            this.rowCounter += batchWrite();
        }
    }

    public void close(TaskAttemptContext taskAttemptContext) {
        while (!this.waitingRows.isEmpty()) {
            this.rowCounter += batchWrite();
        }
        logger.info("this task wrote {} rows", Long.valueOf(this.rowCounter));
        this.ots.shutdown();
    }

    private int batchWrite() {
        while (true) {
            Deque<RowChange> prepareRows = prepareRows();
            if (prepareRows.size() < this.batchSize) {
                logger.info("small batch size: {}", Integer.valueOf(prepareRows.size()));
            } else {
                logger.debug("batch size: {}", Integer.valueOf(prepareRows.size()));
            }
            try {
                launch(prepareRows);
                this.batchSize += BATCH_SIZE_INCR;
                if (this.batchSize > BATCH_SIZE_MAX) {
                    this.batchSize = BATCH_SIZE_MAX;
                }
                return prepareRows.size();
            } catch (TableStoreException e) {
                if (e.getErrorCode() != "OTSParameterInvalid" || this.batchSize == 1) {
                    throw e;
                }
                logger.info("Batch-size backs off. current batch-size: {}", Integer.valueOf(prepareRows.size()));
                this.batchSize = (int) (prepareRows.size() * BATCH_SIZE_BACKOFF);
                if (this.batchSize < 1) {
                    this.batchSize = 1;
                }
                while (!prepareRows.isEmpty()) {
                    this.waitingRows.addFirst(prepareRows.pollLast());
                }
            }
        }
    }

    private Deque<RowChange> prepareRows() {
        ArrayDeque arrayDeque = new ArrayDeque();
        HashSet hashSet = new HashSet();
        for (int i = 0; i < this.batchSize && !this.waitingRows.isEmpty(); i++) {
            RowChange pollFirst = this.waitingRows.pollFirst();
            int hashCode = (pollFirst.getTableName().hashCode() * 1327144901) + pollFirst.getPrimaryKey().hashCode();
            if (hashSet.contains(Integer.valueOf(hashCode))) {
                break;
            }
            arrayDeque.addLast(pollFirst);
            hashSet.add(Integer.valueOf(hashCode));
        }
        return arrayDeque;
    }

    private void launch(Deque<RowChange> deque) {
        BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();
        Iterator<RowChange> it = deque.iterator();
        while (it.hasNext()) {
            batchWriteRowRequest.addRowChange(it.next());
        }
        BatchWriteRowResponse batchWriteRow = this.ots.batchWriteRow(batchWriteRowRequest);
        List<BatchWriteRowResponse.RowResult> failedRows = batchWriteRow.getFailedRows();
        for (BatchWriteRowResponse.RowResult rowResult : failedRows) {
            logger.error("fail to write to TableStore. table: {} error-code: {} error-message: {} request-id: {}", new Object[]{rowResult.getTableName(), rowResult.getError().getCode(), rowResult.getError().getMessage(), batchWriteRow.getRequestId()});
        }
        if (failedRows.isEmpty()) {
            return;
        }
        Error error = ((BatchWriteRowResponse.RowResult) failedRows.get(0)).getError();
        throw new TableStoreException(error.getMessage(), (Throwable) null, error.getCode(), batchWriteRow.getRequestId(), 0);
    }
}
