/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.quorum;

import java.io.IOException;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.jute.Record;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.ObserverBean;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.Request;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.quorum.Learner;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.quorum.ObserverZooKeeperServer;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.quorum.QuorumPacket;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.txn.TxnHeader;

/**
 * A {@link Learner} that connects to the leader, reads the packets it sends and
 * dispatches them; transaction packets (type 8) are handed to the
 * {@link ObserverZooKeeperServer} as requests to commit.
 *
 * <p>This source was produced by a decompiler, so packet types appear as numeric
 * literals. The names given in comments are inferred from how each packet is
 * handled and from the log messages; confirm them against the constants declared
 * by the quorum protocol before relying on them.
 */
public class Observer extends Learner {

    /**
     * Creates an observer bound to the given peer and server.
     *
     * @param self the quorum peer this observer runs in
     * @param observerZooKeeperServer the server that receives committed requests
     */
    Observer(QuorumPeer self, ObserverZooKeeperServer observerZooKeeperServer) {
        this.self = self;
        this.zk = observerZooKeeperServer;
    }

    /**
     * Returns a short description containing the current socket and the number of
     * pending revalidations.
     */
    @Override
    public String toString() {
        return "Observer " + this.sock
                + " pendingRevalidationCount:" + this.pendingRevalidations.size();
    }

    /**
     * Registers the JMX bean, finds and connects to the leader, synchronizes with
     * it, and then reads and processes packets for as long as {@code isRunning()}
     * returns true. The JMX bean is always unregistered on exit.
     *
     * <p>Any exception raised while connected is logged, the socket is closed and
     * the pending revalidations are cleared; the exception is not rethrown.
     *
     * <p>Decompiler note: CFR reported that it removed a self-catching try block
     * from this method, so behavior may differ from the original bytecode.
     *
     * @throws InterruptedException declared by the signature; see the review note
     *     on the catch block below
     */
    void observeLeader() throws InterruptedException {
        this.zk.registerJMX(new ObserverBean(this, this.zk), this.self.jmxLocalPeerBean);
        try {
            QuorumPeer.QuorumServer leaderServer = this.findLeader();
            LOG.info("Observing " + leaderServer.addr);
            try {
                this.connectToLeader(leaderServer.addr, leaderServer.hostname);
                // 16 is the learner-info packet type sent when registering
                // (presumably the "observer info" type - confirm).
                long newLeaderZxid = this.registerWithLeader(16);
                this.syncWithLeader(newLeaderZxid);
                // A single packet object is reused for every read.
                QuorumPacket qp = new QuorumPacket();
                while (this.isRunning()) {
                    this.readPacket(qp);
                    this.processPacket(qp);
                }
            }
            catch (Exception e) {
                // NOTE(review): this also catches InterruptedException without
                // restoring the thread's interrupt status; left unchanged because
                // callers may depend on the current behavior.
                LOG.warn("Exception when observing the leader", (Throwable)e);
                // Guard against the socket never having been assigned, so that
                // the cleanup below still runs instead of failing with an NPE.
                if (this.sock != null) {
                    try {
                        this.sock.close();
                    }
                    catch (IOException closeFailure) {
                        // Report through the logger rather than stderr.
                        LOG.warn("Exception when closing the socket to the leader", (Throwable)closeFailure);
                    }
                }
                this.pendingRevalidations.clear();
            }
        }
        finally {
            this.zk.unregisterJMX(this);
        }
    }

    /**
     * Dispatches a single packet received from the leader according to its type.
     * Packet types that are not recognized are logged and otherwise ignored.
     *
     * @param qp the packet to handle
     * @throws IOException if handling the packet fails with an I/O error
     */
    protected void processPacket(QuorumPacket qp) throws IOException {
        switch (qp.getType()) {
            case 5: { // ping
                this.ping(qp);
                break;
            }
            case 2: { // proposal: not acted upon by an observer
                LOG.warn("Ignoring proposal");
                break;
            }
            case 4: { // commit: not acted upon by an observer
                LOG.warn("Ignoring commit");
                break;
            }
            case 12: { // UPTODATE: unexpected once the observer is running
                LOG.error("Received an UPTODATE message after Observer started");
                break;
            }
            case 6: { // session revalidation
                this.revalidate(qp);
                break;
            }
            case 7: { // sync
                ((ObserverZooKeeperServer)this.zk).sync();
                break;
            }
            case 8: { // transaction to apply (presumably INFORM - confirm)
                // Rebuild a request from the serialized transaction and its
                // header, then hand it to the server to commit.
                TxnHeader hdr = new TxnHeader();
                Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
                Request request = new Request(null, hdr.getClientId(), hdr.getCxid(), hdr.getType(), null, null);
                request.txn = txn;
                request.hdr = hdr;
                ObserverZooKeeperServer obs = (ObserverZooKeeperServer)this.zk;
                obs.commitRequest(request);
                break;
            }
            default: {
                // Previously unknown types were dropped without any trace.
                LOG.warn("Unknown packet type: " + qp.getType());
                break;
            }
        }
    }

    /**
     * Logs the shutdown request and delegates to {@link Learner#shutdown()}.
     * The exception passed to the logger is never thrown; it only makes the log
     * entry carry the stack trace of the caller.
     */
    @Override
    public void shutdown() {
        LOG.info("shutdown called", (Throwable)new Exception("shutdown Observer"));
        super.shutdown();
    }
}

