/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.dstream;

import java.io.EOFException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.spark.internal.Logging;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import scala.Function0;
import scala.Serializable;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001A4Q!\u0001\u0002\u0001\t1\u0011!CU1x\u001d\u0016$xo\u001c:l%\u0016\u001cW-\u001b<fe*\u00111\u0001B\u0001\bIN$(/Z1n\u0015\t)a!A\u0005tiJ,\u0017-\\5oO*\u0011q\u0001C\u0001\u0006gB\f'o\u001b\u0006\u0003\u0013)\ta!\u00199bG\",'\"A\u0006\u0002\u0007=\u0014xmE\u0002\u0001\u001be\u00012AD\t\u0014\u001b\u0005y!B\u0001\t\u0005\u0003!\u0011XmY3jm\u0016\u0014\u0018B\u0001\n\u0010\u0005!\u0011VmY3jm\u0016\u0014\bC\u0001\u000b\u0018\u001b\u0005)\"\"\u0001\f\u0002\u000bM\u001c\u0017\r\\1\n\u0005a)\"aA!osB\u0011!$H\u0007\u00027)\u0011ADB\u0001\tS:$XM\u001d8bY&\u0011ad\u0007\u0002\b\u0019><w-\u001b8h\u0011!\u0001\u0003A!A!\u0002\u0013\u0011\u0013\u0001\u00025pgR\u001c\u0001\u0001\u0005\u0002$U9\u0011A\u0005\u000b\t\u0003KUi\u0011A\n\u0006\u0003O\u0005\na\u0001\u0010:p_Rt\u0014BA\u0015\u0016\u0003\u0019\u0001&/\u001a3fM&\u00111\u0006\f\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005%*\u0002\u0002\u0003\u0018\u0001\u0005\u0003\u0005\u000b\u0011B\u0018\u0002\tA|'\u000f\u001e\t\u0003)AJ!!M\u000b\u0003\u0007%sG\u000fC\u00054\u0001\t\u0005\t\u0015!\u00035u\u0005a1\u000f^8sC\u001e,G*\u001a<fYB\u0011Q\u0007O\u0007\u0002m)\u0011qGB\u0001\bgR|'/Y4f\u0013\tIdG\u0001\u0007Ti>\u0014\u0018mZ3MKZ,G.\u0003\u00024#!)A\b\u0001C\u0001{\u00051A(\u001b8jiz\"BA\u0010!B\u0005B\u0011q\bA\u0007\u0002\u0005!)\u0001e\u000fa\u0001E!)af\u000fa\u0001_!)1g\u000fa\u0001i!9A\t\u0001a\u0001\n\u0003)\u0015A\u00052m_\u000e\\\u0007+^:iS:<G\u000b\u001b:fC\u0012,\u0012A\u0012\t\u0003\u000f2k\u0011\u0001\u0013\u0006\u0003\u0013*\u000bA\u0001\\1oO*\t1*\u0001\u0003kCZ\f\u0017BA'I\u0005\u0019!\u0006N]3bI\"9q\n\u0001a\u0001\n\u0003\u0001\u0016A\u00062m_\u000e\\\u0007+^:iS:<G\u000b\u001b:fC\u0012|F%Z9\u0015\u0005E#\u0006C\u0001\u000bS\u0013\t\u0019VC\u0001\u0003V]&$\bbB+O\u0003\u0003\u0005\rAR\u0001\u0004q\u0012\n\u0004BB,\u0001A\u0003&a)A\ncY>\u001c7\u000eU;tQ&tw\r\u00165sK\u0006$\u0007\u0005C\u0003Z\u0001\u0011\u0005!,A\u0004p]N#\u0018M\u001d;\u0015\u0003ECQ\u0001\u0018\u0001\u0005\u0002i\u000baa\u001c8Ti>\u0004\b\"\u00020\u0001\t\u0013y\u0016!\u0003:fC\u00124U\u000f\u001c7z)\r\t\u0006M\u001b\u0005\u0006Cv\u0003\rAY\u0001\bG\"\fgN\\3m!\t\u0019\u0007.D\u0001e\u0015\t)g-\u0001\u0005dQ\u0006tg.\u001a7t\u0015\t9'*A\u0002oS>L!!\u001b3\u0003'I+\u0017\rZ1cY\u0016\u0014\u0015\u0010^3DQ\u0006tg.\u001a7\t\u000b-l\u0006\u0019\u00017\u0002\t\u0011,7\u000f\u001e\t\u0003[:l\u0011AZ\u0005\u0003_\u001a\u0014!BQ=uK\n+hMZ3s\u0001")
public class RawNetworkReceiver
extends Receiver<Object>
implements Logging {
    private final String host;
    private final int port;
    private Thread blockPushingThread;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    public Thread blockPushingThread() {
        return this.blockPushingThread;
    }

    public void blockPushingThread_$eq(Thread x$1) {
        this.blockPushingThread = x$1;
    }

    @Override
    public void onStart() {
        this.logInfo((Function0<String>)(Function0 & java.io.Serializable & Serializable)() -> new StringBuilder(15).append("Connecting to ").append($this.host).append(":").append($this.port).toString());
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(true);
        channel.connect(new InetSocketAddress(this.host, this.port));
        this.logInfo((Function0<String>)(Function0 & java.io.Serializable & Serializable)() -> new StringBuilder(14).append("Connected to ").append($this.host).append(":").append($this.port).toString());
        ArrayBlockingQueue<ByteBuffer> queue = new ArrayBlockingQueue<ByteBuffer>(2);
        this.blockPushingThread_$eq(new Thread(this, queue){
            private final /* synthetic */ RawNetworkReceiver $outer;
            private final ArrayBlockingQueue queue$1;

            public void run() {
                int nextBlockNumber = 0;
                while (true) {
                    ByteBuffer buffer = (ByteBuffer)this.queue$1.take();
                    ++nextBlockNumber;
                    this.$outer.store(buffer);
                }
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.queue$1 = queue$1;
                this.setDaemon(true);
            }
        });
        this.blockPushingThread().start();
        ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
        while (true) {
            lengthBuffer.clear();
            this.readFully(channel, lengthBuffer);
            lengthBuffer.flip();
            int length = lengthBuffer.getInt();
            ByteBuffer dataBuffer = ByteBuffer.allocate(length);
            this.readFully(channel, dataBuffer);
            dataBuffer.flip();
            this.logInfo((Function0<String>)(Function0 & java.io.Serializable & Serializable)() -> new StringBuilder(24).append("Read a block with ").append(length).append(" bytes").toString());
            queue.put(dataBuffer);
        }
    }

    @Override
    public void onStop() {
        block0: {
            if (this.blockPushingThread() == null) break block0;
            this.blockPushingThread().interrupt();
        }
    }

    private void readFully(ReadableByteChannel channel, ByteBuffer dest) {
        while (dest.position() < dest.limit()) {
            if (channel.read(dest) != -1) continue;
            throw new EOFException("End of channel");
        }
    }

    public RawNetworkReceiver(String host, int port, StorageLevel storageLevel) {
        this.host = host;
        this.port = port;
        super(storageLevel);
        Logging.$init$((Logging)this);
        this.blockPushingThread = null;
    }
}

