/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import kafka.consumer.Consumer$;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerConnector;
import kafka.consumer.KafkaStream;
import kafka.tools.ConsumerPerformance;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.Logger;
import scala.Function1;
import scala.MatchError;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.JavaConversions$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction1;

public final class ConsumerPerformance$ {
    public static ConsumerPerformance$ MODULE$;
    private final Logger logger;

    // Instantiating the singleton is what populates MODULE$: the private
    // constructor assigns `this` to MODULE$ as its first statement, so the
    // result of `new` is intentionally discarded here.
    static {
        new ConsumerPerformance$();
    }

    /**
     * Accessor for the log4j logger held by this singleton.
     *
     * @return the logger stored in the {@code logger} field
     */
    private Logger logger() {
        return logger;
    }

    /**
     * Runs the consumer performance benchmark and prints the results.
     *
     * <p>Depending on {@code config.useOldConsumer()} the benchmark either drives a single
     * {@link KafkaConsumer} through {@link #consume} or starts one
     * {@code ConsumerPerfThread} per stream returned by the old consumer connector and
     * waits for all of them. Unless detailed stats are enabled, a single summary row
     * (start time, end time, MB consumed, MB/s, messages consumed, messages/s) is printed
     * at the end.
     *
     * <p>The consumer / connector is always released, even when the benchmark throws.
     *
     * @param args command-line arguments, parsed by {@code ConsumerPerfConfig}
     */
    public void main(String[] args) {
        ConsumerPerformance.ConsumerPerfConfig config = new ConsumerPerformance.ConsumerPerfConfig(args);
        this.logger().info((Object)"Starting consumer...");
        AtomicLong totalMessagesRead = new AtomicLong(0L);
        AtomicLong totalBytesRead = new AtomicLong(0L);
        AtomicBoolean consumerTimeout = new AtomicBoolean(false);
        if (!config.hideHeader()) {
            if (!config.showDetailedStats()) {
                Predef$.MODULE$.println((Object)"start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec");
            } else {
                // Rows produced by printProgressMessage carry six fields, the second being
                // the reporter id, so the header has to name that column as well.
                Predef$.MODULE$.println((Object)"time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec");
            }
        }
        long startMs = 0L;
        long endMs = 0L;
        if (!config.useOldConsumer()) {
            KafkaConsumer consumer = new KafkaConsumer(config.props());
            try {
                // consume() subscribes again with a rebalance listener; this first
                // subscription is kept to preserve the original call sequence.
                consumer.subscribe((Collection)JavaConversions$.MODULE$.deprecated$u0020seqAsJavaList((Seq)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{config.topic()}))));
                startMs = System.currentTimeMillis();
                this.consume((KafkaConsumer<byte[], byte[]>)consumer, (List<String>)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{config.topic()})), config.numMessages(), 1000L, config, totalMessagesRead, totalBytesRead);
                endMs = System.currentTimeMillis();
            } finally {
                // Release the consumer even if consume() fails (e.g. group-join timeout).
                consumer.close();
            }
        } else {
            ConsumerConfig consumerConfig = new ConsumerConfig(config.props());
            ConsumerConnector consumerConnector = Consumer$.MODULE$.create(consumerConfig);
            try {
                // Ask for config.numThreads() streams on the configured topic.
                Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams((Map<String, Object>)((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)config.topic()), (Object)BoxesRunTime.boxToInteger((int)config.numThreads()))}))));
                ObjectRef threadList = ObjectRef.create((Object)Nil$.MODULE$);
                // Build one ConsumerPerfThread per stream; they are collected in threadList.
                topicMessageStreams.withFilter((Function1 & Serializable & scala.Serializable)check$ifrefutable$1 -> BoxesRunTime.boxToBoolean((boolean)ConsumerPerformance$.$anonfun$main$1(check$ifrefutable$1))).foreach((Function1 & Serializable & scala.Serializable)x$1 -> {
                    ConsumerPerformance$.$anonfun$main$2(config, totalMessagesRead, totalBytesRead, consumerTimeout, threadList, x$1);
                    return BoxedUnit.UNIT;
                });
                this.logger().info((Object)"Sleeping for 1 second.");
                Thread.sleep(1000L);
                this.logger().info((Object)"starting threads");
                startMs = System.currentTimeMillis();
                ((List)threadList.elem).foreach((Function1 & Serializable & scala.Serializable)thread -> {
                    thread.start();
                    return BoxedUnit.UNIT;
                });
                ((List)threadList.elem).foreach((Function1 & Serializable & scala.Serializable)thread -> {
                    thread.join();
                    return BoxedUnit.UNIT;
                });
                // When the timeout flag is set, the configured consumer timeout is subtracted
                // so that it does not count as consumption time.
                // NOTE(review): the flag is presumably set by a ConsumerPerfThread - confirm.
                endMs = consumerTimeout.get() ? System.currentTimeMillis() - (long)consumerConfig.consumerTimeoutMs() : System.currentTimeMillis();
            } finally {
                // Shut the connector down even if thread setup or joining fails.
                consumerConnector.shutdown();
            }
        }
        double elapsedSecs = (double)(endMs - startMs) / 1000.0;
        if (!config.showDetailedStats()) {
            double totalMBRead = (double)totalBytesRead.get() * 1.0 / (double)0x100000;
            Predef$.MODULE$.println((Object)new StringOps(Predef$.MODULE$.augmentString("%s, %s, %.4f, %.4f, %d, %.4f")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{config.dateFormat().format(BoxesRunTime.boxToLong((long)startMs)), config.dateFormat().format(BoxesRunTime.boxToLong((long)endMs)), BoxesRunTime.boxToDouble((double)totalMBRead), BoxesRunTime.boxToDouble((double)(totalMBRead / elapsedSecs)), BoxesRunTime.boxToLong((long)totalMessagesRead.get()), BoxesRunTime.boxToDouble((double)((double)totalMessagesRead.get() / elapsedSecs))})));
        }
    }

    /**
     * Consumes records with the given consumer and records the totals.
     *
     * <p>The method subscribes to {@code topics} with a rebalance listener, polls until
     * partitions have been assigned (at most 10 seconds), rewinds via
     * {@code seekToBeginning} and then polls in 100 ms steps until either at least
     * {@code count} records were read or no record arrived for more than
     * {@code timeout} milliseconds. Because whole poll batches are processed, the final
     * record count may exceed {@code count}.
     *
     * @param consumer          consumer to read from
     * @param topics            topics to subscribe to
     * @param count             number of records after which polling stops
     * @param timeout           maximum idle time in milliseconds between non-empty polls
     * @param config            supplies the reporting interval, detailed-stats flag and date format
     * @param totalMessagesRead receives the number of records read
     * @param totalBytesRead    receives the summed key and value sizes in bytes
     * @throws IllegalStateException if no partitions are assigned within 10 seconds
     */
    public void consume(KafkaConsumer<byte[], byte[]> consumer, List<String> topics, long count, long timeout, ConsumerPerformance.ConsumerPerfConfig config, AtomicLong totalMessagesRead, AtomicLong totalBytesRead) {
        // LongRef holders are kept because the per-record helper mutates them in place.
        LongRef bytesRead = LongRef.create((long)0L);
        LongRef messagesRead = LongRef.create((long)0L);
        LongRef lastBytesRead = LongRef.create((long)0L);
        LongRef lastMessagesRead = LongRef.create((long)0L);
        final long joinTimeoutMs = 10000L;
        final AtomicBoolean isAssigned = new AtomicBoolean(false);
        consumer.subscribe((Collection)JavaConversions$.MODULE$.deprecated$u0020seqAsJavaList(topics), new ConsumerRebalanceListener(){
            // The listener captures the local flag directly; it only tracks whether this
            // consumer currently holds an assignment.
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                isAssigned.set(true);
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                isAssigned.set(false);
            }
        });
        // Wait for the first assignment; records returned by these polls are discarded
        // and re-read after the rewind below.
        long joinStart = System.currentTimeMillis();
        while (!isAssigned.get()) {
            if (System.currentTimeMillis() - joinStart >= joinTimeoutMs) {
                // Unchecked so the method needs no throws clause; callers catching
                // Exception still see it.
                throw new IllegalStateException("Timed out waiting for initial group join.");
            }
            consumer.poll(100L);
        }
        // An empty collection is passed here.
        // NOTE(review): presumably this rewinds every assigned partition - confirm against the client version in use.
        consumer.seekToBeginning((Collection)JavaConversions$.MODULE$.deprecated$u0020seqAsJavaList((Seq)Nil$.MODULE$));
        long startMs = System.currentTimeMillis();
        LongRef lastReportTime = LongRef.create((long)startMs);
        long lastConsumedTime = System.currentTimeMillis();
        LongRef currentTimeMillis = LongRef.create((long)lastConsumedTime);
        while (messagesRead.elem < count && currentTimeMillis.elem - lastConsumedTime <= timeout) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(100L);
            currentTimeMillis.elem = System.currentTimeMillis();
            if (records.count() > 0) {
                lastConsumedTime = currentTimeMillis.elem;
            }
            // Account for every record of the batch (counters and periodic progress row).
            for (ConsumerRecord<byte[], byte[]> record : records) {
                ConsumerPerformance$.$anonfun$consume$1(this, config, bytesRead, messagesRead, lastBytesRead, lastMessagesRead, lastReportTime, currentTimeMillis, record);
            }
        }
        totalMessagesRead.set(messagesRead.elem);
        totalBytesRead.set(bytesRead.elem);
    }

    /**
     * Prints one comma-separated progress row for the interval {@code [startMs, endMs]}.
     *
     * <p>The row contains, in order: the formatted {@code endMs}, {@code id}, total MB
     * read, MB per second in the interval, total messages read and messages per second
     * in the interval. One MB is 1024 * 1024 bytes.
     *
     * @param id               identifier printed as the second field
     * @param bytesRead        cumulative bytes read so far
     * @param lastBytesRead    cumulative bytes read at the previous report
     * @param messagesRead     cumulative messages read so far
     * @param lastMessagesRead cumulative messages read at the previous report
     * @param startMs          interval start in epoch milliseconds
     * @param endMs            interval end in epoch milliseconds
     * @param dateFormat       formatter applied to {@code endMs}
     */
    public void printProgressMessage(int id, long bytesRead, long lastBytesRead, long messagesRead, long lastMessagesRead, long startMs, long endMs, SimpleDateFormat dateFormat) {
        final double bytesPerMb = (double)0x100000;
        final double intervalMs = endMs - startMs;
        final double cumulativeMb = (double)bytesRead * 1.0 / bytesPerMb;
        final double intervalMb = (double)(bytesRead - lastBytesRead) * 1.0 / bytesPerMb;
        final double mbPerSec = 1000.0 * (intervalMb / intervalMs);
        final double msgsPerSec = (double)(messagesRead - lastMessagesRead) / intervalMs * 1000.0;

        final Object[] fields = new Object[]{
            dateFormat.format(BoxesRunTime.boxToLong((long)endMs)),
            BoxesRunTime.boxToInteger((int)id),
            BoxesRunTime.boxToDouble((double)cumulativeMb),
            BoxesRunTime.boxToDouble((double)mbPerSec),
            BoxesRunTime.boxToLong((long)messagesRead),
            BoxesRunTime.boxToDouble((double)msgsPerSec)
        };
        final StringOps template = new StringOps(Predef$.MODULE$.augmentString("%s, %d, %.4f, %.4f, %d, %.4f"));
        final Object row = template.format((Seq)Predef$.MODULE$.genericWrapArray((Object)fields));
        Predef$.MODULE$.println(row);
    }

    /**
     * Filter predicate used by {@code main} when iterating the topic-to-streams map.
     *
     * @param check$ifrefutable$1 the map entry under test
     * @return {@code true} when the entry is non-null
     */
    public static final /* synthetic */ boolean $anonfun$main$1(Tuple2 check$ifrefutable$1) {
        return check$ifrefutable$1 != null;
    }

    /**
     * Creates one {@code ConsumerPerfThread} per stream of a topic entry and prepends
     * each of them to the list held in {@code threadList$1}.
     *
     * <p>Threads are named {@code "kafka-zk-consumer-" + index}, where the index is the
     * stream's position in the entry's stream list.
     *
     * @param config$1            benchmark configuration handed to every thread
     * @param totalMessagesRead$1 shared message counter handed to every thread
     * @param totalBytesRead$1    shared byte counter handed to every thread
     * @param consumerTimeout$1   shared timeout flag handed to every thread
     * @param threadList$1        holder of the thread list; replaced on every prepend
     * @param x$1                 map entry whose second element is the stream list
     * @throws MatchError if {@code x$1} is {@code null}
     */
    public static final /* synthetic */ void $anonfun$main$2(ConsumerPerformance.ConsumerPerfConfig config$1, AtomicLong totalMessagesRead$1, AtomicLong totalBytesRead$1, AtomicBoolean consumerTimeout$1, ObjectRef threadList$1, Tuple2 x$1) {
        if (x$1 == null) {
            throw new MatchError((Object)x$1);
        }
        List streams = (List)x$1._2();
        int streamCount = streams.length();
        for (int idx = 0; idx < streamCount; ++idx) {
            List existing = (List)threadList$1.elem;
            ConsumerPerformance.ConsumerPerfThread worker = new ConsumerPerformance.ConsumerPerfThread(idx, "kafka-zk-consumer-" + idx, (KafkaStream)streams.apply(idx), config$1, totalMessagesRead$1, totalBytesRead$1, consumerTimeout$1);
            // Prepend, so the most recently created thread ends up at the head.
            threadList$1.elem = existing.$colon$colon((Object)worker);
        }
    }

    /**
     * Per-record accounting step of {@code consume}.
     *
     * <p>Increments the message counter, adds the sizes of the non-null key and value to
     * the byte counter and, once at least {@code config$2.reportingInterval()}
     * milliseconds have passed since the last report, optionally prints a progress row
     * and resets the "last report" markers.
     *
     * @param $this               singleton used to print the progress row
     * @param config$2            supplies reporting interval, detailed-stats flag and date format
     * @param bytesRead$1         cumulative byte counter (updated)
     * @param messagesRead$1      cumulative message counter (updated)
     * @param lastBytesRead$1     byte counter value at the last report (updated on report)
     * @param lastMessagesRead$1  message counter value at the last report (updated on report)
     * @param lastReportTime$1    time of the last report (updated on report)
     * @param currentTimeMillis$1 timestamp taken for the current poll batch
     * @param record              the record being accounted for
     */
    public static final /* synthetic */ void $anonfun$consume$1(ConsumerPerformance$ $this, ConsumerPerformance.ConsumerPerfConfig config$2, LongRef bytesRead$1, LongRef messagesRead$1, LongRef lastBytesRead$1, LongRef lastMessagesRead$1, LongRef lastReportTime$1, LongRef currentTimeMillis$1, ConsumerRecord record) {
        ++messagesRead$1.elem;

        Object key = record.key();
        if (key != null) {
            bytesRead$1.elem += (long)((byte[])key).length;
        }
        Object value = record.value();
        if (value != null) {
            bytesRead$1.elem += (long)((byte[])value).length;
        }

        long sinceLastReport = currentTimeMillis$1.elem - lastReportTime$1.elem;
        if (sinceLastReport < (long)config$2.reportingInterval()) {
            return;
        }

        if (config$2.showDetailedStats()) {
            $this.printProgressMessage(0, bytesRead$1.elem, lastBytesRead$1.elem, messagesRead$1.elem, lastMessagesRead$1.elem, lastReportTime$1.elem, currentTimeMillis$1.elem, config$2.dateFormat());
        }
        // The markers advance even when no row is printed.
        lastReportTime$1.elem = currentTimeMillis$1.elem;
        lastMessagesRead$1.elem = messagesRead$1.elem;
        lastBytesRead$1.elem = bytesRead$1.elem;
    }

    private ConsumerPerformance$() {
        MODULE$ = this;
        this.logger = Logger.getLogger(this.getClass());
    }
}

