package com.bxm.pangu.rta.api.mq;

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.ErrorMessageResult;
import com.bxm.warcar.mq.Consumer;
import com.bxm.warcar.mq.Listener;
import com.bxm.warcar.mq.Message;
import com.bxm.warcar.mq.SingleMessageListener;
import com.bxm.warcar.utils.LifeCycle;
import com.bxm.warcar.utils.NamedThreadFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/bxm/pangu/rta/api/mq/AlionsHttpConsumer.class */
public class AlionsHttpConsumer extends LifeCycle implements Consumer {
    private static final Logger log = LoggerFactory.getLogger(AlionsHttpConsumer.class);
    private final AlionsHttpProperties properties;
    private final Listener messageListener;
    private MQConsumer consumer;
    private final ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, (BlockingQueue<Runnable>) new LinkedBlockingQueue(), (ThreadFactory) new NamedThreadFactory("alions-http-consumer"));
    private final AtomicBoolean shutdown = new AtomicBoolean(false);

    public AlionsHttpConsumer(AlionsHttpProperties alionsHttpProperties, Listener listener) {
        Preconditions.checkNotNull(alionsHttpProperties);
        Preconditions.checkNotNull(listener);
        this.properties = alionsHttpProperties;
        this.messageListener = listener;
    }

    public void suspend() {
    }

    public void shutdown() {
        destroy();
    }

    public void start() {
        init();
    }

    public boolean isStarted() {
        return Objects.nonNull(this.consumer);
    }

    public Listener getMessageListener() {
        return this.messageListener;
    }

    protected void doInit() {
        createConsumer();
        this.pool.execute(this::doConsume);
    }

    private void createConsumer() {
        MQClient mQClient = new MQClient(this.properties.getEndpoint(), this.properties.getAccessKey(), this.properties.getSecretKey());
        String instanceId = this.properties.getInstanceId();
        String topic = this.messageListener.getTopic();
        String groupId = this.properties.getGroupId();
        if (StringUtils.isNotBlank(instanceId)) {
            this.consumer = mQClient.getConsumer(instanceId, topic, groupId, (String) null);
        } else {
            this.consumer = mQClient.getConsumer(topic, groupId);
        }
    }

    private void doConsume() {
        while (!this.shutdown.get()) {
            List list = null;
            try {
                list = this.consumer.consumeMessage(this.properties.getNum(), this.properties.getPollingSecond());
            } catch (Throwable th) {
                log.error("consumeMessage: ", th);
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                }
            }
            if (!CollectionUtils.isEmpty(list)) {
                ArrayList newArrayList = Lists.newArrayList();
                list.forEach(message -> {
                    Message message = new Message();
                    message.setTopic(this.messageListener.getTopic());
                    message.setBody(message.getMessageBodyBytes());
                    message.setTags(message.getMessageTag());
                    message.setKey(message.getMessageKey());
                    message.setMsgId(message.getMessageId());
                    if (this.messageListener instanceof SingleMessageListener) {
                        this.messageListener.consume(message, this.consumer);
                    } else {
                        log.warn("Unsupport message listener: {}", this.messageListener);
                    }
                    newArrayList.add(message.getReceiptHandle());
                });
                try {
                    this.consumer.ackMessage(newArrayList);
                } catch (Throwable th2) {
                    if (th2 instanceof AckMessageException) {
                        AckMessageException ackMessageException = th2;
                        log.error("Ack message fail, requestId is:" + ackMessageException.getRequestId() + ", fail handles:");
                        if (ackMessageException.getErrorMessages() != null) {
                            for (String str : ackMessageException.getErrorMessages().keySet()) {
                                log.error("Handle:" + str + ", ErrorCode:" + ((ErrorMessageResult) ackMessageException.getErrorMessages().get(str)).getErrorCode() + ", ErrorMsg:" + ((ErrorMessageResult) ackMessageException.getErrorMessages().get(str)).getErrorMessage());
                            }
                        }
                    } else {
                        log.error("ackMessage: ", th2);
                    }
                }
            }
        }
    }

    protected void doDestroy() {
        this.pool.shutdown();
        this.shutdown.compareAndSet(false, true);
        try {
            this.pool.awaitTermination(5L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.error("", e);
        }
    }
}
