/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.loghub.client;

import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.loghub.client.LogHubClientAdapter;
import com.aliyun.openservices.loghub.client.LogHubConsumer;
import com.aliyun.openservices.loghub.client.LogHubHeartBeat;
import com.aliyun.openservices.loghub.client.LogThreadFactory;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.log4j.Logger;

public class ClientWorker
implements Runnable {
    private final ILogHubProcessorFactory mLogHubProcessorFactory;
    private final LogHubConfig mLogHubConfig;
    private final LogHubHeartBeat mLogHubHeartBeat;
    private boolean mShutDown = false;
    private final Map<Integer, LogHubConsumer> mShardConsumer = new HashMap<Integer, LogHubConsumer>();
    private final ExecutorService mExecutorService = Executors.newCachedThreadPool(new LogThreadFactory());
    private LogHubClientAdapter mLogHubClientAdapter;
    private static final Logger logger = Logger.getLogger(ClientWorker.class);

    public ClientWorker(ILogHubProcessorFactory factory, LogHubConfig config) throws LogHubClientWorkerException {
        this.mLogHubProcessorFactory = factory;
        this.mLogHubConfig = config;
        this.mLogHubClientAdapter = new LogHubClientAdapter(config.getLogHubEndPoint(), config.getAccessId(), config.getAccessKey(), config.getStsToken(), config.getProject(), config.getLogStore(), config.getConsumerGroupName(), config.getWorkerInstanceName());
        try {
            this.mLogHubClientAdapter.CreateConsumerGroup((int)(config.getHeartBeatIntervalMillis() * 2L / 1000L), config.isConsumeInOrder());
        }
        catch (LogException e) {
            if (e.GetErrorCode().compareToIgnoreCase("ConsumerGroupAlreadyExist") == 0) {
                try {
                    ConsumerGroup consumerGroup = this.mLogHubClientAdapter.GetConsumerGroup();
                    if (consumerGroup != null) {
                        if (consumerGroup.isInOrder() != this.mLogHubConfig.isConsumeInOrder() || consumerGroup.getTimeout() != (int)(this.mLogHubConfig.getHeartBeatIntervalMillis() * 2L / 1000L)) {
                            throw new LogHubClientWorkerException("consumer group is not agreed, AlreadyExistedConsumerGroup: {\"consumeInOrder\": " + consumerGroup.isInOrder() + ", \"timeoutInMillSecond\": " + consumerGroup.getTimeout() + "}");
                        }
                    }
                    throw new LogHubClientWorkerException("consumer group not exist");
                }
                catch (LogException e1) {
                    throw new LogHubClientWorkerException("error occour when get consumer group, errorCode: " + e1.GetErrorCode() + ", errorMessage: " + e1.GetErrorMessage());
                }
            }
            throw new LogHubClientWorkerException("error occour when create consumer group, errorCode: " + e.GetErrorCode() + ", errorMessage: " + e.GetErrorMessage());
        }
        this.mLogHubHeartBeat = new LogHubHeartBeat(this.mLogHubClientAdapter, config.getHeartBeatIntervalMillis());
    }

    public void SwitchClient(String accessKeyId, String accessKey) {
        this.mLogHubClientAdapter.SwitchClient(this.mLogHubConfig.getLogHubEndPoint(), accessKeyId, accessKey, null);
    }

    public void SwitchClient(String accessKeyId, String accessKey, String stsToken) {
        this.mLogHubClientAdapter.SwitchClient(this.mLogHubConfig.getLogHubEndPoint(), accessKeyId, accessKey, stsToken);
    }

    @Override
    public void run() {
        this.mLogHubHeartBeat.Start();
        ArrayList<Integer> heldShards = new ArrayList<Integer>();
        while (!this.mShutDown) {
            this.mLogHubHeartBeat.GetHeldShards(heldShards);
            for (int shard : heldShards) {
                LogHubConsumer consumer = this.getConsuemr(shard);
                consumer.consume();
            }
            this.cleanConsumer(heldShards);
            try {
                Thread.sleep(this.mLogHubConfig.getDataFetchIntervalMillis());
            }
            catch (InterruptedException interruptedException) {}
        }
    }

    public void shutdown() {
        this.mShutDown = true;
        this.mLogHubHeartBeat.Stop();
    }

    private void cleanConsumer(ArrayList<Integer> ownedShard) {
        ArrayList<Integer> removeShards = new ArrayList<Integer>();
        for (Map.Entry<Integer, LogHubConsumer> shard : this.mShardConsumer.entrySet()) {
            LogHubConsumer consumer = shard.getValue();
            if (!ownedShard.contains(shard.getKey())) {
                consumer.shutdown();
                logger.warn((Object)("try to shut down a consumer shard:" + shard.getKey()));
            }
            if (!consumer.isShutdown()) continue;
            this.mLogHubHeartBeat.RemoveHeartShard(shard.getKey());
            removeShards.add(shard.getKey());
            logger.warn((Object)("remove a consumer shard:" + shard.getKey()));
        }
        Iterator<Map.Entry<Integer, LogHubConsumer>> iterator = removeShards.iterator();
        while (iterator.hasNext()) {
            int shard = (Integer)((Object)iterator.next());
            this.mShardConsumer.remove(shard);
        }
    }

    private LogHubConsumer getConsuemr(int shardId) {
        LogHubConsumer consumer = this.mShardConsumer.get(shardId);
        if (consumer != null) {
            return consumer;
        }
        consumer = new LogHubConsumer(this.mLogHubClientAdapter, shardId, this.mLogHubConfig.getWorkerInstanceName(), this.mLogHubProcessorFactory.generatorProcessor(), this.mExecutorService, this.mLogHubConfig.getCursorPosition(), this.mLogHubConfig.GetCursorStartTime());
        this.mShardConsumer.put(shardId, consumer);
        logger.warn((Object)("create a consumer shard:" + shardId));
        return consumer;
    }
}

