/*
 * Decompiled with CFR 0.152.
 */
package com.redislabs.provider.redis.streaming;

import com.redislabs.provider.redis.ReadWriteConfig;
import com.redislabs.provider.redis.RedisConfig;
import com.redislabs.provider.redis.streaming.ConsumerConfig;
import com.redislabs.provider.redis.streaming.Earliest$;
import com.redislabs.provider.redis.streaming.IdOffset;
import com.redislabs.provider.redis.streaming.ItemId;
import com.redislabs.provider.redis.streaming.Latest$;
import com.redislabs.provider.redis.streaming.Offset;
import com.redislabs.provider.redis.streaming.StreamItem;
import com.redislabs.provider.redis.util.Logging;
import com.redislabs.provider.redis.util.Logging$class;
import com.redislabs.provider.redis.util.PipelineUtils$;
import com.redislabs.provider.redis.util.StreamUtils$;
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.curator.utils.ThreadUtils;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.spark_project.guava.util.concurrent.RateLimiter;
import redis.clients.jedis.EntryID;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.StreamEntry;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.JavaConversions$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.mutable.Buffer$;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001\u0005Ud\u0001B\u0001\u0003\u00015\u00111CU3eSN\u001cFO]3b[J+7-Z5wKJT!a\u0001\u0003\u0002\u0013M$(/Z1nS:<'BA\u0003\u0007\u0003\u0015\u0011X\rZ5t\u0015\t9\u0001\"\u0001\u0005qe>4\u0018\u000eZ3s\u0015\tI!\"A\u0005sK\u0012L7\u000f\\1cg*\t1\"A\u0002d_6\u001c\u0001aE\u0002\u0001\u001d}\u00012aD\r\u001c\u001b\u0005\u0001\"BA\t\u0013\u0003!\u0011XmY3jm\u0016\u0014(BA\u0002\u0014\u0015\t!R#A\u0003ta\u0006\u00148N\u0003\u0002\u0017/\u00051\u0011\r]1dQ\u0016T\u0011\u0001G\u0001\u0004_J<\u0017B\u0001\u000e\u0011\u0005!\u0011VmY3jm\u0016\u0014\bC\u0001\u000f\u001e\u001b\u0005\u0011\u0011B\u0001\u0010\u0003\u0005)\u0019FO]3b[&#X-\u001c\t\u0003A\rj\u0011!\t\u0006\u0003E\u0011\tA!\u001e;jY&\u0011A%\t\u0002\b\u0019><w-\u001b8h\u0011!1\u0003A!A!\u0002\u00139\u0013aD2p]N,X.\u001a:t\u0007>tg-[4\u0011\u0007!\u0012TG\u0004\u0002*_9\u0011!&L\u0007\u0002W)\u0011A\u0006D\u0001\u0007yI|w\u000e\u001e \n\u00039\nQa]2bY\u0006L!\u0001M\u0019\u0002\u000fA\f7m[1hK*\ta&\u0003\u00024i\t\u00191+Z9\u000b\u0005A\n\u0004C\u0001\u000f7\u0013\t9$A\u0001\bD_:\u001cX/\\3s\u0007>tg-[4\t\u0011e\u0002!\u0011!Q\u0001\ni\n1B]3eSN\u001cuN\u001c4jOB\u00111\bP\u0007\u0002\t%\u0011Q\b\u0002\u0002\f%\u0016$\u0017n]\"p]\u001aLw\r\u0003\u0005@\u0001\t\u0005\t\u0015!\u0003A\u0003=\u0011X-\u00193Xe&$XmQ8oM&<\u0007CA\u001eB\u0013\t\u0011EAA\bSK\u0006$wK]5uK\u000e{gNZ5h\u0011%!\u0005A!A!\u0002\u0013)5*\u0001\u0007ti>\u0014\u0018mZ3MKZ,G\u000e\u0005\u0002G\u00136\tqI\u0003\u0002I'\u000591\u000f^8sC\u001e,\u0017B\u0001&H\u00051\u0019Fo\u001c:bO\u0016dUM^3m\u0013\t!\u0015\u0004C\u0003N\u0001\u0011\u0005a*\u0001\u0004=S:LGO\u0010\u000b\u0006\u001fB\u000b&k\u0015\t\u00039\u0001AQA\n'A\u0002\u001dBQ!\u000f'A\u0002iBQa\u0010'A\u0002\u0001CQ\u0001\u0012'A\u0002\u0015CQ!\u0016\u0001\u0005BY\u000bqa\u001c8Ti\u0006\u0014H\u000fF\u0001X!\tA\u0016,D\u00012\u0013\tQ\u0016G\u0001\u0003V]&$\b\"\u0002/\u0001\t\u00032\u0016AB8o'R|\u0007O\u0002\u0003_\u0001\u0011y&AD'fgN\fw-\u001a%b]\u0012dWM]\n\u0004;\u0002D\u0007CA1g\u001b\u0005\u0011'BA2e\u0003\u0011a\u0017M\\4\u000b\u0003\u0015\fAA[1wC&\u0011qM\u0019\u0002\u0007\u001f\nTWm\u0019;\u0011\u0005\u0005L\u0017B\u00016c\u0005!\u0011VO\u001c8bE2,\u0007\u0002\u00037^\u0005\u0003\u0005\u000b\u0011B\u001b\u0002\t\r|gN\u001a\u0005\tsu\u0013\t\u0011)A\u0005u!Aq(\u0018BC\u0002\u0013\rq.F\u0001A\u0011!\tXL!A!\u0002\u0013\u0001\u0015\u0001\u0005:fC\u0012<&/\u001b;f\u0007>tg-[4!\u0011\u0015iU\f\"\u0001t)\u0011!ho\u001e=\u0011\u0005UlV\"\u0001\u0001\t\u000b1\u0014\b\u0019A\u001b\t\u000be\u0012\b\u0019\u0001\u001e\t\u000b}\u0012\b9\u0001!\t\u000fil&\u0019!C\u0001w\u0006)!.\u001a3jgV\tA\u0010E\u0002~\u0003\u000bi\u0011A \u0006\u0003u~TA!!\u0001\u0002\u0004\u000591\r\\5f]R\u001c(\"A\u0003\n\u0007\u0005\u001daPA\u0003KK\u0012L7\u000fC\u0004\u0002\fu\u0003\u000b\u0011\u0002?\u0002\r),G-[:!\u0011%\ty!\u0018b\u0001\n\u0003\t\t\"\u0001\bsCR,G*[7ji\u0016\u0014x\n\u001d;\u0016\u0005\u0005M\u0001#\u0002-\u0002\u0016\u0005e\u0011bAA\fc\t1q\n\u001d;j_:\u0004B!a\u0007\u0002,5\u0011\u0011Q\u0004\u0006\u0005\u0003?\t\t#\u0001\u0006d_:\u001cWO\u001d:f]RT1AIA\u0012\u0015\u0011\t)#a\n\u0002\u000b\u001d,\u0018M^1\u000b\u0007\u0005%r#A\u0007ta\u0006\u00148n\u00189s_*,7\r^\u0005\u0005\u0003[\tiBA\u0006SCR,G*[7ji\u0016\u0014\b\u0002CA\u0019;\u0002\u0006I!a\u0005\u0002\u001fI\fG/\u001a'j[&$XM](qi\u0002Ba!!\u000e^\t\u00032\u0016a\u0001:v]\"1\u0011\u0011H/\u0005\u0002Y\u000bQd\u0019:fCR,7i\u001c8tk6,'o\u0012:pkBLeMT8u\u000bbL7\u000f\u001e\u0005\u0007\u0003{iF\u0011\u0001,\u0002+I,7-Z5wKVs\u0017mY6o_^dW\rZ4fI\"1\u0011\u0011I/\u0005\u0002Y\u000b!C]3dK&4XMT3x\u001b\u0016\u001c8/Y4fg\"9\u0011QI/\u0005\u0002\u0005\u001d\u0013aC:u_J,\u0017I\u001c3BG.$RaVA%\u00037B\u0001\"a\u0013\u0002D\u0001\u0007\u0011QJ\u0001\ngR\u0014X-Y7LKf\u0004B!a\u0014\u0002V9\u0019\u0001,!\u0015\n\u0007\u0005M\u0013'\u0001\u0004Qe\u0016$WMZ\u0005\u0005\u0003/\nIF\u0001\u0004TiJLgn\u001a\u0006\u0004\u0003'\n\u0004\u0002CA/\u0003\u0007\u0002\r!a\u0018\u0002\u000f\u0015tGO]5fgB!\u0001FMA1!\ri\u00181M\u0005\u0004\u0003Kr(aC*ue\u0016\fW.\u00128uefDq!!\u001b^\t\u0003\tY'\u0001\bf]R\u0014\u0018.Z:U_&#X-\\:\u0015\r\u00055\u0014qNA:!\rA#g\u0007\u0005\t\u0003c\n9\u00071\u0001\u0002N\u0005\u00191.Z=\t\u0011\u0005u\u0013q\ra\u0001\u0003?\u0002")
public class RedisStreamReceiver
extends Receiver<StreamItem>
implements Logging {
    private final Seq<ConsumerConfig> consumersConfig;
    public final RedisConfig com$redislabs$provider$redis$streaming$RedisStreamReceiver$$redisConfig;
    public final ReadWriteConfig com$redislabs$provider$redis$streaming$RedisStreamReceiver$$readWriteConfig;
    private transient Logger com$redislabs$provider$redis$util$Logging$$_logger;

    @Override
    public Logger com$redislabs$provider$redis$util$Logging$$_logger() {
        return this.com$redislabs$provider$redis$util$Logging$$_logger;
    }

    @Override
    public void com$redislabs$provider$redis$util$Logging$$_logger_$eq(Logger x$1) {
        this.com$redislabs$provider$redis$util$Logging$$_logger = x$1;
    }

    @Override
    public String loggerName() {
        return Logging$class.loggerName(this);
    }

    @Override
    public Logger logger() {
        return Logging$class.logger(this);
    }

    @Override
    public void logInfo(Function0<String> msg) {
        Logging$class.logInfo(this, msg);
    }

    @Override
    public void logDebug(Function0<String> msg) {
        Logging$class.logDebug(this, msg);
    }

    @Override
    public void logTrace(Function0<String> msg) {
        Logging$class.logTrace(this, msg);
    }

    /*
     * WARNING - void declaration
     */
    public void onStart() {
        this.logInfo((Function0<String>)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Starting Redis Stream Receiver";
            }
        });
        ExecutorService executorPool = ThreadUtils.newFixedThreadPool((int)this.consumersConfig.size(), (String)"RedisStreamMessageHandler");
        try {
            this.consumersConfig.foreach((Function1)new Serializable(this, executorPool){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ RedisStreamReceiver $outer;
                private final ExecutorService executorPool$1;

                public final Future<?> apply(ConsumerConfig c) {
                    return this.executorPool$1.submit(new MessageHandler(this.$outer, c, this.$outer.com$redislabs$provider$redis$streaming$RedisStreamReceiver$$redisConfig, this.$outer.com$redislabs$provider$redis$streaming$RedisStreamReceiver$$readWriteConfig));
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                    this.executorPool$1 = executorPool$1;
                }
            });
            executorPool.shutdown();
            return;
        }
        catch (Throwable throwable) {
            void var1_1;
            var1_1.shutdown();
            throw throwable;
        }
    }

    public void onStop() {
    }

    public RedisStreamReceiver(Seq<ConsumerConfig> consumersConfig, RedisConfig redisConfig, ReadWriteConfig readWriteConfig, StorageLevel storageLevel) {
        this.consumersConfig = consumersConfig;
        this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$$redisConfig = redisConfig;
        this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$$readWriteConfig = readWriteConfig;
        super(storageLevel);
        Logging$class.$init$(this);
    }

    public class MessageHandler
    implements Runnable {
        public final ConsumerConfig com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$conf;
        private final ReadWriteConfig readWriteConfig;
        private final Jedis jedis;
        private final Option<RateLimiter> rateLimiterOpt;
        public final /* synthetic */ RedisStreamReceiver $outer;

        public ReadWriteConfig readWriteConfig() {
            return this.readWriteConfig;
        }

        public Jedis jedis() {
            return this.jedis;
        }

        public Option<RateLimiter> rateLimiterOpt() {
            return this.rateLimiterOpt;
        }

        @Override
        public void run() {
            this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$$outer().logInfo((Function0<String>)new Serializable(this){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ MessageHandler $outer;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Starting MessageHandler ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.$outer.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$conf}));
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                }
            });
            try {
                this.createConsumerGroupIfNotExist();
                this.receiveUnacknowledged();
                this.receiveNewMessages();
            }
            catch (Exception exception) {
                this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$$outer().restart("Error handling message. Restarting.", exception);
            }
        }

        public void createConsumerGroupIfNotExist() {
            Offset offset;
            block5: {
                EntryID entryID;
                block3: {
                    block4: {
                        block2: {
                            offset = this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$conf.offset();
                            if (!Earliest$.MODULE$.equals(offset)) break block2;
                            entryID = new EntryID(0L, 0L);
                            break block3;
                        }
                        if (!Latest$.MODULE$.equals(offset)) break block4;
                        entryID = EntryID.LAST_ENTRY;
                        break block3;
                    }
                    if (!(offset instanceof IdOffset)) break block5;
                    IdOffset idOffset = (IdOffset)offset;
                    long v1 = idOffset.v1();
                    long v2 = idOffset.v2();
                    entryID = new EntryID(v1, v2);
                }
                EntryID entryId = entryID;
                StreamUtils$.MODULE$.createConsumerGroupIfNotExist(this.jedis(), this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$conf.streamKey(), this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$conf.groupName(), entryId);
                return;
            }
            throw new MatchError((Object)offset);
        }

        public void receiveUnacknowledged() {
            this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$$outer().logInfo((Function0<String>)new Serializable(this){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ MessageHandler $outer;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Starting receiving unacknowledged messages for key ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.$outer.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$conf.streamKey()}));
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                }
            });
            boolean bl = true;
            AbstractMap.SimpleEntry<String, EntryID> unackId = new AbstractMap.SimpleEntry<String, EntryID>(this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$conf.streamKey(), new EntryID(0L, 0L));
            while (!this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$$outer().isStopped() && bl) {
                List response = this.jedis().xreadGroup(this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$conf.groupName(), this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$conf.consumerName(), this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$conf.batchSize(), this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$conf.block(), false, new Map.Entry[]{unackId});
                Map unackMessagesMap = ((TraversableOnce)JavaConversions$.MODULE$.asScalaBuffer(response).map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final Tuple2<String, List<StreamEntry>> apply(Map.Entry<String, List<StreamEntry>> e) {
                        return new Tuple2((Object)e.getKey(), e.getValue());
                    }
                }, Buffer$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
                List entries = (List)unackMessagesMap.apply((Object)this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$conf.streamKey());
                if (entries.isEmpty()) {
                    bl = false;
                }
                this.storeAndAck(this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$conf.streamKey(), (Seq<StreamEntry>)JavaConversions$.MODULE$.asScalaBuffer(entries));
            }
        }

        public void receiveNewMessages() {
            this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$$outer().logInfo((Function0<String>)new Serializable(this){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ MessageHandler $outer;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Starting receiving new messages for key ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.$outer.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$conf.streamKey()}));
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                }
            });
            AbstractMap.SimpleEntry<String, EntryID> newMessId = new AbstractMap.SimpleEntry<String, EntryID>(this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$conf.streamKey(), EntryID.UNRECEIVED_ENTRY);
            while (!this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$$outer().isStopped()) {
                List response = this.jedis().xreadGroup(this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$conf.groupName(), this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$conf.consumerName(), this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$conf.batchSize(), this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$conf.block(), false, new Map.Entry[]{newMessId});
                JavaConversions$.MODULE$.asScalaBuffer(response).foreach((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ MessageHandler $outer;

                    public final void apply(Map.Entry<String, List<StreamEntry>> streamMessages) {
                        String key = streamMessages.getKey();
                        List<StreamEntry> entries = streamMessages.getValue();
                        this.$outer.storeAndAck(key, (Seq<StreamEntry>)JavaConversions$.MODULE$.asScalaBuffer(entries));
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                    }
                });
            }
            return;
        }

        public void storeAndAck(String streamKey, Seq<StreamEntry> entries) {
            if (entries.nonEmpty()) {
                this.rateLimiterOpt().foreach((Function1)new Serializable(this, entries){
                    public static final long serialVersionUID = 0L;
                    private final Seq entries$1;

                    public final void apply(RateLimiter x$1) {
                        x$1.acquire(this.entries$1.size());
                    }
                    {
                        this.entries$1 = entries$1;
                    }
                });
                Seq<StreamItem> streamItems = this.entriesToItems(streamKey, entries);
                this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$$outer().store(streamItems.iterator());
                PipelineUtils$.MODULE$.foreachWithPipeline(this.jedis(), entries, new Serializable(this, streamKey){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ MessageHandler $outer;
                    private final String streamKey$1;

                    public final void apply(Pipeline pipeline, StreamEntry entry) {
                        pipeline.xack(this.streamKey$1, this.$outer.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$conf.groupName(), new EntryID[]{entry.getID()});
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.streamKey$1 = streamKey$1;
                    }
                }, this.readWriteConfig());
            }
        }

        public Seq<StreamItem> entriesToItems(String key, Seq<StreamEntry> entries) {
            return (Seq)entries.map((Function1)new Serializable(this, key){
                public static final long serialVersionUID = 0L;
                private final String key$1;

                public final StreamItem apply(StreamEntry e) {
                    ItemId itemId = new ItemId(e.getID().getTime(), e.getID().getSequence());
                    return new StreamItem(this.key$1, itemId, (Map<String, String>)JavaConversions$.MODULE$.mapAsScalaMap(e.getFields()).toMap(Predef$.MODULE$.$conforms()));
                }
                {
                    this.key$1 = key$1;
                }
            }, Seq$.MODULE$.canBuildFrom());
        }

        public /* synthetic */ RedisStreamReceiver com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$$outer() {
            return this.$outer;
        }

        public MessageHandler(RedisStreamReceiver $outer, ConsumerConfig conf, RedisConfig redisConfig, ReadWriteConfig readWriteConfig) {
            this.com$redislabs$provider$redis$streaming$RedisStreamReceiver$MessageHandler$$conf = conf;
            this.readWriteConfig = readWriteConfig;
            if ($outer == null) {
                throw null;
            }
            this.$outer = $outer;
            this.jedis = redisConfig.connectionForKey(conf.streamKey());
            this.rateLimiterOpt = conf.rateLimitPerConsumer().map((Function1)new Serializable(this){
                public static final long serialVersionUID = 0L;

                public final RateLimiter apply(int r) {
                    return RateLimiter.create((double)r);
                }
            });
        }
    }
}

