/*
 * Decompiled with CFR 0.152.
 */
package com.bxm.warcar.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.bxm.warcar.canal.CanalEntity;
import com.bxm.warcar.canal.CanalEventListener;
import com.bxm.warcar.canal.utils.EntityHelper;
import com.bxm.warcar.utils.LifeCycle;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Canal client that connects to a Canal cluster through ZooKeeper, polls binlog
 * batches and dispatches row changes to registered {@link CanalEventListener}s.
 *
 * <p>Listeners are looked up by the key {@code "<schema>.<table>"}. A batch is
 * acknowledged once every entry in it has been processed; if processing throws,
 * the batch is rolled back on the connector.
 *
 * <p>{@link #doInit()} blocks the calling thread for as long as polling runs.
 * Polling ends when {@link #doDestroy()} is invoked, when the polling thread is
 * interrupted, or when the connector throws a {@link CanalClientException}.
 */
public class CanalClient
extends LifeCycle {
    private static final Logger LOGGER = LoggerFactory.getLogger(CanalClient.class);
    /** Subscription expression matching every schema and every table. It is not applied automatically. */
    public static final String FILTER_DEFAULT = ".*\\..*";
    /** Default number of entries requested per poll. */
    public static final int BATCH_SIZE = 1000;
    /** Pause between two consecutive polls, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 1000L;
    /** Registered listeners keyed by {@code "<schema>.<table>"}. */
    private final Map<String, CanalEventListener<? extends CanalEntity>> listeners = Maps.newConcurrentMap();
    private final String zkServers;
    private final String destination;
    private final String username;
    private final String password;
    /** Subscription filter handed to the connector as-is; {@code null} unless {@link #setFilter(String)} was called. */
    private String filter;
    private int batchSize = BATCH_SIZE;
    /** Written by the polling thread in doInit and read by doDestroy, hence volatile. */
    private volatile CanalConnector connector;
    /** Cleared by doDestroy so that the polling loop terminates after the current iteration. */
    private volatile boolean running;

    /**
     * Creates a client without credentials.
     *
     * @param zkServers   ZooKeeper connection string passed to the cluster connector
     * @param destination Canal destination (instance) name
     */
    public CanalClient(String zkServers, String destination) {
        this(zkServers, destination, null, null);
    }

    /**
     * Creates a client.
     *
     * @param zkServers   ZooKeeper connection string passed to the cluster connector
     * @param destination Canal destination (instance) name
     * @param username    user name passed to the connector, may be {@code null}
     * @param password    password passed to the connector, may be {@code null}
     */
    public CanalClient(String zkServers, String destination, String username, String password) {
        this.zkServers = zkServers;
        this.destination = destination;
        this.username = username;
        this.password = password;
    }

    /**
     * Connects, subscribes with the configured filter and polls batches until the
     * client is stopped. This method does not return while polling is active.
     *
     * <p>A {@link CanalClientException} raised by the connector is logged and ends
     * the loop. An interrupt during the pause between polls restores the thread's
     * interrupt status and ends the loop as well.
     */
    protected void doInit() {
        this.running = true;
        CanalConnector canal = CanalConnectors.newClusterConnector(this.zkServers, this.destination, this.username, this.password);
        this.connector = canal;
        try {
            canal.connect();
            canal.subscribe(this.filter);
            // NOTE(review): presumably makes the server redeliver batches that were
            // fetched but never acknowledged - confirm against the Canal client docs.
            canal.rollback();
            while (this.running) {
                Message message = canal.getWithoutAck(this.batchSize);
                long messageId = message.getId();
                List<CanalEntry.Entry> entries = message.getEntries();
                // An id of -1 or an empty entry list means there is nothing to process.
                if (messageId != -1L && !entries.isEmpty()) {
                    this.handleBatch(canal, messageId, entries);
                }
                try {
                    Thread.sleep(POLL_INTERVAL_MILLIS);
                }
                catch (InterruptedException e) {
                    LOGGER.error("sleep:", e);
                    // Preserve the interrupt for the caller and stop polling instead of
                    // silently discarding the shutdown request.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
        catch (CanalClientException e) {
            LOGGER.error("doInit:", e);
        }
    }

    /**
     * Stops the polling loop and disconnects the connector. Safe to call when the
     * connector was never created.
     */
    protected void doDestroy() {
        this.running = false;
        CanalConnector canal = this.connector;
        if (canal != null) {
            canal.disconnect();
        }
    }

    /**
     * Processes one batch and acknowledges it; rolls the batch back when processing fails.
     * A failure of the rollback itself propagates to the caller.
     */
    private void handleBatch(CanalConnector canal, long messageId, List<CanalEntry.Entry> entries) {
        try {
            this.process(entries);
            canal.ack(messageId);
        }
        catch (Exception e) {
            LOGGER.error("process:", e);
            canal.rollback(messageId);
        }
    }

    /**
     * Dispatches every entry of a batch to the listener registered for its table.
     * Transaction begin/end markers and tables without a listener are skipped.
     *
     * @throws RuntimeException if an entry's store value cannot be parsed as a row change
     */
    private void process(List<CanalEntry.Entry> entries) {
        if (CollectionUtils.isEmpty(entries)) {
            return;
        }
        for (CanalEntry.Entry entry : entries) {
            CanalEntry.EntryType entryType = entry.getEntryType();
            if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN || entryType == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            }
            catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            CanalEntry.Header header = entry.getHeader();
            String schemaName = header.getSchemaName();
            String tableName = header.getTableName();
            String listening = schemaName + "." + tableName;
            CanalEventListener<? extends CanalEntity> listener = this.listeners.get(listening);
            LOGGER.debug("Process message: binlog[{}:{}] , name[{},{}] , eventType : {}", header.getLogfileName(), header.getLogfileOffset(), schemaName, tableName, eventType);
            if (null == listener) {
                LOGGER.debug("No canal event listener of {}.", listening);
                continue;
            }
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    this.doDelete(rowData.getBeforeColumnsList(), listener);
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    this.doInsert(rowData.getAfterColumnsList(), listener);
                } else {
                    // Every other event type that carries row data is handled as an update.
                    this.doUpdate(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), listener);
                }
            }
        }
    }

    /**
     * Converts the pre-delete columns into an entity and notifies the listener.
     *
     * @param columns  column values before the delete
     * @param listener listener receiving the delete event
     */
    protected <T extends CanalEntity> void doDelete(List<CanalEntry.Column> columns, CanalEventListener<T> listener) {
        T o = this.convert(columns, listener);
        listener.doDelete(o);
    }

    /**
     * Converts the inserted columns into an entity and notifies the listener.
     *
     * @param columns  column values after the insert
     * @param listener listener receiving the insert event
     */
    protected <T extends CanalEntity> void doInsert(List<CanalEntry.Column> columns, CanalEventListener<T> listener) {
        T o = this.convert(columns, listener);
        listener.doInsert(o);
    }

    /**
     * Converts both row images into entities and notifies the listener.
     *
     * @param before   column values before the change
     * @param after    column values after the change
     * @param listener listener receiving the update event
     */
    protected <T extends CanalEntity> void doUpdate(List<CanalEntry.Column> before, List<CanalEntry.Column> after, CanalEventListener<T> listener) {
        T beforeEntity = this.convert(before, listener);
        T afterEntity = this.convert(after, listener);
        listener.doUpdate(beforeEntity, afterEntity);
    }

    /**
     * Builds an entity of the listener's entity class from a list of columns by
     * delegating to {@code EntityHelper.consutract}.
     *
     * @param columns  column values to convert
     * @param listener listener whose entity class determines the result type
     * @return the constructed entity
     */
    protected <T extends CanalEntity> T convert(List<CanalEntry.Column> columns, CanalEventListener<T> listener) {
        return (T)((CanalEntity)EntityHelper.consutract(columns, listener.getEntityClass()));
    }

    /**
     * Registers a listener under the key returned by its {@code listening()} method.
     * A {@code null} listener is ignored; an existing registration for the same key
     * is replaced after a warning is logged.
     *
     * @param listener listener to register
     * @throws IllegalArgumentException if the listener's key is blank
     */
    public void addListener(CanalEventListener<? extends CanalEntity> listener) {
        if (null == listener) {
            return;
        }
        String listening = listener.listening();
        if (StringUtils.isBlank(listening)) {
            throw new IllegalArgumentException("listening cannot be blank");
        }
        if (this.listeners.containsKey(listening)) {
            LOGGER.warn("Repeated listener [{}] already existed.", listening);
        }
        this.listeners.put(listening, listener);
    }

    /**
     * Registers all given listeners under the keys of the supplied map. The keys are
     * used as-is and are not checked against each listener's {@code listening()} value.
     * A {@code null} map is ignored.
     *
     * @param listeners listeners keyed by {@code "<schema>.<table>"}
     */
    public void addListeners(Map<String, CanalEventListener<? extends CanalEntity>> listeners) {
        if (null == listeners) {
            return;
        }
        this.listeners.putAll(listeners);
    }

    /** @return the ZooKeeper connection string given at construction */
    public String getZkServers() {
        return this.zkServers;
    }

    /** @return the Canal destination given at construction */
    public String getDestination() {
        return this.destination;
    }

    /** @return the user name given at construction, possibly {@code null} */
    public String getUsername() {
        return this.username;
    }

    /** @return the password given at construction, possibly {@code null} */
    public String getPassword() {
        return this.password;
    }

    /** @return the subscription filter, {@code null} if none was set */
    public String getFilter() {
        return this.filter;
    }

    /**
     * Sets the subscription filter. It is read when {@link #doInit()} subscribes,
     * so it must be set before initialization to take effect.
     *
     * @param filter subscription filter expression
     */
    public void setFilter(String filter) {
        this.filter = filter;
    }

    /** @return the number of entries requested per poll */
    public int getBatchSize() {
        return this.batchSize;
    }

    /**
     * Sets the number of entries requested per poll.
     *
     * @param batchSize batch size passed to the connector on every poll
     */
    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }
}

