/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.org.apache.hadoop.hbase.replication;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.org.apache.hadoop.hbase.Abortable;
import org.apache.hudi.org.apache.hadoop.hbase.TableName;
import org.apache.hudi.org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hudi.org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hudi.org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hudi.org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hudi.org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hudi.org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hudi.org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hudi.org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hudi.org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hudi.org.apache.hadoop.hbase.util.Bytes;
import org.apache.hudi.org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hudi.org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hudi.org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

/**
 * {@link ReplicationPeer} implementation whose enabled/disabled state and
 * table-to-column-family map are read from ZooKeeper znodes.
 *
 * <p>The two znodes are followed by the nested {@link PeerStateTracker} and
 * {@link TableCFsTracker}; neither tracker exists until the matching
 * {@code start...Tracker} method has been called. The cached peer state and
 * table-CFs map are held in {@code volatile} fields.
 */
@InterfaceAudience.Private
public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closeable {
    private static final Log LOG = LogFactory.getLog(ReplicationPeerZKImpl.class);

    private final Configuration conf;
    private final String id;
    private final ReplicationPeerConfig peerConfig;

    /** Last state read from the peer-state znode; unset until the state tracker is started. */
    private volatile ReplicationPeer.PeerState peerState;
    /** Last table-CFs map seen; starts out as an empty map unless one is supplied at construction. */
    private volatile Map<TableName, List<String>> tableCFs = new HashMap<TableName, List<String>>();

    private PeerStateTracker peerStateTracker;
    private TableCFsTracker tableCFsTracker;

    /**
     * Creates a peer with an empty table-CFs map.
     *
     * @param conf       configuration returned by {@link #getConfiguration()}
     * @param id         identifier returned by {@link #getId()}
     * @param peerConfig peer configuration returned by {@link #getPeerConfig()}
     * @throws ReplicationException declared for callers; not thrown by this body
     */
    public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig) throws ReplicationException {
        this.conf = conf;
        this.id = id;
        this.peerConfig = peerConfig;
    }

    /**
     * Creates a peer with a caller-supplied table-CFs map.
     *
     * @param conf       configuration returned by {@link #getConfiguration()}
     * @param id         identifier returned by {@link #getId()}
     * @param peerConfig peer configuration returned by {@link #getPeerConfig()}
     * @param tableCFs   initial table-CFs map; stored as given, without copying
     * @throws ReplicationException declared for callers; not thrown by this body
     */
    public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig, Map<TableName, List<String>> tableCFs) throws ReplicationException {
        this(conf, id, peerConfig);
        this.tableCFs = tableCFs;
    }

    /**
     * Makes sure the peer-state znode exists, starts a tracker on it and loads
     * the current state into this peer.
     *
     * @param zookeeper     watcher used to reach ZooKeeper
     * @param peerStateNode path of the znode holding the peer state
     * @throws KeeperException on ZooKeeper failure, or when the znode content
     *                         cannot be deserialized (converted via {@code ZKUtil.convert})
     */
    public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode) throws KeeperException {
        ensurePeerEnabled(zookeeper, peerStateNode);
        PeerStateTracker tracker = new PeerStateTracker(peerStateNode, zookeeper, this);
        this.peerStateTracker = tracker;
        tracker.start();
        try {
            readPeerStateZnode();
        } catch (DeserializationException ex) {
            throw ZKUtil.convert(ex);
        }
    }

    /**
     * Refreshes {@link #peerState} from the data currently held by the state tracker.
     *
     * @throws DeserializationException if the tracked bytes cannot be parsed
     */
    private void readPeerStateZnode() throws DeserializationException {
        byte[] rawState = this.peerStateTracker.getData(false);
        if (isStateEnabled(rawState)) {
            this.peerState = ReplicationPeer.PeerState.ENABLED;
        } else {
            this.peerState = ReplicationPeer.PeerState.DISABLED;
        }
    }

    /**
     * Starts a tracker on the table-CFs znode and loads its current content.
     *
     * @param zookeeper    watcher used to reach ZooKeeper
     * @param tableCFsNode path of the znode holding the table-CFs configuration
     * @throws KeeperException declared for callers
     */
    public void startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode) throws KeeperException {
        TableCFsTracker tracker = new TableCFsTracker(tableCFsNode, zookeeper, this);
        this.tableCFsTracker = tracker;
        tracker.start();
        readTableCFsZnode();
    }

    /**
     * Replaces {@link #tableCFs} with the map parsed from the tracker's current data.
     */
    private void readTableCFsZnode() {
        byte[] rawTableCFs = this.tableCFsTracker.getData(false);
        String tableCFsConfig = Bytes.toString(rawTableCFs);
        this.tableCFs = ReplicationAdmin.parseTableCFsFromConfig(tableCFsConfig);
    }

    /** @return the most recently read peer state */
    @Override
    public ReplicationPeer.PeerState getPeerState() {
        return this.peerState;
    }

    /** @return the peer id given at construction */
    @Override
    public String getId() {
        return this.id;
    }

    /** @return the peer configuration given at construction */
    @Override
    public ReplicationPeerConfig getPeerConfig() {
        return this.peerConfig;
    }

    /** @return the configuration given at construction */
    @Override
    public Configuration getConfiguration() {
        return this.conf;
    }

    /** @return the current table-CFs map; the internal reference is handed out as is */
    @Override
    public Map<TableName, List<String>> getTableCFs() {
        return this.tableCFs;
    }

    /**
     * Logs the abort request at fatal level. No other action is taken here.
     *
     * @param why reason text appended to the log message
     * @param e   throwable logged alongside the message
     */
    @Override
    public void abort(String why, Throwable e) {
        String message = "The ReplicationPeer coresponding to peer " + this.peerConfig
                + " was aborted for the following reason(s):" + why;
        LOG.fatal(message, e);
    }

    /** @return always {@code false}; this class keeps no aborted flag */
    @Override
    public boolean isAborted() {
        return false;
    }

    /** Does nothing; this method releases no resources. */
    @Override
    public void close() throws IOException {
    }

    /**
     * Tells whether serialized replication-state bytes denote the enabled state.
     *
     * @param bytes serialized state, including the protobuf magic prefix
     * @return {@code true} only when the parsed state is {@code ENABLED}
     * @throws DeserializationException if the bytes cannot be parsed
     */
    public static boolean isStateEnabled(byte[] bytes) throws DeserializationException {
        return parseStateFrom(bytes) == ZooKeeperProtos.ReplicationState.State.ENABLED;
    }

    /**
     * Parses the replication state out of magic-prefixed protobuf bytes.
     *
     * @param bytes serialized state, including the protobuf magic prefix
     * @return the state carried by the message
     * @throws DeserializationException if the prefix check fails or the merge
     *                                  raises an {@link IOException}
     */
    private static ZooKeeperProtos.ReplicationState.State parseStateFrom(byte[] bytes) throws DeserializationException {
        // Validate the prefix before touching the payload.
        ProtobufUtil.expectPBMagicPrefix(bytes);
        final int magicLength = ProtobufUtil.lengthOfPBMagic();
        final ZooKeeperProtos.ReplicationState.Builder stateBuilder = ZooKeeperProtos.ReplicationState.newBuilder();
        try {
            // The message payload starts right after the magic prefix.
            ProtobufUtil.mergeFrom(stateBuilder, bytes, magicLength, bytes.length - magicLength);
            return stateBuilder.build().getState();
        } catch (IOException ioe) {
            throw new DeserializationException(ioe);
        }
    }

    /**
     * Creates the peer-state znode with the enabled marker when the existence
     * check reports {@code -1}.
     *
     * @param zookeeper watcher used to reach ZooKeeper
     * @param path      path of the peer-state znode
     * @return {@code true} if the create call was issued, {@code false} if the
     *         existence check returned anything other than {@code -1}
     * @throws KeeperException on ZooKeeper failure
     */
    private static boolean ensurePeerEnabled(ZooKeeperWatcher zookeeper, String path) throws KeeperException {
        // NOTE(review): -1 presumably means "znode absent" - confirm against ZKUtil.checkExists.
        if (ZKUtil.checkExists(zookeeper, path) != -1) {
            return false;
        }
        ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path, ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
        return true;
    }

    /**
     * Tracker for the table-CFs znode of the enclosing peer.
     */
    public class TableCFsTracker extends ZooKeeperNodeTracker {
        /**
         * @param tableCFsZNode path of the znode to track
         * @param watcher       watcher used to reach ZooKeeper
         * @param abortable     passed through to the base tracker
         */
        public TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher, Abortable abortable) {
            super(watcher, tableCFsZNode, abortable);
        }

        /**
         * On creation of the tracked znode, lets the base tracker handle the
         * event and then reloads the enclosing peer's table-CFs map.
         * Events for other paths are ignored.
         */
        @Override
        public synchronized void nodeCreated(String path) {
            if (!path.equals(this.node)) {
                return;
            }
            super.nodeCreated(path);
            ReplicationPeerZKImpl.this.readTableCFsZnode();
        }

        /**
         * On a data change of the tracked znode, forwards the event to the
         * base tracker. Events for other paths are ignored.
         */
        @Override
        public synchronized void nodeDataChanged(String path) {
            if (!path.equals(this.node)) {
                return;
            }
            // NOTE(review): no explicit reload here; presumably the base tracker
            // re-dispatches to nodeCreated - confirm in ZooKeeperNodeTracker.
            super.nodeDataChanged(path);
        }
    }

    /**
     * Tracker for the peer-state znode of the enclosing peer.
     */
    public class PeerStateTracker extends ZooKeeperNodeTracker {
        /**
         * @param peerStateZNode path of the znode to track
         * @param watcher        watcher used to reach ZooKeeper
         * @param abortable      passed through to the base tracker
         */
        public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher, Abortable abortable) {
            super(watcher, peerStateZNode, abortable);
        }

        /**
         * On a data change of the tracked znode, lets the base tracker handle
         * the event and then re-reads the peer state. A deserialization
         * failure is logged as a warning and the previous state is kept.
         * Events for other paths are ignored.
         */
        @Override
        public synchronized void nodeDataChanged(String path) {
            if (!path.equals(this.node)) {
                return;
            }
            super.nodeDataChanged(path);
            try {
                ReplicationPeerZKImpl.this.readPeerStateZnode();
            } catch (DeserializationException ex) {
                LOG.warn("Failed deserializing the content of " + path, ex);
            }
        }
    }
}

