package com.bxm.warcar.canal.mq;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.bxm.warcar.canal.CanalClient;
import com.bxm.warcar.canal.CanalEntity;
import com.bxm.warcar.canal.CanalEventListener;
import com.bxm.warcar.canal.CanalEventType;
import com.bxm.warcar.mq.Consumer;
import com.bxm.warcar.mq.Message;
import com.bxm.warcar.mq.Producer;
import com.bxm.warcar.mq.SendException;
import com.bxm.warcar.mq.SendResult;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/bxm/warcar/canal/mq/CanalMessageQueueClient.class */
public class CanalMessageQueueClient extends CanalClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(CanalMessageQueueClient.class);
    private final Producer producer;
    private final Consumer consumer;
    private final String topic;

    public CanalMessageQueueClient(Producer producer, Consumer consumer, String str, String str2, String str3) {
        this(producer, consumer, str, str2, str3, null, null);
    }

    public CanalMessageQueueClient(Producer producer, Consumer consumer, String str, String str2, String str3, String str4, String str5) {
        super(str2, str3, str4, str5);
        this.producer = producer;
        this.consumer = consumer;
        this.topic = str;
        if (!producer.isStarted()) {
            producer.start();
        }
        if (!consumer.isStarted()) {
            consumer.start();
        }
        CanalMessageListenerDispatcher messageListener = consumer.getMessageListener();
        if (!(messageListener instanceof CanalMessageListenerDispatcher)) {
            throw new RuntimeException("This message listener must be instance CanalMessageListenerDispatcher for the consumer.");
        }
        super.addListeners(messageListener.getListeners());
    }

    @Override // com.bxm.warcar.canal.CanalClient
    public void addListener(CanalEventListener<? extends CanalEntity> canalEventListener) {
        throw new UnsupportedOperationException("Please use com.bxm.warcar.canal.mq.CanalMessageListenerDispatcher#(String, CanalEventListener<? extends CanalEntity>)");
    }

    @Override // com.bxm.warcar.canal.CanalClient
    protected <T extends CanalEntity> void doDelete(List<CanalEntry.Column> list, CanalEventListener<T> canalEventListener) {
        CanalEntity convert = convert(list, canalEventListener);
        if (null == convert) {
            return;
        }
        sendMessage(canalEventListener, new CanalMessageBody(CanalEventType.DELETE, convert, null));
    }

    @Override // com.bxm.warcar.canal.CanalClient
    protected <T extends CanalEntity> void doInsert(List<CanalEntry.Column> list, CanalEventListener<T> canalEventListener) {
        CanalEntity convert = convert(list, canalEventListener);
        if (null == convert) {
            return;
        }
        sendMessage(canalEventListener, new CanalMessageBody(CanalEventType.INSERT, convert, null));
    }

    @Override // com.bxm.warcar.canal.CanalClient
    protected <T extends CanalEntity> void doUpdate(List<CanalEntry.Column> list, List<CanalEntry.Column> list2, CanalEventListener<T> canalEventListener) {
        CanalEntity convert;
        CanalEntity convert2 = convert(list, canalEventListener);
        if (null == convert2 || null == (convert = convert(list2, canalEventListener))) {
            return;
        }
        sendMessage(canalEventListener, new CanalMessageBody(CanalEventType.UPDATE, convert2, convert));
    }

    private <T extends CanalEntity> void sendMessage(CanalEventListener<T> canalEventListener, CanalMessageBody canalMessageBody) {
        try {
            Message message = new Message();
            message.setTopic(this.topic);
            message.setTags(canalEventListener.listening());
            message.setBody(JSONObject.toJSONBytes(canalMessageBody, new SerializerFeature[0]));
            SendResult send = this.producer.send(message);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("message [{}] send successful", send.getMsgId());
            }
        } catch (SendException e) {
            if (LOGGER.isErrorEnabled()) {
                LOGGER.error("send:", e);
            }
            throw new RuntimeException("send:", e);
        }
    }
}
