package com.bxm.warcar.integration.message;

import com.bxm.warcar.integration.message.annotation.Messaging;
import com.bxm.warcar.mq.Message;
import com.bxm.warcar.mq.Producer;
import com.bxm.warcar.mq.SendResult;
import com.bxm.warcar.utils.JsonHelper;
import com.bxm.warcar.utils.NamedThreadFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/bxm/warcar/integration/message/AsyncMessageProducer.class */
/**
 * Message producer that hands every send over to a thread pool so the calling
 * thread is never blocked by the underlying {@link Producer}.
 *
 * <p>Delivery is best-effort: a failure while building or sending a message, or
 * a rejection by the pool, is logged and never propagated to the caller.
 */
public class AsyncMessageProducer extends AbstractMessageProducer implements ThreadPoolMessageProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncMessageProducer.class);

    /** Core and maximum size of the pool created by the two-argument constructor. */
    private static final int DEFAULT_POOL_SIZE = 10;

    /** Thread-name prefix handed to {@link NamedThreadFactory} for the default pool. */
    private static final String DEFAULT_THREAD_NAME = "mp";

    private final ExecutorService threadPool;

    /**
     * Unit of work executed on the pool: builds one {@link Message} from the
     * captured invocation data and sends it through the producer.
     */
    private static class MessageSendProcessor implements Runnable {
        private final Producer producer;
        private final String topic;
        private final Object returning;
        private final Object arg;
        private final Messaging annotation;

        private MessageSendProcessor(Producer producer, String topic, Object returning, Object arg,
                Messaging annotation) {
            this.producer = producer;
            this.topic = topic;
            this.returning = returning;
            this.arg = arg;
            this.annotation = annotation;
        }

        /**
         * Builds and sends the message. Never throws an {@link Exception}: the task is
         * submitted with its {@code Future} discarded, so anything escaping from here
         * would vanish without a trace. Every failure is therefore logged in place.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                // Message construction (including body serialization) is deliberately
                // inside the try so that a serialization failure is logged as well.
                Message message = buildMessage();
                SendResult result = this.producer.send(message);
                if (LOGGER.isDebugEnabled()) {
                    // Guard against a null result so a successful send is not
                    // misreported as a failure via a NullPointerException.
                    Object msgId = result == null ? null : result.getMsgId();
                    LOGGER.debug("Message send successful. {}", msgId);
                }
            } catch (Exception e) {
                LOGGER.error("Message send failed! topic=" + this.topic, e);
            }
        }

        /**
         * Creates the outgoing message from the annotation settings and a body that
         * wraps the captured argument and return value.
         */
        private Message buildMessage() {
            byte[] body = JsonHelper.convert2bytes(new MessageBody(this.arg, this.returning));
            Message message = new Message(this.topic, this.annotation.tags(), this.annotation.flag(), body);
            message.setDelayTimeLevel(this.annotation.delayTimeLevel());
            return message;
        }
    }

    /**
     * Creates a producer backed by a fixed pool of {@value #DEFAULT_POOL_SIZE} threads
     * and an unbounded work queue.
     *
     * @param producer the underlying producer used to send messages
     * @param obj      passed through unchanged to the superclass constructor
     */
    public AsyncMessageProducer(Producer producer, Object obj) {
        this(producer, obj, new ThreadPoolExecutor(DEFAULT_POOL_SIZE, DEFAULT_POOL_SIZE, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), (ThreadFactory) new NamedThreadFactory(DEFAULT_THREAD_NAME)));
    }

    /**
     * Creates a producer that runs its sends on the supplied pool.
     *
     * @param producer           the underlying producer used to send messages
     * @param obj                passed through unchanged to the superclass constructor
     * @param threadPoolExecutor the pool on which send tasks are executed
     */
    public AsyncMessageProducer(Producer producer, Object obj, ThreadPoolExecutor threadPoolExecutor) {
        super(producer, obj);
        this.threadPool = threadPoolExecutor;
    }

    /**
     * @return the pool on which send tasks are executed
     */
    @Override // com.bxm.warcar.integration.message.ThreadPoolMessageProducer
    public ExecutorService getThreadPool() {
        return this.threadPool;
    }

    /**
     * Queues a send task on the pool and returns immediately. If the pool rejects
     * the task, the message is dropped and the rejection is logged with its cause.
     *
     * @param topic     destination topic
     * @param returning value carried as the "returning" part of the message body
     * @param arg       value carried as the "arg" part of the message body
     * @param messaging annotation supplying tags, flag and delay level
     */
    @Override // com.bxm.warcar.integration.message.AbstractMessageProducer
    protected void sendMessage(String topic, Object returning, Object arg, Messaging messaging) {
        try {
            this.threadPool.submit(new MessageSendProcessor(this.producer, topic, returning, arg, messaging));
        } catch (RejectedExecutionException e) {
            // The trailing throwable is logged with its stack trace by SLF4J.
            LOGGER.error("Rejected: topic={}, arg={}, returning={}", new Object[]{topic, arg, returning, e});
        }
    }
}
