/*
 * Decompiled with CFR 0.152.
 */
package com.bxm.warcar.canal.mq;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.bxm.warcar.canal.CanalClient;
import com.bxm.warcar.canal.CanalEntity;
import com.bxm.warcar.canal.CanalEventListener;
import com.bxm.warcar.canal.CanalEventType;
import com.bxm.warcar.canal.mq.CanalMessageBody;
import com.bxm.warcar.canal.mq.CanalMessageListenerDispatcher;
import com.bxm.warcar.mq.Consumer;
import com.bxm.warcar.mq.Listener;
import com.bxm.warcar.mq.Message;
import com.bxm.warcar.mq.Producer;
import com.bxm.warcar.mq.SendException;
import com.bxm.warcar.mq.SendResult;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CanalMessageQueueClient
extends CanalClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(CanalMessageQueueClient.class);
    private final Producer producer;
    private final Consumer consumer;
    private final String topic;

    public CanalMessageQueueClient(Producer producer, Consumer consumer, String topic, String zkServers, String destination) {
        this(producer, consumer, topic, zkServers, destination, null, null);
    }

    public CanalMessageQueueClient(Producer producer, Consumer consumer, String topic, String zkServers, String destination, String username, String password) {
        super(zkServers, destination, username, password);
        Listener messageListener;
        this.producer = producer;
        this.consumer = consumer;
        this.topic = topic;
        if (!producer.isStarted()) {
            producer.start();
        }
        if (!consumer.isStarted()) {
            consumer.start();
        }
        if (!((messageListener = consumer.getMessageListener()) instanceof CanalMessageListenerDispatcher)) {
            throw new RuntimeException("This message listener must be instance CanalMessageListenerDispatcher for the consumer.");
        }
        super.addListeners(((CanalMessageListenerDispatcher)messageListener).getListeners());
    }

    @Override
    public void addListener(CanalEventListener<? extends CanalEntity> listener) {
        throw new UnsupportedOperationException("Please use com.bxm.warcar.canal.mq.CanalMessageListenerDispatcher#(String, CanalEventListener<? extends CanalEntity>)");
    }

    @Override
    protected <T extends CanalEntity> void doDelete(List<CanalEntry.Column> columns, CanalEventListener<T> listener) {
        T object = this.convert(columns, listener);
        if (null == object) {
            return;
        }
        CanalMessageBody entity = new CanalMessageBody(CanalEventType.DELETE, object, null);
        this.sendMessage(listener, entity);
    }

    @Override
    protected <T extends CanalEntity> void doInsert(List<CanalEntry.Column> columns, CanalEventListener<T> listener) {
        T object = this.convert(columns, listener);
        if (null == object) {
            return;
        }
        CanalMessageBody entity = new CanalMessageBody(CanalEventType.INSERT, object, null);
        this.sendMessage(listener, entity);
    }

    @Override
    protected <T extends CanalEntity> void doUpdate(List<CanalEntry.Column> before, List<CanalEntry.Column> after, CanalEventListener<T> listener) {
        T beforeObj = this.convert(before, listener);
        if (null == beforeObj) {
            return;
        }
        T afterObj = this.convert(after, listener);
        if (null == afterObj) {
            return;
        }
        CanalMessageBody entity = new CanalMessageBody(CanalEventType.UPDATE, beforeObj, afterObj);
        this.sendMessage(listener, entity);
    }

    private <T extends CanalEntity> void sendMessage(CanalEventListener<T> listener, CanalMessageBody entity) {
        try {
            Message message = new Message();
            message.setTopic(this.topic);
            message.setTags(listener.listening());
            message.setBody(JSONObject.toJSONBytes((Object)entity, (SerializerFeature[])new SerializerFeature[0]));
            SendResult result = this.producer.send(message);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("message [{}] send successful", (Object)result.getMsgId());
            }
        }
        catch (SendException e) {
            if (LOGGER.isErrorEnabled()) {
                LOGGER.error("send:", (Throwable)e);
            }
            throw new RuntimeException("send:", e);
        }
    }
}

