/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.canal.client.impl;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalNodeAccessStrategy;
import com.alibaba.otter.canal.client.impl.ClusterNodeAccessStrategy;
import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClusterCanalConnector
implements CanalConnector {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private String username;
    private String password;
    private int soTimeout = 60000;
    private int idleTimeout = 3600000;
    private int retryTimes = 3;
    private int retryInterval = 5000;
    private CanalNodeAccessStrategy accessStrategy;
    private SimpleCanalConnector currentConnector;
    private String destination;
    private String filter;

    public ClusterCanalConnector(String username, String password, String destination, CanalNodeAccessStrategy accessStrategy) {
        this.username = username;
        this.password = password;
        this.destination = destination;
        this.accessStrategy = accessStrategy;
    }

    @Override
    public void connect() throws CanalClientException {
        block4: while (this.currentConnector == null) {
            int times = 0;
            while (true) {
                try {
                    this.currentConnector = new SimpleCanalConnector(null, this.username, this.password, this.destination){

                        @Override
                        public SocketAddress getNextAddress() {
                            return ClusterCanalConnector.this.accessStrategy.nextNode();
                        }
                    };
                    this.currentConnector.setSoTimeout(this.soTimeout);
                    this.currentConnector.setIdleTimeout(this.idleTimeout);
                    if (this.filter != null) {
                        this.currentConnector.setFilter(this.filter);
                    }
                    if (this.accessStrategy instanceof ClusterNodeAccessStrategy) {
                        this.currentConnector.setZkClientx(((ClusterNodeAccessStrategy)this.accessStrategy).getZkClient());
                    }
                    this.currentConnector.connect();
                    continue block4;
                }
                catch (Exception e) {
                    this.logger.warn("failed to connect to:{} after retry {} times", (Object)this.accessStrategy.currentNode(), (Object)times);
                    this.currentConnector.disconnect();
                    this.currentConnector = null;
                    if (++times >= this.retryTimes) {
                        throw new CanalClientException((Throwable)e);
                    }
                    try {
                        Thread.sleep(this.retryInterval);
                    }
                    catch (InterruptedException e1) {
                        throw new CanalClientException((Throwable)e1);
                    }
                }
            }
        }
    }

    @Override
    public boolean checkValid() {
        return this.currentConnector != null && this.currentConnector.checkValid();
    }

    @Override
    public void disconnect() throws CanalClientException {
        if (this.currentConnector != null) {
            this.currentConnector.disconnect();
            this.currentConnector = null;
        }
    }

    @Override
    public void subscribe() throws CanalClientException {
        this.subscribe("");
    }

    @Override
    public void subscribe(String filter) throws CanalClientException {
        int times;
        for (times = 0; times < this.retryTimes; ++times) {
            try {
                this.currentConnector.subscribe(filter);
                this.filter = filter;
                return;
            }
            catch (Throwable t) {
                if (this.retryTimes == -1 && t.getCause() instanceof InterruptedException) {
                    this.logger.info("block waiting interrupted by other thread.");
                    return;
                }
                this.logger.warn(String.format("something goes wrong when subscribing from server: %s", this.currentConnector != null ? this.currentConnector.getAddress() : "null"), t);
                this.restart();
                this.logger.info("restart the connector for next round retry.");
                continue;
            }
        }
        throw new CanalClientException("failed to subscribe after " + times + " times retry.");
    }

    @Override
    public void unsubscribe() throws CanalClientException {
        int times;
        for (times = 0; times < this.retryTimes; ++times) {
            try {
                this.currentConnector.unsubscribe();
                return;
            }
            catch (Throwable t) {
                this.logger.warn(String.format("something goes wrong when unsubscribing from server:%s", this.currentConnector != null ? this.currentConnector.getAddress() : "null"), t);
                this.restart();
                this.logger.info("restart the connector for next round retry.");
                continue;
            }
        }
        throw new CanalClientException("failed to unsubscribe after " + times + " times retry.");
    }

    @Override
    public Message get(int batchSize) throws CanalClientException {
        int times;
        for (times = 0; times < this.retryTimes; ++times) {
            try {
                Message msg = this.currentConnector.get(batchSize);
                return msg;
            }
            catch (Throwable t) {
                this.logger.warn(String.format("something goes wrong when getting data from server:%s", this.currentConnector != null ? this.currentConnector.getAddress() : "null"), t);
                this.restart();
                this.logger.info("restart the connector for next round retry.");
                continue;
            }
        }
        throw new CanalClientException("failed to fetch the data after " + times + " times retry");
    }

    @Override
    public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
        int times;
        for (times = 0; times < this.retryTimes; ++times) {
            try {
                Message msg = this.currentConnector.get(batchSize, timeout, unit);
                return msg;
            }
            catch (Throwable t) {
                this.logger.warn(String.format("something goes wrong when getting data from server:%s", this.currentConnector != null ? this.currentConnector.getAddress() : "null"), t);
                this.restart();
                this.logger.info("restart the connector for next round retry.");
                continue;
            }
        }
        throw new CanalClientException("failed to fetch the data after " + times + " times retry");
    }

    @Override
    public Message getWithoutAck(int batchSize) throws CanalClientException {
        int times;
        for (times = 0; times < this.retryTimes; ++times) {
            try {
                Message msg = this.currentConnector.getWithoutAck(batchSize);
                return msg;
            }
            catch (Throwable t) {
                this.logger.warn(String.format("something goes wrong when getWithoutAck data from server:%s", this.currentConnector != null ? this.currentConnector.getAddress() : "null"), t);
                this.restart();
                this.logger.info("restart the connector for next round retry.");
                continue;
            }
        }
        throw new CanalClientException("failed to fetch the data after " + times + " times retry");
    }

    @Override
    public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
        int times;
        for (times = 0; times < this.retryTimes; ++times) {
            try {
                Message msg = this.currentConnector.getWithoutAck(batchSize, timeout, unit);
                return msg;
            }
            catch (Throwable t) {
                this.logger.warn(String.format("something goes wrong when getWithoutAck data from server:%s", this.currentConnector != null ? this.currentConnector.getAddress() : "null"), t);
                this.restart();
                this.logger.info("restart the connector for next round retry.");
                continue;
            }
        }
        throw new CanalClientException("failed to fetch the data after " + times + " times retry");
    }

    @Override
    public void rollback(long batchId) throws CanalClientException {
        int times;
        for (times = 0; times < this.retryTimes; ++times) {
            try {
                this.currentConnector.rollback(batchId);
                return;
            }
            catch (Throwable t) {
                this.logger.warn(String.format("something goes wrong when rollbacking data from server:%s", this.currentConnector != null ? this.currentConnector.getAddress() : "null"), t);
                this.restart();
                this.logger.info("restart the connector for next round retry.");
                continue;
            }
        }
        throw new CanalClientException("failed to rollback after " + times + " times retry");
    }

    @Override
    public void rollback() throws CanalClientException {
        int times;
        for (times = 0; times < this.retryTimes; ++times) {
            try {
                this.currentConnector.rollback();
                return;
            }
            catch (Throwable t) {
                this.logger.warn(String.format("something goes wrong when rollbacking data from server:%s", this.currentConnector != null ? this.currentConnector.getAddress() : "null"), t);
                this.restart();
                this.logger.info("restart the connector for next round retry.");
                continue;
            }
        }
        throw new CanalClientException("failed to rollback after " + times + " times retry");
    }

    @Override
    public void ack(long batchId) throws CanalClientException {
        int times;
        for (times = 0; times < this.retryTimes; ++times) {
            try {
                this.currentConnector.ack(batchId);
                return;
            }
            catch (Throwable t) {
                this.logger.warn(String.format("something goes wrong when acking data from server:%s", this.currentConnector != null ? this.currentConnector.getAddress() : "null"), t);
                this.restart();
                this.logger.info("restart the connector for next round retry.");
                continue;
            }
        }
        throw new CanalClientException("failed to ack after " + times + " times retry");
    }

    private void restart() throws CanalClientException {
        this.disconnect();
        try {
            Thread.sleep(this.retryInterval);
        }
        catch (InterruptedException e) {
            throw new CanalClientException((Throwable)e);
        }
        this.connect();
    }

    public String getUsername() {
        return this.username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return this.password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getSoTimeout() {
        return this.soTimeout;
    }

    public void setSoTimeout(int soTimeout) {
        this.soTimeout = soTimeout;
    }

    public int getIdleTimeout() {
        return this.idleTimeout;
    }

    public void setIdleTimeout(int idleTimeout) {
        this.idleTimeout = idleTimeout;
    }

    public int getRetryTimes() {
        return this.retryTimes;
    }

    public void setRetryTimes(int retryTimes) {
        this.retryTimes = retryTimes;
    }

    public int getRetryInterval() {
        return this.retryInterval;
    }

    public void setRetryInterval(int retryInterval) {
        this.retryInterval = retryInterval;
    }

    public CanalNodeAccessStrategy getAccessStrategy() {
        return this.accessStrategy;
    }

    public void setAccessStrategy(CanalNodeAccessStrategy accessStrategy) {
        this.accessStrategy = accessStrategy;
    }

    public SimpleCanalConnector getCurrentConnector() {
        return this.currentConnector;
    }

    @Override
    public void stopRunning() {
        this.currentConnector.stopRunning();
    }
}

