/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.formats.avro.registry.confluent.shaded.org.apache.zookeeper.server;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.apache.flink.formats.avro.registry.confluent.shaded.org.apache.zookeeper.server.NettyServerCnxn;
import org.apache.flink.formats.avro.registry.confluent.shaded.org.apache.zookeeper.server.ServerCnxn;
import org.apache.flink.formats.avro.registry.confluent.shaded.org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.flink.formats.avro.registry.confluent.shaded.org.apache.zookeeper.server.ZooKeeperServer;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.WriteCompletionEvent;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyServerCnxnFactory
extends ServerCnxnFactory {
    Logger LOG = LoggerFactory.getLogger(NettyServerCnxnFactory.class);
    ServerBootstrap bootstrap;
    Channel parentChannel;
    ChannelGroup allChannels = new DefaultChannelGroup("zkServerCnxns");
    HashMap<InetAddress, Set<NettyServerCnxn>> ipMap = new HashMap();
    InetSocketAddress localAddress;
    int maxClientCnxns = 60;
    CnxnChannelHandler channelHandler = new CnxnChannelHandler();
    boolean killed;

    NettyServerCnxnFactory() {
        this.bootstrap = new ServerBootstrap((ChannelFactory)new NioServerSocketChannelFactory((Executor)Executors.newCachedThreadPool(), (Executor)Executors.newCachedThreadPool()));
        this.bootstrap.setOption("reuseAddress", (Object)true);
        this.bootstrap.setOption("child.tcpNoDelay", (Object)true);
        this.bootstrap.setOption("child.soLinger", (Object)-1);
        this.bootstrap.getPipeline().addLast("servercnxnfactory", (ChannelHandler)this.channelHandler);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void closeAll() {
        if (this.LOG.isDebugEnabled()) {
            this.LOG.debug("closeAll()");
        }
        NettyServerCnxn[] allCnxns = null;
        HashSet hashSet = this.cnxns;
        synchronized (hashSet) {
            allCnxns = this.cnxns.toArray(new NettyServerCnxn[this.cnxns.size()]);
        }
        for (NettyServerCnxn cnxn : allCnxns) {
            try {
                cnxn.close();
            }
            catch (Exception e) {
                this.LOG.warn("Ignoring exception closing cnxn sessionid 0x" + Long.toHexString(cnxn.getSessionId()), (Throwable)e);
            }
        }
        if (this.LOG.isDebugEnabled()) {
            this.LOG.debug("allChannels size:" + this.allChannels.size() + " cnxns size:" + allCnxns.length);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void closeSession(long sessionId) {
        if (this.LOG.isDebugEnabled()) {
            this.LOG.debug("closeSession sessionid:0x" + sessionId);
        }
        NettyServerCnxn[] allCnxns = null;
        HashSet hashSet = this.cnxns;
        synchronized (hashSet) {
            allCnxns = this.cnxns.toArray(new NettyServerCnxn[this.cnxns.size()]);
        }
        for (NettyServerCnxn cnxn : allCnxns) {
            if (cnxn.getSessionId() != sessionId) continue;
            try {
                cnxn.close();
            }
            catch (Exception e) {
                this.LOG.warn("exception during session close", (Throwable)e);
            }
            break;
        }
    }

    @Override
    public void configure(InetSocketAddress addr, int maxClientCnxns) throws IOException {
        this.configureSaslLogin();
        this.localAddress = addr;
        this.maxClientCnxns = maxClientCnxns;
    }

    @Override
    public int getMaxClientCnxnsPerHost() {
        return this.maxClientCnxns;
    }

    @Override
    public void setMaxClientCnxnsPerHost(int max) {
        this.maxClientCnxns = max;
    }

    @Override
    public int getLocalPort() {
        return this.localAddress.getPort();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void join() throws InterruptedException {
        NettyServerCnxnFactory nettyServerCnxnFactory = this;
        synchronized (nettyServerCnxnFactory) {
            while (!this.killed) {
                this.wait();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void shutdown() {
        this.LOG.info("shutdown called " + this.localAddress);
        if (this.login != null) {
            this.login.shutdown();
        }
        if (this.parentChannel != null) {
            this.parentChannel.close().awaitUninterruptibly();
            this.closeAll();
            this.allChannels.close().awaitUninterruptibly();
            this.bootstrap.releaseExternalResources();
        }
        if (this.zkServer != null) {
            this.zkServer.shutdown();
        }
        NettyServerCnxnFactory nettyServerCnxnFactory = this;
        synchronized (nettyServerCnxnFactory) {
            this.killed = true;
            this.notifyAll();
        }
    }

    @Override
    public void start() {
        this.LOG.info("binding to port " + this.localAddress);
        this.parentChannel = this.bootstrap.bind((SocketAddress)this.localAddress);
    }

    @Override
    public void startup(ZooKeeperServer zks) throws IOException, InterruptedException {
        this.start();
        this.setZooKeeperServer(zks);
        zks.startdata();
        zks.startup();
    }

    @Override
    public Iterable<ServerCnxn> getConnections() {
        return this.cnxns;
    }

    @Override
    public InetSocketAddress getLocalAddress() {
        return this.localAddress;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addCnxn(NettyServerCnxn cnxn) {
        HashSet hashSet = this.cnxns;
        synchronized (hashSet) {
            this.cnxns.add(cnxn);
            HashMap<InetAddress, Set<NettyServerCnxn>> hashMap = this.ipMap;
            synchronized (hashMap) {
                InetAddress addr = ((InetSocketAddress)cnxn.channel.getRemoteAddress()).getAddress();
                Set<NettyServerCnxn> s = this.ipMap.get(addr);
                if (s == null) {
                    s = new HashSet<NettyServerCnxn>();
                }
                s.add(cnxn);
                this.ipMap.put(addr, s);
            }
        }
    }

    @ChannelHandler.Sharable
    class CnxnChannelHandler
    extends SimpleChannelHandler {
        CnxnChannelHandler() {
        }

        public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            if (NettyServerCnxnFactory.this.LOG.isTraceEnabled()) {
                NettyServerCnxnFactory.this.LOG.trace("Channel closed " + e);
            }
            NettyServerCnxnFactory.this.allChannels.remove((Object)ctx.getChannel());
        }

        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            if (NettyServerCnxnFactory.this.LOG.isTraceEnabled()) {
                NettyServerCnxnFactory.this.LOG.trace("Channel connected " + e);
            }
            NettyServerCnxnFactory.this.allChannels.add((Object)ctx.getChannel());
            NettyServerCnxn cnxn = new NettyServerCnxn(ctx.getChannel(), NettyServerCnxnFactory.this.zkServer, NettyServerCnxnFactory.this);
            ctx.setAttachment((Object)cnxn);
            NettyServerCnxnFactory.this.addCnxn(cnxn);
        }

        public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            NettyServerCnxn cnxn;
            if (NettyServerCnxnFactory.this.LOG.isTraceEnabled()) {
                NettyServerCnxnFactory.this.LOG.trace("Channel disconnected " + e);
            }
            if ((cnxn = (NettyServerCnxn)ctx.getAttachment()) != null) {
                if (NettyServerCnxnFactory.this.LOG.isTraceEnabled()) {
                    NettyServerCnxnFactory.this.LOG.trace("Channel disconnect caused close " + e);
                }
                cnxn.close();
            }
        }

        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            NettyServerCnxnFactory.this.LOG.warn("Exception caught " + e, e.getCause());
            NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment();
            if (cnxn != null && NettyServerCnxnFactory.this.LOG.isDebugEnabled()) {
                NettyServerCnxnFactory.this.LOG.debug("Closing " + cnxn);
                cnxn.close();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            if (NettyServerCnxnFactory.this.LOG.isTraceEnabled()) {
                NettyServerCnxnFactory.this.LOG.trace("message received called " + e.getMessage());
            }
            try {
                NettyServerCnxn cnxn;
                if (NettyServerCnxnFactory.this.LOG.isDebugEnabled()) {
                    NettyServerCnxnFactory.this.LOG.debug("New message " + e.toString() + " from " + ctx.getChannel());
                }
                NettyServerCnxn nettyServerCnxn = cnxn = (NettyServerCnxn)ctx.getAttachment();
                synchronized (nettyServerCnxn) {
                    this.processMessage(e, cnxn);
                }
            }
            catch (Exception ex) {
                NettyServerCnxnFactory.this.LOG.error("Unexpected exception in receive", (Throwable)ex);
                throw ex;
            }
        }

        private void processMessage(MessageEvent e, NettyServerCnxn cnxn) {
            if (NettyServerCnxnFactory.this.LOG.isDebugEnabled()) {
                NettyServerCnxnFactory.this.LOG.debug(Long.toHexString(cnxn.sessionId) + " queuedBuffer: " + cnxn.queuedBuffer);
            }
            if (e instanceof NettyServerCnxn.ResumeMessageEvent) {
                NettyServerCnxnFactory.this.LOG.debug("Received ResumeMessageEvent");
                if (cnxn.queuedBuffer != null) {
                    if (NettyServerCnxnFactory.this.LOG.isTraceEnabled()) {
                        NettyServerCnxnFactory.this.LOG.trace("processing queue " + Long.toHexString(cnxn.sessionId) + " queuedBuffer 0x" + ChannelBuffers.hexDump((ChannelBuffer)cnxn.queuedBuffer));
                    }
                    cnxn.receiveMessage(cnxn.queuedBuffer);
                    if (!cnxn.queuedBuffer.readable()) {
                        NettyServerCnxnFactory.this.LOG.debug("Processed queue - no bytes remaining");
                        cnxn.queuedBuffer = null;
                    } else {
                        NettyServerCnxnFactory.this.LOG.debug("Processed queue - bytes remaining");
                    }
                } else {
                    NettyServerCnxnFactory.this.LOG.debug("queue empty");
                }
                cnxn.channel.setReadable(true);
            } else {
                ChannelBuffer buf = (ChannelBuffer)e.getMessage();
                if (NettyServerCnxnFactory.this.LOG.isTraceEnabled()) {
                    NettyServerCnxnFactory.this.LOG.trace(Long.toHexString(cnxn.sessionId) + " buf 0x" + ChannelBuffers.hexDump((ChannelBuffer)buf));
                }
                if (cnxn.throttled) {
                    NettyServerCnxnFactory.this.LOG.debug("Received message while throttled");
                    if (cnxn.queuedBuffer == null) {
                        NettyServerCnxnFactory.this.LOG.debug("allocating queue");
                        cnxn.queuedBuffer = ChannelBuffers.dynamicBuffer((int)buf.readableBytes());
                    }
                    cnxn.queuedBuffer.writeBytes(buf);
                    if (NettyServerCnxnFactory.this.LOG.isTraceEnabled()) {
                        NettyServerCnxnFactory.this.LOG.trace(Long.toHexString(cnxn.sessionId) + " queuedBuffer 0x" + ChannelBuffers.hexDump((ChannelBuffer)cnxn.queuedBuffer));
                    }
                } else {
                    NettyServerCnxnFactory.this.LOG.debug("not throttled");
                    if (cnxn.queuedBuffer != null) {
                        if (NettyServerCnxnFactory.this.LOG.isTraceEnabled()) {
                            NettyServerCnxnFactory.this.LOG.trace(Long.toHexString(cnxn.sessionId) + " queuedBuffer 0x" + ChannelBuffers.hexDump((ChannelBuffer)cnxn.queuedBuffer));
                        }
                        cnxn.queuedBuffer.writeBytes(buf);
                        if (NettyServerCnxnFactory.this.LOG.isTraceEnabled()) {
                            NettyServerCnxnFactory.this.LOG.trace(Long.toHexString(cnxn.sessionId) + " queuedBuffer 0x" + ChannelBuffers.hexDump((ChannelBuffer)cnxn.queuedBuffer));
                        }
                        cnxn.receiveMessage(cnxn.queuedBuffer);
                        if (!cnxn.queuedBuffer.readable()) {
                            NettyServerCnxnFactory.this.LOG.debug("Processed queue - no bytes remaining");
                            cnxn.queuedBuffer = null;
                        } else {
                            NettyServerCnxnFactory.this.LOG.debug("Processed queue - bytes remaining");
                        }
                    } else {
                        cnxn.receiveMessage(buf);
                        if (buf.readable()) {
                            if (NettyServerCnxnFactory.this.LOG.isTraceEnabled()) {
                                NettyServerCnxnFactory.this.LOG.trace("Before copy " + buf);
                            }
                            cnxn.queuedBuffer = ChannelBuffers.dynamicBuffer((int)buf.readableBytes());
                            cnxn.queuedBuffer.writeBytes(buf);
                            if (NettyServerCnxnFactory.this.LOG.isTraceEnabled()) {
                                NettyServerCnxnFactory.this.LOG.trace("Copy is " + cnxn.queuedBuffer);
                                NettyServerCnxnFactory.this.LOG.trace(Long.toHexString(cnxn.sessionId) + " queuedBuffer 0x" + ChannelBuffers.hexDump((ChannelBuffer)cnxn.queuedBuffer));
                            }
                        }
                    }
                }
            }
        }

        public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
            if (NettyServerCnxnFactory.this.LOG.isTraceEnabled()) {
                NettyServerCnxnFactory.this.LOG.trace("write complete " + e);
            }
        }
    }
}

