/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.loghub.client;

import com.aliyun.openservices.loghub.client.DefaultLogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.FetchTaskResult;
import com.aliyun.openservices.loghub.client.FetchedLogGroup;
import com.aliyun.openservices.loghub.client.ITask;
import com.aliyun.openservices.loghub.client.InitTaskResult;
import com.aliyun.openservices.loghub.client.InitializeTask;
import com.aliyun.openservices.loghub.client.LogHubClientAdapter;
import com.aliyun.openservices.loghub.client.LogHubFetchTask;
import com.aliyun.openservices.loghub.client.ProcessTask;
import com.aliyun.openservices.loghub.client.ProcessTaskResult;
import com.aliyun.openservices.loghub.client.ShutDownTask;
import com.aliyun.openservices.loghub.client.TaskResult;
import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.log4j.Logger;

/**
 * Consumes one shard by stepping a small state machine
 * ({@code INITIALIZING -> PROCESSING -> SHUTTING_DOWN -> SHUTDOWN_COMPLETE}).
 *
 * <p>Each call to {@link #consume()} inspects the outstanding task future, applies its
 * result, advances the state and submits the next task to the supplied executor. While in
 * {@code PROCESSING} a separate fetch task is kept in flight so that data for the next
 * process task is fetched in the background.
 *
 * <p>This class performs no synchronization of its own.
 * NOTE(review): presumably {@link #consume()} and {@link #shutdown()} are driven from a
 * single thread - confirm against the caller.
 */
public class LogHubConsumer {
    private static final Logger logger = Logger.getLogger(LogHubConsumer.class);

    /** Minimum spacing between two warning log lines emitted by this consumer. */
    private static final long ERROR_LOG_INTERVAL_MILLIS = 5000L;

    // Fetch throttling tiers: the smaller the previous fetch was, the longer the consumer
    // waits before submitting the next fetch. Raw sizes are compared against the value
    // reported by FetchTaskResult.getRawSize() (presumably bytes - TODO confirm).
    private static final int SMALL_FETCH_RAW_SIZE = 0x100000;
    private static final int SMALL_FETCH_COUNT = 100;
    private static final long SMALL_FETCH_INTERVAL_MILLIS = 500L;

    private static final int MEDIUM_FETCH_RAW_SIZE = 0x200000;
    private static final int MEDIUM_FETCH_COUNT = 500;
    private static final long MEDIUM_FETCH_INTERVAL_MILLIS = 200L;

    private static final int LARGE_FETCH_RAW_SIZE = 0x400000;
    private static final int LARGE_FETCH_COUNT = 1000;
    private static final long LARGE_FETCH_INTERVAL_MILLIS = 50L;

    private final int mShardId;
    private final String mInstanceName;
    private final LogHubClientAdapter mLogHubClientAdapter;
    private final DefaultLogHubCheckPointTracker mCheckPointTracker;
    private final ILogHubProcessor mProcessor;
    private final LogHubCursorPosition mCursorPosition;
    private final int mCursorStartTime;
    private final ExecutorService mExecutorService;

    private ConsumerStatus mCurStatus = ConsumerStatus.INITIALIZING;
    /** Last task submitted by {@link #generateNextTask()}, or {@code null} if none yet. */
    private ITask mCurrentTask;
    /** Future of {@link #mCurrentTask}; {@code null} once its result has been collected. */
    private Future<TaskResult> mTaskFuture;
    /** Future of the background fetch task; {@code null} when no fetch is outstanding. */
    private Future<TaskResult> mFetchDataFuture;
    /** Cursor handed to the next fetch task. */
    private String mNextFetchCursor;
    /** Set by {@link #shutdown()}; picked up by {@link #updateStatus(boolean)}. */
    private boolean mShutDown = false;
    /** Fetched data waiting to be handed to a process task, or {@code null}. */
    private FetchedLogGroup mLastFetchedData;
    private long mLastLogErrorTime = 0L;
    private long mLastFetchTime = 0L;
    private int mLastFetchCount = 0;
    private int mLastFetchRawSize = 0;

    /**
     * Creates a consumer for one shard. A {@link DefaultLogHubCheckPointTracker} is created
     * for the given adapter, instance name and shard id.
     *
     * @param logHubClientAdapter adapter passed to the tasks and the checkpoint tracker
     * @param shardId             id of the shard to consume
     * @param instanceName        instance name passed to the checkpoint tracker
     * @param processor           processor passed to the initialize, process and shutdown tasks
     * @param executorService     executor on which all tasks are submitted
     * @param cursorPosition      cursor position passed to the initialize task
     * @param cursorStartTime     cursor start time passed to the initialize task
     */
    public LogHubConsumer(LogHubClientAdapter logHubClientAdapter, int shardId, String instanceName, ILogHubProcessor processor, ExecutorService executorService, LogHubCursorPosition cursorPosition, int cursorStartTime) {
        this.mLogHubClientAdapter = logHubClientAdapter;
        this.mShardId = shardId;
        this.mInstanceName = instanceName;
        this.mCursorPosition = cursorPosition;
        this.mCursorStartTime = cursorStartTime;
        this.mProcessor = processor;
        this.mCheckPointTracker = new DefaultLogHubCheckPointTracker(logHubClientAdapter, this.mInstanceName, this.mShardId);
        this.mExecutorService = executorService;
    }

    /**
     * Advances the consumer by one step: collects the finished task (if any), schedules the
     * next one, and - while processing with no fetched data pending - drives the
     * background fetch.
     */
    public void consume() {
        checkAndGenerateNextTask();
        if (mCurStatus == ConsumerStatus.PROCESSING && mLastFetchedData == null) {
            fetchData();
        }
    }

    /**
     * Delegates to the checkpoint tracker of this consumer.
     *
     * @param cursor     cursor to save
     * @param persistent forwarded unchanged to the tracker
     * @throws LogHubCheckPointException propagated from the tracker
     */
    public void saveCheckPoint(String cursor, boolean persistent) throws LogHubCheckPointException {
        mCheckPointTracker.saveCheckPoint(cursor, persistent);
    }

    /**
     * If no task is running, applies the result of the last task, updates the consumer
     * status and submits the next task. Does nothing while a task is still running.
     */
    private void checkAndGenerateNextTask() {
        if (mTaskFuture != null && !mTaskFuture.isCancelled() && !mTaskFuture.isDone()) {
            return;
        }
        TaskResult result = getTaskResult(mTaskFuture);
        mTaskFuture = null;

        boolean taskSuccess = result != null && result.getException() == null;
        if (taskSuccess) {
            if (mCurStatus == ConsumerStatus.INITIALIZING) {
                InitTaskResult initResult = (InitTaskResult) result;
                mNextFetchCursor = initResult.getCursor();
                mCheckPointTracker.setInMemoryCheckPoint(mNextFetchCursor);
                if (initResult.isCursorPersistent()) {
                    mCheckPointTracker.setInPeristentCheckPoint(mNextFetchCursor);
                }
            } else if (result instanceof ProcessTaskResult) {
                String rollBackCheckpoint = ((ProcessTaskResult) result).getRollBackCheckpoint();
                if (rollBackCheckpoint != null && !rollBackCheckpoint.isEmpty()) {
                    // The processor asked to roll back: drop pending data and any
                    // in-flight fetch, then restart fetching from the given cursor.
                    mLastFetchedData = null;
                    cancelCurrentFetch();
                    mNextFetchCursor = rollBackCheckpoint;
                }
            }
        }

        sampleLogError(result);
        updateStatus(taskSuccess);
        generateNextTask();
    }

    /**
     * If no fetch is running, stores the result of the last fetch (when it succeeded) and
     * submits the next fetch task unless the last one failed or the throttle says to wait.
     */
    private void fetchData() {
        if (mFetchDataFuture != null && !mFetchDataFuture.isCancelled() && !mFetchDataFuture.isDone()) {
            return;
        }
        TaskResult result = getTaskResult(mFetchDataFuture);
        boolean fetchFailed = result != null && result.getException() != null;

        if (result != null && !fetchFailed) {
            FetchTaskResult fetchResult = (FetchTaskResult) result;
            mLastFetchedData = new FetchedLogGroup(mShardId, fetchResult.getFetchedData(), fetchResult.getCursor());
            mNextFetchCursor = fetchResult.getCursor();
            mLastFetchCount = mLastFetchedData.mFetchedData.size();
            mLastFetchRawSize = fetchResult.getRawSize();
        }
        sampleLogError(result);

        if (!fetchFailed && isFetchDue()) {
            mLastFetchTime = System.currentTimeMillis();
            LogHubFetchTask task = new LogHubFetchTask(mLogHubClientAdapter, mShardId, mNextFetchCursor);
            mFetchDataFuture = mExecutorService.submit(task);
        } else {
            // Either the last fetch failed or it is too early; a later call retries.
            mFetchDataFuture = null;
        }
    }

    /**
     * Decides whether enough time has passed since the last fetch was submitted, based on
     * the raw size and item count of the last successful fetch.
     *
     * @return {@code true} if a new fetch task may be submitted now
     */
    private boolean isFetchDue() {
        long elapsed = System.currentTimeMillis() - mLastFetchTime;
        if (mLastFetchRawSize < SMALL_FETCH_RAW_SIZE && mLastFetchCount < SMALL_FETCH_COUNT) {
            return elapsed > SMALL_FETCH_INTERVAL_MILLIS;
        }
        if (mLastFetchRawSize < MEDIUM_FETCH_RAW_SIZE && mLastFetchCount < MEDIUM_FETCH_COUNT) {
            return elapsed > MEDIUM_FETCH_INTERVAL_MILLIS;
        }
        if (mLastFetchRawSize < LARGE_FETCH_RAW_SIZE && mLastFetchCount < LARGE_FETCH_COUNT) {
            return elapsed > LARGE_FETCH_INTERVAL_MILLIS;
        }
        return true;
    }

    /**
     * Logs the exception carried by {@code result}, if any, at most once per
     * {@link #ERROR_LOG_INTERVAL_MILLIS}.
     *
     * @param result task result to inspect; may be {@code null}
     */
    private void sampleLogError(TaskResult result) {
        if (result == null || result.getException() == null) {
            return;
        }
        if (acquireErrorLogSlot()) {
            logger.warn((Object) result.getException());
        }
    }

    /**
     * Rate limiter shared by the sampled warnings of this consumer.
     *
     * @return {@code true} if more than {@link #ERROR_LOG_INTERVAL_MILLIS} elapsed since
     *         the last granted slot; the slot timestamp is updated in that case
     */
    private boolean acquireErrorLogSlot() {
        long now = System.currentTimeMillis();
        if (now - mLastLogErrorTime > ERROR_LOG_INTERVAL_MILLIS) {
            mLastLogErrorTime = now;
            return true;
        }
        return false;
    }

    /**
     * Returns the result of a finished future without blocking on a running one.
     *
     * @param future future to inspect; may be {@code null}
     * @return the task result, or {@code null} if the future is {@code null}, still
     *         running, was cancelled, or terminated with an exception
     */
    private TaskResult getTaskResult(Future<TaskResult> future) {
        if (future == null || !(future.isDone() || future.isCancelled())) {
            return null;
        }
        try {
            return future.get();
        } catch (CancellationException ignored) {
            // Expected for a future cancelled by cancelCurrentFetch(); it has no result.
        } catch (ExecutionException e) {
            // The task itself threw instead of returning a TaskResult. Surface it
            // (rate limited) rather than silently treating it as "no result".
            if (acquireErrorLogSlot()) {
                logger.warn((Object) ("Task terminated with an uncaught exception, shard id:" + mShardId), e);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
        return null;
    }

    /** Cancels the outstanding fetch task, if any, and forgets its future. */
    private void cancelCurrentFetch() {
        if (mFetchDataFuture == null) {
            return;
        }
        mFetchDataFuture.cancel(true);
        // Result (if the task had already finished) is intentionally discarded.
        getTaskResult(mFetchDataFuture);
        logger.warn((Object) ("Cancel a fetch task, shard id:" + mShardId));
        mFetchDataFuture = null;
    }

    /**
     * Creates and submits the task matching the current status. In {@code PROCESSING} a
     * task is only created when fetched data is pending; in {@code SHUTDOWN_COMPLETE}
     * nothing is submitted.
     */
    private void generateNextTask() {
        ITask nextTask = null;
        switch (mCurStatus) {
            case INITIALIZING:
                nextTask = new InitializeTask(mProcessor, mLogHubClientAdapter, mShardId, mCursorPosition, mCursorStartTime);
                break;
            case PROCESSING:
                if (mLastFetchedData != null) {
                    mCheckPointTracker.setCursor(mLastFetchedData.mEndCursor);
                    nextTask = new ProcessTask(mProcessor, mLastFetchedData.mFetchedData, mCheckPointTracker);
                    mLastFetchedData = null;
                }
                break;
            case SHUTTING_DOWN:
                nextTask = new ShutDownTask(mProcessor, mCheckPointTracker);
                cancelCurrentFetch();
                break;
            default:
                break;
        }
        if (nextTask != null) {
            mCurrentTask = nextTask;
            mTaskFuture = mExecutorService.submit(mCurrentTask);
        }
    }

    /**
     * Advances the state machine after a task has been collected.
     *
     * @param taskSuccess whether the collected task produced a result without exception
     */
    private void updateStatus(boolean taskSuccess) {
        if (mCurStatus == ConsumerStatus.SHUTTING_DOWN) {
            if (mCurrentTask == null || taskSuccess) {
                mCurStatus = ConsumerStatus.SHUTDOWN_COMPLETE;
            }
        } else if (mShutDown) {
            mCurStatus = ConsumerStatus.SHUTTING_DOWN;
        } else if (taskSuccess && mCurStatus == ConsumerStatus.INITIALIZING) {
            mCurStatus = ConsumerStatus.PROCESSING;
        }
    }

    /**
     * Requests shutdown and, unless shutdown already completed, advances the state machine
     * once. Completion is reported by {@link #isShutdown()}.
     */
    public void shutdown() {
        mShutDown = true;
        if (!isShutdown()) {
            checkAndGenerateNextTask();
        }
    }

    /**
     * @return {@code true} once the consumer has reached {@code SHUTDOWN_COMPLETE}
     */
    public boolean isShutdown() {
        return mCurStatus == ConsumerStatus.SHUTDOWN_COMPLETE;
    }

    /** Lifecycle states of a {@link LogHubConsumer}. */
    enum ConsumerStatus {
        INITIALIZING,
        PROCESSING,
        SHUTTING_DOWN,
        SHUTDOWN_COMPLETE;
    }
}

