/*
 * Decompiled with CFR 0.152.
 */
package com.bxm.pangu.rta.api.mq;

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.ErrorMessageResult;
import com.bxm.pangu.rta.api.mq.AlionsHttpProperties;
import com.bxm.warcar.mq.Consumer;
import com.bxm.warcar.mq.Listener;
import com.bxm.warcar.mq.Message;
import com.bxm.warcar.mq.SingleMessageListener;
import com.bxm.warcar.utils.LifeCycle;
import com.bxm.warcar.utils.NamedThreadFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AlionsHttpConsumer
extends LifeCycle
implements Consumer {
    private static final Logger log = LoggerFactory.getLogger(AlionsHttpConsumer.class);
    private final ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), (ThreadFactory)new NamedThreadFactory("alions-http-consumer"));
    private final AlionsHttpProperties properties;
    private final Listener messageListener;
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private MQConsumer consumer;

    public AlionsHttpConsumer(AlionsHttpProperties properties, Listener messageListener) {
        Preconditions.checkNotNull((Object)properties);
        Preconditions.checkNotNull((Object)messageListener);
        this.properties = properties;
        this.messageListener = messageListener;
    }

    public void suspend() {
    }

    public void shutdown() {
        this.destroy();
    }

    public void start() {
        this.init();
    }

    public boolean isStarted() {
        return Objects.nonNull(this.consumer);
    }

    public Listener getMessageListener() {
        return this.messageListener;
    }

    protected void doInit() {
        this.createConsumer();
        this.pool.execute(this::doConsume);
    }

    private void createConsumer() {
        MQClient mqClient = new MQClient(this.properties.getEndpoint(), this.properties.getAccessKey(), this.properties.getSecretKey());
        String instanceId = this.properties.getInstanceId();
        String topic = this.messageListener.getTopic();
        String groupId = this.properties.getGroupId();
        this.consumer = StringUtils.isNotBlank((String)instanceId) ? mqClient.getConsumer(instanceId, topic, groupId, null) : mqClient.getConsumer(topic, groupId);
    }

    private void doConsume() {
        while (!this.shutdown.get()) {
            List messages = null;
            try {
                messages = this.consumer.consumeMessage(this.properties.getNum(), this.properties.getPollingSecond());
            }
            catch (Throwable e) {
                log.error("consumeMessage: ", e);
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            }
            if (CollectionUtils.isEmpty((Collection)messages)) continue;
            ArrayList handlers = Lists.newArrayList();
            messages.forEach(message -> {
                Message msg = new Message();
                msg.setTopic(this.messageListener.getTopic());
                msg.setBody(message.getMessageBodyBytes());
                msg.setTags(message.getMessageTag());
                msg.setKey(message.getMessageKey());
                msg.setMsgId(message.getMessageId());
                if (this.messageListener instanceof SingleMessageListener) {
                    ((SingleMessageListener)this.messageListener).consume(msg, (Object)this.consumer);
                } else {
                    log.warn("Unsupport message listener: {}", (Object)this.messageListener);
                }
                handlers.add(message.getReceiptHandle());
            });
            try {
                this.consumer.ackMessage((List)handlers);
            }
            catch (Throwable e) {
                if (e instanceof AckMessageException) {
                    AckMessageException errors = (AckMessageException)e;
                    log.error("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
                    if (errors.getErrorMessages() == null) continue;
                    for (String errorHandle : errors.getErrorMessages().keySet()) {
                        log.error("Handle:" + errorHandle + ", ErrorCode:" + ((ErrorMessageResult)errors.getErrorMessages().get(errorHandle)).getErrorCode() + ", ErrorMsg:" + ((ErrorMessageResult)errors.getErrorMessages().get(errorHandle)).getErrorMessage());
                    }
                    continue;
                }
                log.error("ackMessage: ", e);
            }
        }
    }

    protected void doDestroy() {
        this.pool.shutdown();
        this.shutdown.compareAndSet(false, true);
        try {
            this.pool.awaitTermination(5L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            log.error("", (Throwable)e);
        }
    }
}

