/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.api;

import java.io.FileInputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.api.AbstractRpcClient;
import org.apache.flume.api.HostInfo;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.thrift.Status;
import org.apache.flume.thrift.ThriftFlumeEvent;
import org.apache.flume.thrift.ThriftSourceProtocol;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThriftRpcClient
extends AbstractRpcClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(ThriftRpcClient.class);
    /** Property key selecting the Thrift wire protocol. */
    public static final String CONFIG_PROTOCOL = "protocol";
    /** Value of {@link #CONFIG_PROTOCOL} that selects {@code TBinaryProtocol}. */
    public static final String BINARY_PROTOCOL = "binary";
    /** Value of {@link #CONFIG_PROTOCOL} that selects {@code TCompactProtocol}; also the fallback. */
    public static final String COMPACT_PROTOCOL = "compact";
    // Maximum number of events sent per appendBatch RPC; read from "batch-size".
    private int batchSize;
    // Per-call timeout in milliseconds applied when waiting on the RPC future.
    private long requestTimeout;
    // Guards connState; created as a fair ReentrantLock in the constructor.
    private final Lock stateLock;
    // Lifecycle state: INIT until configured, READY when usable, DEAD after close/failure.
    private State connState;
    // Remote endpoint taken from the first HostInfo in the configuration.
    private String hostname;
    private int port;
    // Pool of Thrift connections; created by configure(), so null before that.
    private ConnectionPoolManager connectionManager;
    // Executor that runs the blocking Thrift calls so callers can wait with a timeout.
    private final ExecutorService callTimeoutPool;
    // Sequence used to name the threads of callTimeoutPool.
    private final AtomicLong threadCounter;
    // Source of the per-connection hash codes used by ClientWrapper.
    private final Random random = new Random();
    // Configured wire protocol name ("binary" or "compact").
    private String protocol;
    // SSL settings; the truststore fields are only populated when enableSsl is true.
    private boolean enableSsl;
    private String truststore;
    private String truststorePassword;
    private String truststoreType;
    // SSL/TLS protocol names that are removed from the enabled set of new SSL sockets.
    private final List<String> excludeProtocols = new LinkedList<String>();

    /**
     * Creates an unconfigured client in the {@code INIT} state.
     *
     * <p>The call-timeout executor is a cached thread pool whose threads are
     * named {@code "Flume Thrift RPC thread - <n>"}, where {@code n} comes from
     * a per-client counter.
     */
    public ThriftRpcClient() {
        this.connState = State.INIT;
        this.stateLock = new ReentrantLock(true);
        this.threadCounter = new AtomicLong(0L);
        ThreadFactory namingFactory = new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread worker = new Thread(r);
                long sequence = ThriftRpcClient.this.threadCounter.incrementAndGet();
                worker.setName("Flume Thrift RPC thread - " + sequence);
                return worker;
            }
        };
        this.callTimeoutPool = Executors.newCachedThreadPool(namingFactory);
    }

    /**
     * Returns the batch size currently held by this client.
     *
     * @return the value of the {@code batchSize} field
     */
    @Override
    public int getBatchSize() {
        return batchSize;
    }

    /**
     * Sends a single event to the remote Thrift source and waits up to the
     * configured request timeout for the server's answer.
     *
     * <p>A connection is checked out of the pool for the duration of the call.
     * It is returned to the pool when the call succeeds or when the server
     * answers with a non-OK status; on any other failure the connection is
     * destroyed instead of being reused.
     *
     * @param event the event to deliver
     * @throws EventDeliveryException if the client is not active, the server
     *         rejects the event, or the call fails or times out
     */
    @Override
    public void append(Event event) throws EventDeliveryException {
        ClientWrapper client = null;
        boolean destroyedClient = false;
        try {
            if (!this.isActive()) {
                throw new EventDeliveryException("Client was closed due to error. Please create a new client");
            }
            client = this.connectionManager.checkout();
            ThriftFlumeEvent thriftEvent = new ThriftFlumeEvent(event.getHeaders(), ByteBuffer.wrap(event.getBody()));
            this.doAppend(client, thriftEvent).get(this.requestTimeout, TimeUnit.MILLISECONDS);
        }
        catch (Throwable e) {
            if (e instanceof InterruptedException) {
                // The exception is converted below; keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
            }
            if (e instanceof ExecutionException) {
                Throwable cause = e.getCause();
                if (cause instanceof EventDeliveryException) {
                    throw (EventDeliveryException)cause;
                }
                if (cause instanceof TimeoutException) {
                    throw new EventDeliveryException("Append call timeout", cause);
                }
            }
            // Mark as destroyed first: even if destroy() throws, the finally
            // block must not hand the broken connection back to the pool.
            destroyedClient = true;
            if (client != null) {
                this.connectionManager.destroy(client);
            }
            if (e instanceof Error) {
                throw (Error)e;
            }
            if (e instanceof RuntimeException) {
                throw (RuntimeException)e;
            }
            throw new EventDeliveryException("Failed to send event. ", e);
        }
        finally {
            if (client != null && !destroyedClient) {
                this.connectionManager.checkIn(client);
            }
        }
    }

    /**
     * Sends the given events to the remote Thrift source in chunks of at most
     * the configured batch size, waiting up to the request timeout for each
     * chunk.
     *
     * <p>One pooled connection is used for all chunks. It is returned to the
     * pool when every chunk succeeds or when the server answers with a non-OK
     * status; on any other failure the connection is destroyed.
     *
     * @param events the events to deliver
     * @throws EventDeliveryException if the client is not active, the server
     *         rejects a chunk, or a call fails or times out
     */
    @Override
    public void appendBatch(List<Event> events) throws EventDeliveryException {
        ClientWrapper client = null;
        boolean destroyedClient = false;
        try {
            if (!this.isActive()) {
                throw new EventDeliveryException("Client was closed due to error or is not yet configured.");
            }
            client = this.connectionManager.checkout();
            // A non-positive batch size would never consume the iterator and
            // spin forever, so fall back to the default in that case.
            int chunkSize = this.batchSize > 0 ? this.batchSize : RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE;
            ArrayList<ThriftFlumeEvent> thriftFlumeEvents = new ArrayList<ThriftFlumeEvent>();
            Iterator<Event> eventsIter = events.iterator();
            while (eventsIter.hasNext()) {
                thriftFlumeEvents.clear();
                for (int i = 0; i < chunkSize && eventsIter.hasNext(); ++i) {
                    Event event = eventsIter.next();
                    thriftFlumeEvents.add(new ThriftFlumeEvent(event.getHeaders(), ByteBuffer.wrap(event.getBody())));
                }
                if (thriftFlumeEvents.isEmpty()) {
                    // Cannot make progress; never loop without consuming events.
                    break;
                }
                this.doAppendBatch(client, thriftFlumeEvents).get(this.requestTimeout, TimeUnit.MILLISECONDS);
            }
        }
        catch (Throwable e) {
            if (e instanceof InterruptedException) {
                // The exception is converted below; keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
            }
            if (e instanceof ExecutionException) {
                Throwable cause = e.getCause();
                if (cause instanceof EventDeliveryException) {
                    throw (EventDeliveryException)cause;
                }
                if (cause instanceof TimeoutException) {
                    throw new EventDeliveryException("Append call timeout", cause);
                }
            }
            // Mark as destroyed first: even if destroy() throws, the finally
            // block must not hand the broken connection back to the pool.
            destroyedClient = true;
            if (client != null) {
                this.connectionManager.destroy(client);
            }
            if (e instanceof Error) {
                throw (Error)e;
            }
            if (e instanceof RuntimeException) {
                throw (RuntimeException)e;
            }
            throw new EventDeliveryException("Failed to send event. ", e);
        }
        finally {
            if (client != null && !destroyedClient) {
                this.connectionManager.checkIn(client);
            }
        }
    }

    /**
     * Submits a single-event Thrift {@code append} call to the call-timeout
     * executor.
     *
     * @param client the pooled connection to send on
     * @param e the event to send
     * @return a future that completes normally when the server answers
     *         {@code Status.OK} and fails with an
     *         {@link EventDeliveryException} for any other status
     * @throws Exception declared for callers; submission itself is delegated
     *         to the executor
     */
    private Future<Void> doAppend(final ClientWrapper client, final ThriftFlumeEvent e) throws Exception {
        Callable<Void> sendTask = new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                Status status = client.client.append(e);
                if (status == Status.OK) {
                    return null;
                }
                throw new EventDeliveryException("Failed to deliver events. Server returned status : " + status.name());
            }
        };
        return this.callTimeoutPool.submit(sendTask);
    }

    /**
     * Submits a Thrift {@code appendBatch} call to the call-timeout executor.
     *
     * @param client the pooled connection to send on
     * @param e the events to send in one RPC
     * @return a future that completes normally when the server answers
     *         {@code Status.OK} and fails with an
     *         {@link EventDeliveryException} for any other status
     * @throws Exception declared for callers; submission itself is delegated
     *         to the executor
     */
    private Future<Void> doAppendBatch(final ClientWrapper client, final List<ThriftFlumeEvent> e) throws Exception {
        Callable<Void> sendTask = new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                Status status = client.client.appendBatch(e);
                if (status == Status.OK) {
                    return null;
                }
                throw new EventDeliveryException("Failed to deliver events. Server returned status : " + status.name());
            }
        };
        return this.callTimeoutPool.submit(sendTask);
    }

    /**
     * Reports whether this client is in the {@code READY} state.
     *
     * @return {@code true} only while the connection state is {@code READY};
     *         the state is read while holding the state lock
     */
    @Override
    public boolean isActive() {
        final Lock guard = this.stateLock;
        guard.lock();
        try {
            return State.READY == this.connState;
        }
        finally {
            guard.unlock();
        }
    }

    /**
     * Marks the client {@code DEAD}, closes every pooled connection and shuts
     * down the call-timeout executor, waiting up to five seconds for running
     * calls before forcing termination.
     *
     * <p>Safe to call on a client that was never configured: the connection
     * pool is skipped when it has not been created yet.
     *
     * @throws FlumeException if shutting down fails with a checked exception
     *         (for example when the waiting thread is interrupted)
     */
    @Override
    public void close() throws FlumeException {
        // Acquire outside the try so that unlock() only runs if lock() succeeded.
        this.stateLock.lock();
        try {
            this.connState = State.DEAD;
            try {
                if (this.connectionManager != null) {
                    this.connectionManager.closeAll();
                }
            }
            finally {
                // Always stop the executor, even if closing a connection failed.
                this.callTimeoutPool.shutdown();
            }
            if (!this.callTimeoutPool.awaitTermination(5L, TimeUnit.SECONDS)) {
                this.callTimeoutPool.shutdownNow();
            }
        }
        catch (Throwable ex) {
            if (ex instanceof InterruptedException) {
                // Stop waiting politely and keep the interrupt visible to the caller.
                this.callTimeoutPool.shutdownNow();
                Thread.currentThread().interrupt();
            }
            if (ex instanceof Error) {
                throw (Error)ex;
            }
            if (ex instanceof RuntimeException) {
                throw (RuntimeException)ex;
            }
            throw new FlumeException("Failed to close RPC client. ", ex);
        }
        finally {
            this.stateLock.unlock();
        }
    }

    /**
     * Configures this client from the given properties and moves it to the
     * {@code READY} state.
     *
     * <p>Reads the first configured host, the wire protocol ({@code "binary"}
     * or {@code "compact"}, case-insensitive, defaulting to compact), the
     * batch size, the request timeout, the connection pool size and the
     * optional SSL settings. Out-of-range numeric values are replaced by their
     * defaults with a warning. On any failure the client is marked
     * {@code DEAD}.
     *
     * @param properties the client configuration
     * @throws FlumeException if the client is already active or configuration
     *         fails with a checked exception
     */
    @Override
    protected void configure(Properties properties) throws FlumeException {
        // Checked before entering the try below on purpose: the catch block
        // marks the client DEAD, which must not happen to an active client.
        if (this.isActive()) {
            throw new FlumeException("Attempting to re-configured an already configured client!");
        }
        this.stateLock.lock();
        try {
            HostInfo host = HostInfo.getHostInfoList(properties).get(0);
            this.hostname = host.getHostName();
            this.port = host.getPortNumber();

            String requestedProtocol = properties.getProperty(CONFIG_PROTOCOL);
            if (requestedProtocol == null) {
                requestedProtocol = COMPACT_PROTOCOL;
            }
            if (!requestedProtocol.equalsIgnoreCase(BINARY_PROTOCOL) && !requestedProtocol.equalsIgnoreCase(COMPACT_PROTOCOL)) {
                LOGGER.warn("'binary' or 'compact' are the only valid Thrift protocol types to choose from. Defaulting to 'compact'.");
                requestedProtocol = COMPACT_PROTOCOL;
            }
            // Store the canonical constant so that later case-sensitive
            // comparisons agree with the case-insensitive validation above.
            this.protocol = requestedProtocol.equalsIgnoreCase(BINARY_PROTOCOL) ? BINARY_PROTOCOL : COMPACT_PROTOCOL;

            this.batchSize = Integer.parseInt(properties.getProperty("batch-size", RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE.toString()));
            if (this.batchSize < 1) {
                // A non-positive batch size would make appendBatch unable to make progress.
                LOGGER.warn("Batch size specified is less than 1. Using default value instead.");
                this.batchSize = RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE;
            }

            this.requestTimeout = Long.parseLong(properties.getProperty("request-timeout", String.valueOf(RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS)));
            if (this.requestTimeout < 1000L) {
                LOGGER.warn("Request timeout specified less than 1s. Using default value instead.");
                this.requestTimeout = RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS;
            }

            int connectionPoolSize = Integer.parseInt(properties.getProperty("maxConnections", String.valueOf(5)));
            if (connectionPoolSize < 1) {
                LOGGER.warn("Connection Pool Size specified is less than 1. Using default value instead.");
                connectionPoolSize = 5;
            }

            this.enableSsl = Boolean.parseBoolean(properties.getProperty("ssl"));
            if (this.enableSsl) {
                this.truststore = properties.getProperty("truststore");
                this.truststorePassword = properties.getProperty("truststore-password");
                this.truststoreType = properties.getProperty("truststore-type", "JKS");
                // Start from a clean list so a retried configure() does not
                // accumulate entries from an earlier attempt.
                this.excludeProtocols.clear();
                String excludeProtocolsStr = properties.getProperty("exclude-protocols");
                if (excludeProtocolsStr != null) {
                    for (String excluded : excludeProtocolsStr.trim().split("\\s+")) {
                        if (excluded.length() > 0 && !this.excludeProtocols.contains(excluded)) {
                            this.excludeProtocols.add(excluded);
                        }
                    }
                }
                // SSLv3 is always excluded, whatever the user listed.
                if (!this.excludeProtocols.contains("SSLv3")) {
                    this.excludeProtocols.add("SSLv3");
                }
            }
            this.connectionManager = new ConnectionPoolManager(connectionPoolSize);
            this.connState = State.READY;
        }
        catch (Throwable ex) {
            this.connState = State.DEAD;
            if (ex instanceof Error) {
                throw (Error)ex;
            }
            if (ex instanceof RuntimeException) {
                throw (RuntimeException)ex;
            }
            throw new FlumeException("Error while configuring RpcClient. ", ex);
        }
        finally {
            this.stateLock.unlock();
        }
    }

    /**
     * Builds the transport used for a new connection by wrapping the given
     * socket in a {@code TFastFramedTransport}.
     *
     * @param tsocket the socket to wrap
     * @return the framed transport around {@code tsocket}
     * @throws Exception declared so that overriding implementations may fail
     */
    protected TTransport getTransport(TSocket tsocket) throws Exception {
        TTransport framed = new TFastFramedTransport((TTransport)tsocket);
        return framed;
    }

    /**
     * Creates a TLS {@link SSLContext} whose trust managers come from the
     * given truststore file.
     *
     * <p>When {@code truststore} or {@code truststoreType} is {@code null} the
     * trust manager factory is initialised with a {@code null} key store. A
     * {@code null} password is passed through to {@link KeyStore#load} as
     * {@code null} instead of failing.
     *
     * @param truststore path of the truststore file, or {@code null}
     * @param truststorePassword password of the truststore, or {@code null}
     * @param truststoreType key store type (for example {@code "JKS"}), or {@code null}
     * @return an initialised SSL context
     * @throws FlumeException if the truststore cannot be read or the context
     *         cannot be initialised
     */
    private static SSLContext createSSLContext(String truststore, String truststorePassword, String truststoreType) throws FlumeException {
        try {
            SSLContext ctx = SSLContext.getInstance("TLS");
            TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            KeyStore ts = null;
            if (truststore != null && truststoreType != null) {
                ts = KeyStore.getInstance(truststoreType);
                char[] password = truststorePassword == null ? null : truststorePassword.toCharArray();
                FileInputStream in = new FileInputStream(truststore);
                try {
                    ts.load(in, password);
                }
                finally {
                    // Release the file handle whether or not loading succeeded.
                    in.close();
                }
            }
            tmf.init(ts);
            ctx.init(null, tmf.getTrustManagers(), null);
            return ctx;
        }
        catch (Exception e) {
            throw new FlumeException("Error creating the transport", e);
        }
    }

    /**
     * Opens an SSL socket to the given host and wraps it in a {@link TSocket}.
     *
     * <p>The socket's read timeout is set to {@code timeout} and every
     * protocol listed in {@code excludeProtocols} is removed from its enabled
     * protocols. If anything fails after the socket was connected, the socket
     * is closed before the failure is reported.
     *
     * @param factory the factory used to create the socket
     * @param host the remote host name
     * @param port the remote port
     * @param timeout the socket read timeout in milliseconds
     * @param excludeProtocols protocol names that must not stay enabled
     * @return a Thrift socket backed by the connected SSL socket
     * @throws FlumeException if connecting or configuring the socket fails
     */
    private static TSocket createSSLSocket(SSLSocketFactory factory, String host, int port, int timeout, List<String> excludeProtocols) throws FlumeException {
        SSLSocket socket = null;
        try {
            socket = (SSLSocket)factory.createSocket(host, port);
            socket.setSoTimeout(timeout);
            ArrayList<String> enabledProtocols = new ArrayList<String>();
            for (String protocol : socket.getEnabledProtocols()) {
                if (excludeProtocols.contains(protocol)) {
                    continue;
                }
                enabledProtocols.add(protocol);
            }
            socket.setEnabledProtocols(enabledProtocols.toArray(new String[0]));
            return new TSocket((Socket)socket);
        }
        catch (Exception e) {
            if (socket != null) {
                try {
                    socket.close();
                }
                catch (Exception ignored) {
                    // Best-effort cleanup; the original failure is reported below.
                }
            }
            throw new FlumeException("Could not connect to " + host + " on port " + port, e);
        }
    }

    /**
     * Bounded pool of {@link ClientWrapper} connections.
     *
     * <p>All bookkeeping is guarded by {@code poolLock}. Threads that find the
     * pool exhausted wait on {@code availableClientsCondition}, which is
     * signalled whenever a connection is returned, capacity is freed by
     * {@link #destroy}, or the pool is closed.
     */
    private class ConnectionPoolManager {
        private final Queue<ClientWrapper> availableClients;
        private final Set<ClientWrapper> checkedOutClients;
        private final int maxPoolSize;
        // Number of live connections (available + checked out).
        private int currentPoolSize;
        // Set by closeAll(); a closed pool hands out no further connections.
        private boolean closed;
        private final Lock poolLock;
        private final Condition availableClientsCondition;

        /**
         * @param poolSize maximum number of connections that may exist at once
         */
        public ConnectionPoolManager(int poolSize) {
            this.maxPoolSize = poolSize;
            this.availableClients = new LinkedList<ClientWrapper>();
            this.checkedOutClients = new HashSet<ClientWrapper>();
            this.poolLock = new ReentrantLock();
            this.availableClientsCondition = this.poolLock.newCondition();
            this.currentPoolSize = 0;
            this.closed = false;
        }

        /**
         * Hands out a connection, reusing an idle one if present, otherwise
         * creating a new one while below the maximum, otherwise blocking until
         * either becomes possible.
         *
         * @return a connection registered as checked out
         * @throws Exception if the pool was closed, a new connection cannot be
         *         created, or the waiting thread is interrupted
         */
        public ClientWrapper checkout() throws Exception {
            this.poolLock.lock();
            try {
                while (true) {
                    if (this.closed) {
                        throw new EventDeliveryException("Connection pool has been closed.");
                    }
                    ClientWrapper pooled = this.availableClients.poll();
                    if (pooled != null) {
                        this.checkedOutClients.add(pooled);
                        return pooled;
                    }
                    // Re-evaluated after every wake-up: capacity may have been
                    // freed by destroy() without any connection being returned.
                    if (this.currentPoolSize < this.maxPoolSize) {
                        ClientWrapper created = new ClientWrapper();
                        ++this.currentPoolSize;
                        this.checkedOutClients.add(created);
                        return created;
                    }
                    this.availableClientsCondition.await();
                }
            }
            finally {
                this.poolLock.unlock();
            }
        }

        /**
         * Returns a healthy connection to the pool and wakes one waiter. If the
         * pool has been closed in the meantime the connection is closed instead.
         *
         * @param client the connection obtained from {@link #checkout()}
         */
        public void checkIn(ClientWrapper client) {
            boolean closeInstead = false;
            this.poolLock.lock();
            try {
                this.checkedOutClients.remove(client);
                if (this.closed) {
                    closeInstead = true;
                } else {
                    this.availableClients.add(client);
                    this.availableClientsCondition.signal();
                }
            }
            finally {
                this.poolLock.unlock();
            }
            if (closeInstead) {
                client.transport.close();
            }
        }

        /**
         * Removes a broken connection from the pool, frees its slot and closes
         * its transport.
         *
         * @param client the connection obtained from {@link #checkout()}
         */
        public void destroy(ClientWrapper client) {
            this.poolLock.lock();
            try {
                // Only count connections the pool still tracks, so a destroy
                // after closeAll() cannot drive the size below zero.
                if (this.checkedOutClients.remove(client)) {
                    --this.currentPoolSize;
                    // A slot is free again: let one waiter create a replacement.
                    this.availableClientsCondition.signal();
                }
            }
            finally {
                this.poolLock.unlock();
            }
            client.transport.close();
        }

        /**
         * Closes every connection known to the pool, marks the pool closed and
         * wakes all threads blocked in {@link #checkout()}.
         */
        public void closeAll() {
            this.poolLock.lock();
            try {
                this.closed = true;
                try {
                    for (ClientWrapper c : this.availableClients) {
                        c.transport.close();
                    }
                    for (ClientWrapper c : this.checkedOutClients) {
                        c.transport.close();
                    }
                }
                finally {
                    this.availableClients.clear();
                    this.checkedOutClients.clear();
                    this.currentPoolSize = 0;
                    this.availableClientsCondition.signalAll();
                }
            }
            finally {
                this.poolLock.unlock();
            }
        }
    }

    private class ClientWrapper {
        public final ThriftSourceProtocol.Client client;
        public final TTransport transport;
        private final int hashCode;

        public ClientWrapper() throws Exception {
            TSocket tsocket;
            if (ThriftRpcClient.this.enableSsl) {
                SSLContext sslContext = ThriftRpcClient.createSSLContext(ThriftRpcClient.this.truststore, ThriftRpcClient.this.truststorePassword, ThriftRpcClient.this.truststoreType);
                SSLSocketFactory sslSockFactory = sslContext.getSocketFactory();
                tsocket = ThriftRpcClient.createSSLSocket(sslSockFactory, ThriftRpcClient.this.hostname, ThriftRpcClient.this.port, 120000, ThriftRpcClient.this.excludeProtocols);
            } else {
                tsocket = new TSocket(ThriftRpcClient.this.hostname, ThriftRpcClient.this.port);
            }
            this.transport = ThriftRpcClient.this.getTransport(tsocket);
            if (!this.transport.isOpen()) {
                this.transport.open();
            }
            if (ThriftRpcClient.this.protocol.equals(ThriftRpcClient.BINARY_PROTOCOL)) {
                LOGGER.info("Using TBinaryProtocol");
                this.client = new ThriftSourceProtocol.Client((TProtocol)new TBinaryProtocol(this.transport));
            } else {
                LOGGER.info("Using TCompactProtocol");
                this.client = new ThriftSourceProtocol.Client((TProtocol)new TCompactProtocol(this.transport));
            }
            this.hashCode = ThriftRpcClient.this.random.nextInt();
        }

        public boolean equals(Object o) {
            if (o == null) {
                return false;
            }
            return this == o;
        }

        public int hashCode() {
            return this.hashCode;
        }
    }

    private static enum State {
        INIT,
        READY,
        DEAD;

    }
}

