/*
 * Decompiled with CFR 0.152.
 */
package com.bxm.lovelink.cm.mq;

import com.bxm.lovelink.cm.data.Binlog;
import com.bxm.lovelink.cm.data.TableHandlerFactory;
import com.bxm.lovelink.cm.mq.CanalMqFetcher;
import com.bxm.warcar.mq.ConsumeStatus;
import com.bxm.warcar.mq.Message;
import com.bxm.warcar.mq.SingleMessageListener;
import com.bxm.warcar.utils.JsonHelper;
import com.bxm.warcar.utils.StringHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SingleMessageListener} that turns each incoming message body into a
 * {@link Binlog} and passes it to a {@link TableHandlerFactory}.
 *
 * <p>The topic and consumer id reported by this listener are read once from the
 * supplied {@link CanalMqFetcher} at construction time.
 */
public class CanalMessageListener implements SingleMessageListener {
    private static final Logger log = LoggerFactory.getLogger(CanalMessageListener.class);

    /** Topic name captured from the fetcher when the listener was created. */
    private final String canalTopic;
    /** Consumer id captured from the fetcher when the listener was created. */
    private final String canalConsumerId;
    /** Receives every parsed binlog together with its raw JSON text. */
    private final TableHandlerFactory tableHandlerFactory;

    /**
     * Creates a listener bound to the topic and consumer id exposed by the fetcher.
     *
     * @param canalMqFetcher      source of the topic and consumer id
     * @param tableHandlerFactory receiver of the parsed binlog records
     */
    public CanalMessageListener(CanalMqFetcher canalMqFetcher, TableHandlerFactory tableHandlerFactory) {
        this.canalTopic = canalMqFetcher.getCanalTopic();
        this.canalConsumerId = canalMqFetcher.getCanalConsumerId();
        this.tableHandlerFactory = tableHandlerFactory;
    }

    /**
     * Converts the message body to a JSON string, parses it into a {@link Binlog}
     * and hands both the parsed object and the raw JSON to the handler factory.
     *
     * @param message the message whose body is processed
     * @param context not used by this implementation
     * @return {@code CONSUME_SUCCESS} when handling finishes without an exception;
     *         {@code RECONSUME_LATER} when any exception is thrown along the way
     */
    public ConsumeStatus consume(Message message, Object context) {
        ConsumeStatus outcome;
        try {
            byte[] payload = message.getBody();
            String rawJson = StringHelper.convert(payload);
            Binlog binlog = (Binlog) JsonHelper.convert(rawJson, Binlog.class);
            this.tableHandlerFactory.handle(binlog, rawJson);
            outcome = ConsumeStatus.CONSUME_SUCCESS;
        } catch (Exception failure) {
            // Any failure (body conversion, JSON parsing, handler) is logged and
            // reported as RECONSUME_LATER rather than propagated to the caller.
            log.error("CONSUME ERROR", failure);
            outcome = ConsumeStatus.RECONSUME_LATER;
        }
        return outcome;
    }

    /**
     * @return the topic name taken from the fetcher at construction time
     */
    public String getTopic() {
        return this.canalTopic;
    }

    /**
     * @return the consumer id taken from the fetcher at construction time
     */
    public String getConsumerId() {
        return this.canalConsumerId;
    }
}

