/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.aliyun.logservice;

import com.aliyun.ms.MetaClient;
import com.aliyun.ms.utils.EndpointEnum;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.loghub.client.LogHubClientAdapter;
import com.aliyun.openservices.loghub.client.LogHubConsumer;
import com.aliyun.openservices.loghub.client.LogHubHeartBeat;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Main loop of a LogHub consumer-group worker.
 *
 * <p>On construction the worker resolves the LogHub endpoint and credentials, creates the
 * consumer group (or verifies that an already existing one matches the configuration) and
 * prepares a heartbeat. {@link #run()} then repeatedly asks the heartbeat for the shards
 * currently held, drives one {@link LogHubConsumer} per shard, and discards consumers for
 * shards that are no longer held.
 *
 * <p>Threading: {@link #shutdown()} is expected to be called from a thread other than the one
 * executing {@link #run()}; the stop flag is therefore {@code volatile}. The shard-to-consumer
 * map and {@code mClient} are only touched from {@link #run()} and the private helpers it calls.
 *
 * <p>NOTE(review): {@code mExecutorService} is never shut down by this class; its threads are
 * left to the cached pool's own idle handling. Confirm whether an explicit shutdown is wanted.
 */
public class ClientWorker implements Runnable {
    private static final Log LOG = LogFactory.getLog(ClientWorker.class);

    /** Error code for which an existing consumer group is verified instead of failing. */
    private static final String CONSUMER_GROUP_ALREADY_EXIST = "ConsumerGroupAlreadyExist";

    private final ILogHubProcessorFactory mLogHubProcessorFactory;
    private final LogHubConfig mLogHubConfig;
    private final LogHubHeartBeat mLogHubHeartBeat;
    // Written by shutdown() and polled by run(), normally on different threads.
    private volatile boolean mShutDown = false;
    private final Map<Integer, LogHubConsumer> mShardConsumer = new HashMap<Integer, LogHubConsumer>();
    private final ExecutorService mExecutorService = Executors.newCachedThreadPool();
    private final LogHubClientAdapter mLogHubClientAdapter;
    // Replaced with a new instance whenever role credentials are refreshed.
    private Client mClient;
    private final String loghubEndpoint;
    // True when the credentials were obtained from MetaClient rather than from the config.
    private final boolean isRoleAK;

    /**
     * Creates a worker and makes sure its consumer group exists with the configured settings.
     *
     * <p>If the config carries no endpoint, one is built as
     * {@code <project>.<endpoint from EndpointEnum for the cluster region and network type>}.
     * If the config lacks an access key id or secret, role credentials (including a security
     * token) are fetched from {@link MetaClient} instead.
     *
     * @param factory produces the processor handed to each shard consumer
     * @param config  worker configuration (project, log store, consumer group, intervals, ...)
     * @throws LogHubClientWorkerException if the consumer group cannot be created, cannot be
     *         fetched, or already exists with a different ordering flag or timeout
     */
    public ClientWorker(ILogHubProcessorFactory factory, LogHubConfig config) throws LogHubClientWorkerException {
        this.mLogHubProcessorFactory = factory;
        this.mLogHubConfig = config;

        String endpoint = config.getLogHubEndPoint();
        if (endpoint == null) {
            String region = MetaClient.getClusterRegionName();
            String networkType = MetaClient.getClusterNetworkType();
            endpoint = config.getProject() + "." + EndpointEnum.getEndpoint("log", region, networkType);
        }
        this.loghubEndpoint = endpoint;

        String accessKeyId = config.getAccessId();
        String accessKeySecret = config.getAccessKey();
        String securityToken = null;
        this.isRoleAK = accessKeyId == null || accessKeySecret == null;
        if (this.isRoleAK) {
            accessKeyId = MetaClient.getRoleAccessKeyId();
            accessKeySecret = MetaClient.getRoleAccessKeySecret();
            securityToken = MetaClient.getRoleSecurityToken();
        }
        this.mClient = new Client(this.loghubEndpoint, accessKeyId, accessKeySecret);
        if (this.isRoleAK) {
            this.mClient.SetSecurityToken(securityToken);
        }

        this.mLogHubClientAdapter = new LogHubClientAdapter(this.loghubEndpoint, accessKeyId, accessKeySecret, securityToken, config.getProject(), config.getLogStore(), config.getConsumerGroupName(), config.getWorkerInstanceName());

        // Consumer group timeout: twice the heartbeat interval, converted from millis to seconds.
        int timeoutSeconds = (int)(config.getHeartBeatIntervalMillis() * 2L / 1000L);
        this.ensureConsumerGroup(timeoutSeconds, config.isConsumeInOrder());

        this.mLogHubHeartBeat = new LogHubHeartBeat(this.mLogHubClientAdapter, config.getHeartBeatIntervalMillis());
    }

    /**
     * Creates the consumer group, or accepts an existing one if its settings match.
     *
     * @param timeoutSeconds expected consumer group timeout
     * @param inOrder        expected in-order consumption flag
     * @throws LogHubClientWorkerException if creation fails for a reason other than the group
     *         already existing, or the existing group is missing, unreadable or mismatched
     */
    private void ensureConsumerGroup(int timeoutSeconds, boolean inOrder) throws LogHubClientWorkerException {
        try {
            this.mLogHubClientAdapter.CreateConsumerGroup(timeoutSeconds, inOrder);
        }
        catch (LogException e) {
            if (CONSUMER_GROUP_ALREADY_EXIST.equalsIgnoreCase(e.GetErrorCode())) {
                // An existing group is fine as long as it agrees with our configuration.
                this.verifyExistingConsumerGroup(timeoutSeconds, inOrder);
            } else {
                throw new LogHubClientWorkerException("error occur when create consumer group, errorCode: " + e.GetErrorCode() + ", errorMessage: " + e.GetErrorMessage());
            }
        }
    }

    /** Fetches the existing consumer group and checks its ordering flag and timeout. */
    private void verifyExistingConsumerGroup(int timeoutSeconds, boolean inOrder) throws LogHubClientWorkerException {
        ConsumerGroup cg;
        try {
            cg = this.mLogHubClientAdapter.GetConsumerGroup();
        }
        catch (LogException e1) {
            throw new LogHubClientWorkerException("error occur when get consumer group, errorCode: " + e1.GetErrorCode() + ", errorMessage: " + e1.GetErrorMessage());
        }
        if (cg == null) {
            throw new LogHubClientWorkerException("consumer group not exist");
        }
        if (cg.isInOrder() != inOrder || cg.getTimeout() != timeoutSeconds) {
            throw new LogHubClientWorkerException("consumer group is not agreed, AlreadyExistedConsumerGroup: {\"consumeInOrder\": " + cg.isInOrder() + ", \"timeoutInMillSecond\": " + cg.getTimeout() + "}");
        }
    }

    /** Points the adapter at a client built from the given credentials and the current endpoint. */
    private void switchClient(String accessKeyId, String accessKey, String stsToken) {
        this.mLogHubClientAdapter.SwitchClient(this.loghubEndpoint, accessKeyId, accessKey, stsToken);
    }

    /**
     * Starts the heartbeat and loops until {@link #shutdown()} is called: consume every held
     * shard, drop consumers of shards no longer held, sleep for the configured fetch interval,
     * then check whether role credentials need refreshing.
     */
    @Override
    public void run() {
        this.mLogHubHeartBeat.Start();
        ArrayList<Integer> heldShards = new ArrayList<Integer>();
        while (!this.mShutDown) {
            this.mLogHubHeartBeat.GetHeldShards(heldShards);
            for (int shard : heldShards) {
                this.getConsumer(shard).consume();
            }
            this.cleanConsumer(heldShards);
            try {
                Thread.sleep(this.mLogHubConfig.getDataFetchIntervalMillis());
                this.checkAndUpdateToken();
            }
            catch (InterruptedException ignored) {
                // Deliberately not re-interrupting: the loop is terminated only via shutdown(),
                // and a set interrupt flag would make every following sleep fail immediately.
            }
        }
    }

    /** Asks the main loop to exit after its current iteration and stops the heartbeat. */
    public void shutdown() {
        this.mShutDown = true;
        this.mLogHubHeartBeat.Stop();
    }

    /**
     * Requests shutdown of consumers whose shard is not in {@code ownedShard}, and forgets every
     * consumer that reports itself as shut down (also removing its shard from the heartbeat).
     *
     * @param ownedShard shard ids currently held by this worker
     */
    private void cleanConsumer(ArrayList<Integer> ownedShard) {
        Iterator<Map.Entry<Integer, LogHubConsumer>> entries = this.mShardConsumer.entrySet().iterator();
        while (entries.hasNext()) {
            Map.Entry<Integer, LogHubConsumer> entry = entries.next();
            Integer shardId = entry.getKey();
            LogHubConsumer consumer = entry.getValue();
            if (!ownedShard.contains(shardId)) {
                consumer.shutdown();
                LOG.warn("try to shut down a consumer shard:" + shardId);
            }
            if (!consumer.isShutdown()) {
                // Shutdown not finished yet; re-checked on the next loop iteration.
                continue;
            }
            this.mLogHubHeartBeat.RemoveHeartShard(shardId.intValue());
            entries.remove();
            LOG.warn("remove a consumer shard:" + shardId);
        }
    }

    /**
     * Returns the consumer registered for {@code shardId}, creating and registering one first
     * if none exists.
     */
    private LogHubConsumer getConsumer(int shardId) {
        LogHubConsumer consumer = this.mShardConsumer.get(shardId);
        if (consumer != null) {
            return consumer;
        }
        consumer = new LogHubConsumer(this.mLogHubClientAdapter, shardId, this.mLogHubConfig.getWorkerInstanceName(), this.mLogHubProcessorFactory.generatorProcessor(), this.mExecutorService, this.mLogHubConfig.getCursorPosition(), this.mLogHubConfig.GetCursorStartTime());
        this.mShardConsumer.put(shardId, consumer);
        LOG.warn("create a consumer shard:" + shardId);
        return consumer;
    }

    /**
     * When role credentials are in use, probes them with a heartbeat carrying an empty shard
     * list. If the probe throws a {@link LogException}, fresh role credentials are fetched from
     * {@link MetaClient}, {@code mClient} is rebuilt and the adapter is switched to them.
     *
     * <p>NOTE(review): any {@code LogException} triggers the refresh, not only an expired token.
     */
    private void checkAndUpdateToken() {
        if (!this.isRoleAK) {
            return;
        }
        try {
            this.mClient.HeartBeat(this.mLogHubConfig.getProject(), this.mLogHubConfig.getLogStore(), this.mLogHubConfig.getConsumerGroupName(), this.mLogHubConfig.getWorkerInstanceName(), new ArrayList<Integer>());
        }
        catch (LogException e) {
            LOG.warn("heartbeat probe failed, refreshing role credentials, errorCode: " + e.GetErrorCode() + ", errorMessage: " + e.GetErrorMessage());
            String accessKeyId = MetaClient.getRoleAccessKeyId();
            String accessKeySecret = MetaClient.getRoleAccessKeySecret();
            String securityToken = MetaClient.getRoleSecurityToken();
            this.mClient = new Client(this.loghubEndpoint, accessKeyId, accessKeySecret);
            this.mClient.SetSecurityToken(securityToken);
            this.switchClient(accessKeyId, accessKeySecret, securityToken);
        }
    }
}

