/*
 * Decompiled with CFR 0.152.
 */
package com.bxm.warcar.integration.message;

import com.bxm.warcar.integration.message.AbstractMessageProducer;
import com.bxm.warcar.integration.message.MessageBody;
import com.bxm.warcar.integration.message.ThreadPoolMessageProducer;
import com.bxm.warcar.integration.message.annotation.Messaging;
import com.bxm.warcar.mq.Message;
import com.bxm.warcar.mq.Producer;
import com.bxm.warcar.mq.SendResult;
import com.bxm.warcar.utils.JsonHelper;
import com.bxm.warcar.utils.NamedThreadFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncMessageProducer
extends AbstractMessageProducer
implements ThreadPoolMessageProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncMessageProducer.class);
    private final ExecutorService threadPool;

    public AsyncMessageProducer(Producer producer, Object properties) {
        this(producer, properties, new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), (ThreadFactory)new NamedThreadFactory("mp")));
    }

    public AsyncMessageProducer(Producer producer, Object properties, ThreadPoolExecutor threadPool) {
        super(producer, properties);
        this.threadPool = threadPool;
    }

    @Override
    protected void sendMessage(String topic, Object returning, Object arg, Messaging messaging) {
        block2: {
            try {
                this.threadPool.submit(new MessageSendProcessor(this.producer, topic, returning, arg, messaging));
            }
            catch (RejectedExecutionException e) {
                if (!LOGGER.isErrorEnabled()) break block2;
                LOGGER.error("Rejected: topic={}, arg={}, returning={}", new Object[]{topic, arg, returning});
            }
        }
    }

    @Override
    public ExecutorService getThreadPool() {
        return this.threadPool;
    }

    private static class MessageSendProcessor
    implements Runnable {
        private final Producer producer;
        private final String topic;
        private final Object returning;
        private final Object arg;
        private final Messaging annotation;

        private MessageSendProcessor(Producer producer, String topic, Object returning, Object arg, Messaging annotation) {
            this.producer = producer;
            this.topic = topic;
            this.returning = returning;
            this.arg = arg;
            this.annotation = annotation;
        }

        @Override
        public void run() {
            this.sendMessage(this.returning, this.arg, this.annotation);
        }

        private byte[] serialize(Object body) {
            return JsonHelper.convert2bytes((Object)body);
        }

        private void sendMessage(Object returning, Object request, Messaging annotation) {
            block3: {
                String tags = annotation.tags();
                int flag = annotation.flag();
                int delayTimeLevel = annotation.delayTimeLevel();
                MessageBody body = new MessageBody(request, returning);
                Message message = new Message(this.topic, tags, flag, this.serialize(body));
                message.setDelayTimeLevel(delayTimeLevel);
                try {
                    SendResult result = this.producer.send(message);
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Message send successful. {}", (Object)result.getMsgId());
                    }
                }
                catch (Exception e) {
                    if (!LOGGER.isErrorEnabled()) break block3;
                    LOGGER.error("Message send failed! ", (Throwable)e);
                }
            }
        }
    }
}

