/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.loghub.client;

import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.loghub.client.FetchedLogGroup;
import com.aliyun.openservices.loghub.client.InnerFetcherProcessorFactory;
import com.aliyun.openservices.loghub.client.LogHubClientAdapter;
import com.aliyun.openservices.loghub.client.LogHubConsumer;
import com.aliyun.openservices.loghub.client.LogHubHeartBeat;
import com.aliyun.openservices.loghub.client.LogThreadFactory;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubShardListener;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;

public class ClientFetcher {
    private final ILogHubProcessorFactory mLogHubProcessorFactory;
    private final LogHubConfig mLogHubConfig;
    private final LogHubHeartBeat mLogHubHeartBeat;
    private LogHubClientAdapter mLogHubClientAdapter;
    private final Map<Integer, LogHubConsumer> mShardConsumer = new HashMap<Integer, LogHubConsumer>();
    int _curShardIndex = 0;
    private final List<Integer> mShardList = new ArrayList<Integer>();
    private final Map<Integer, FetchedLogGroup> mCachedData = new HashMap<Integer, FetchedLogGroup>();
    private final ExecutorService mExecutorService = Executors.newCachedThreadPool(new LogThreadFactory());
    private final ScheduledExecutorService mShardListUpdateService = Executors.newScheduledThreadPool(1);
    private final long mShardListUpdateIntervalInMills = 500L;
    private ILogHubShardListener mLogHubShardListener = null;
    private static final Logger logger = Logger.getLogger(ClientFetcher.class);

    public ClientFetcher(LogHubConfig config) throws LogHubClientWorkerException {
        this.mLogHubProcessorFactory = new InnerFetcherProcessorFactory(this);
        this.mLogHubConfig = config;
        this.mLogHubClientAdapter = new LogHubClientAdapter(config.getLogHubEndPoint(), config.getAccessId(), config.getAccessKey(), config.getStsToken(), config.getProject(), config.getLogStore(), config.getConsumerGroupName(), config.getWorkerInstanceName());
        try {
            this.mLogHubClientAdapter.CreateConsumerGroup((int)(config.getHeartBeatIntervalMillis() * 2L / 1000L), config.isConsumeInOrder());
        }
        catch (LogException e) {
            if (e.GetErrorCode().compareToIgnoreCase("ConsumerGroupAlreadyExist") == 0) {
                try {
                    ConsumerGroup consumerGroup = this.mLogHubClientAdapter.GetConsumerGroup();
                    if (consumerGroup != null) {
                        if (consumerGroup.isInOrder() != this.mLogHubConfig.isConsumeInOrder() || consumerGroup.getTimeout() != (int)(this.mLogHubConfig.getHeartBeatIntervalMillis() * 2L / 1000L)) {
                            throw new LogHubClientWorkerException("consumer group is not agreed, AlreadyExistedConsumerGroup: {\"consumeInOrder\": " + consumerGroup.isInOrder() + ", \"timeoutInSecond\": " + consumerGroup.getTimeout() + "}");
                        }
                    }
                    throw new LogHubClientWorkerException("consumer group not exist");
                }
                catch (LogException e1) {
                    throw new LogHubClientWorkerException("error occour when get consumer group, errorCode: " + e1.GetErrorCode() + ", errorMessage: " + e1.GetErrorMessage());
                }
            }
            throw new LogHubClientWorkerException("error occour when create consumer group, errorCode: " + e.GetErrorCode() + ", errorMessage: " + e.GetErrorMessage());
        }
        this.mLogHubHeartBeat = new LogHubHeartBeat(this.mLogHubClientAdapter, config.getHeartBeatIntervalMillis());
    }

    public void SwitchClient(String accessKeyId, String accessKey) {
        this.mLogHubClientAdapter.SwitchClient(this.mLogHubConfig.getLogHubEndPoint(), accessKeyId, accessKey, null);
    }

    public void SwitchClient(String accessKeyId, String accessKey, String stsToken) {
        this.mLogHubClientAdapter.SwitchClient(this.mLogHubConfig.getLogHubEndPoint(), accessKeyId, accessKey, stsToken);
    }

    public void start() {
        this.mLogHubHeartBeat.Start();
        this.mShardListUpdateService.scheduleWithFixedDelay(new ShardListUpdator(), 0L, 500L, TimeUnit.MILLISECONDS);
    }

    public void shutdown() {
        this.mLogHubHeartBeat.Stop();
        this.mShardListUpdateService.shutdown();
    }

    public void registerShardListener(ILogHubShardListener listener) {
        this.mLogHubShardListener = listener;
    }

    public ILogHubShardListener getShardListener() {
        return this.mLogHubShardListener;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public FetchedLogGroup nextNoBlock() {
        FetchedLogGroup result = null;
        List<Integer> list = this.mShardList;
        synchronized (list) {
            for (int i = 0; i < this.mShardList.size(); ++i) {
                this._curShardIndex %= this.mShardList.size();
                int shardId = this.mShardList.get(this._curShardIndex);
                result = this.mCachedData.get(shardId);
                this.mCachedData.put(shardId, null);
                LogHubConsumer consumer = this.mShardConsumer.get(shardId);
                if (consumer != null) {
                    consumer.consume();
                }
                this._curShardIndex = (this._curShardIndex + 1) % this.mShardList.size();
                if (result != null) break;
            }
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void saveCheckPoint(int shardId, String cursor, boolean persistent) throws LogHubCheckPointException {
        List<Integer> list = this.mShardList;
        synchronized (list) {
            LogHubConsumer consumer = this.mShardConsumer.get(shardId);
            if (consumer == null) {
                throw new LogHubCheckPointException("Invalid shardId when saving checkpoint");
            }
            consumer.saveCheckPoint(cursor, persistent);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void updateCachedData(int shardId, FetchedLogGroup data) {
        List<Integer> list = this.mShardList;
        synchronized (list) {
            this.mCachedData.put(shardId, data);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cleanCachedData(int shardId) {
        List<Integer> list = this.mShardList;
        synchronized (list) {
            this.mCachedData.remove(shardId);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void cleanConsumer(ArrayList<Integer> ownedShard) {
        List<Integer> list = this.mShardList;
        synchronized (list) {
            ArrayList<Integer> removeShards = new ArrayList<Integer>();
            for (Map.Entry<Integer, LogHubConsumer> shard : this.mShardConsumer.entrySet()) {
                LogHubConsumer consumer = shard.getValue();
                if (!ownedShard.contains(shard.getKey())) {
                    consumer.shutdown();
                }
                if (!consumer.isShutdown()) continue;
                this.mLogHubHeartBeat.RemoveHeartShard(shard.getKey());
                this.mShardConsumer.remove(shard.getKey());
                removeShards.add(shard.getKey());
                this.mShardList.remove(shard.getKey());
            }
            Iterator<Map.Entry<Integer, LogHubConsumer>> iterator = removeShards.iterator();
            while (iterator.hasNext()) {
                int shard = (Integer)((Object)iterator.next());
                this.mShardConsumer.remove(shard);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private LogHubConsumer getConsuemr(int shardId) {
        List<Integer> list = this.mShardList;
        synchronized (list) {
            LogHubConsumer consumer = this.mShardConsumer.get(shardId);
            if (consumer != null) {
                return consumer;
            }
            consumer = new LogHubConsumer(this.mLogHubClientAdapter, shardId, this.mLogHubConfig.getWorkerInstanceName(), this.mLogHubProcessorFactory.generatorProcessor(), this.mExecutorService, this.mLogHubConfig.getCursorPosition(), this.mLogHubConfig.GetCursorStartTime());
            this.mShardConsumer.put(shardId, consumer);
            this.mShardList.add(shardId);
            consumer.consume();
            return consumer;
        }
    }

    private class ShardListUpdator
    implements Runnable {
        private ShardListUpdator() {
        }

        @Override
        public void run() {
            try {
                ArrayList<Integer> heldShards = new ArrayList<Integer>();
                ClientFetcher.this.mLogHubHeartBeat.GetHeldShards(heldShards);
                for (int shard : heldShards) {
                    ClientFetcher.this.getConsuemr(shard);
                }
                ClientFetcher.this.cleanConsumer(heldShards);
            }
            catch (Throwable throwable) {
                // empty catch block
            }
        }
    }
}

