/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ClientIdAndBroker;
import kafka.common.KafkaException;
import kafka.consumer.PartitionTopicInfo$;
import kafka.message.ByteBufferMessageSet;
import kafka.server.AbstractFetcherThread$;
import kafka.server.FetcherLagStats;
import kafka.server.FetcherStats;
import kafka.server.PartitionFetchState;
import kafka.utils.CoreUtils$;
import kafka.utils.DelayedItem;
import kafka.utils.ShutdownableThread;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.internals.PartitionStates;
import org.apache.kafka.common.protocol.Errors;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Option$;
import scala.Predef;
import scala.Predef$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.Set;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.Set$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001\t%d!B\u0001\u0003\u0003\u00039!!F!cgR\u0014\u0018m\u0019;GKR\u001c\u0007.\u001a:UQJ,\u0017\r\u001a\u0006\u0003\u0007\u0011\taa]3sm\u0016\u0014(\"A\u0003\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001\u0001\u0003\t\u0003\u00131i\u0011A\u0003\u0006\u0003\u0017\u0011\tQ!\u001e;jYNL!!\u0004\u0006\u0003%MCW\u000f\u001e3po:\f'\r\\3UQJ,\u0017\r\u001a\u0005\n\u001f\u0001\u0011\t\u0011)A\u0005!u\tAA\\1nKB\u0011\u0011C\u0007\b\u0003%a\u0001\"a\u0005\f\u000e\u0003QQ!!\u0006\u0004\u0002\rq\u0012xn\u001c;?\u0015\u00059\u0012!B:dC2\f\u0017BA\r\u0017\u0003\u0019\u0001&/\u001a3fM&\u00111\u0004\b\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005e1\u0012BA\b\r\u0011!y\u0002A!A!\u0002\u0013\u0001\u0012\u0001C2mS\u0016tG/\u00133\t\u0011\u0005\u0002!\u0011!Q\u0001\n\t\nAb]8ve\u000e,'I]8lKJ\u0004\"a\t\u0014\u000e\u0003\u0011R!!\n\u0003\u0002\u000f\rdWo\u001d;fe&\u0011q\u0005\n\u0002\u000f\u0005J|7.\u001a:F]\u0012\u0004v.\u001b8u\u0011!I\u0003A!A!\u0002\u0013Q\u0013A\u00044fi\u000eD')Y2l\u001f\u001a4Wj\u001d\t\u0003W1j\u0011AF\u0005\u0003[Y\u00111!\u00138u\u0011%y\u0003A!A!\u0002\u0013\u00014'A\bjg&sG/\u001a:skB$\u0018N\u00197f!\tY\u0013'\u0003\u00023-\t9!i\\8mK\u0006t\u0017BA\u0018\r\u0011\u0015)\u0004\u0001\"\u00017\u0003\u0019a\u0014N\\5u}Q1q'\u000f\u001e<yu\u0002\"\u0001\u000f\u0001\u000e\u0003\tAQa\u0004\u001bA\u0002AAQa\b\u001bA\u0002AAQ!\t\u001bA\u0002\tBq!\u000b\u001b\u0011\u0002\u0003\u0007!\u0006C\u00040iA\u0005\t\u0019\u0001\u0019\u0005\u000b}\u0002!\u0011\u0001!\u0003\u0007I+\u0015+\u0005\u0002B\tB\u00111FQ\u0005\u0003\u0007Z\u0011qAT8uQ&tw\r\u0005\u0002F!:\u0011\u0001HR\u0004\u0006\u000f\nA\t\u0001S\u0001\u0016\u0003\n\u001cHO]1di\u001a+Go\u00195feRC'/Z1e!\tA\u0014JB\u0003\u0002\u0005!\u0005!j\u0005\u0002J\u0017B\u00111\u0006T\u0005\u0003\u001bZ\u0011a!\u00118z%\u00164\u0007\"B\u001bJ\t\u0003yE#\u0001%\u0007\u000fEK\u0005\u0013aI\u0001%\naa)\u001a;dQJ+\u0017/^3tiN\u0011\u0001k\u0013\u0005\u0006)B3\t!V\u0001\bSN,U\u000e\u001d;z+\u0005\u0001\u0004\"B,Q\r\u0003A\u0016AB8gMN,G\u000f\u0006\u0002Z9B\u00111FW\u0005\u00037Z\u0011A\u0001T8oO\")QL\u0016a\u0001=\u0006qAo\u001c9jGB\u000b'\u000f^5uS>t\u0007CA0h\u001b\u0005\u0001'BA1c\u0003\u0019\u0019w.\\7p]*\u0011Qa\u0019\u0006\u0003I\u0016\fa!\u00199bG\",'\"\u00014\u0002\u0007=\u0014x-\u0003\u0002iA\nqAk\u001c9jGB\u000b'\u000f^5uS>tga\u00026J!\u0003\r\na\u001b\u0002\u000e!\u0006\u0014H/\u001b;j_:$\u0015\r^1\u0014\u0005%\\\u0005\"B7j\r\u0003q\u0017!C3se>\u00148i\u001c3f+\u0005y\u0007CA\u0016q\u0013\t\thCA\u0003TQ>\u0014H\u000fC\u0003tS\u001a\u0005A/A\u0005fq\u000e,\u0007\u000f^5p]V\tQ\u000fE\u0002,mbL!a\u001e\f\u0003\r=\u0003H/[8o!\tIhP\u0004\u0002{y:\u00111c_\u0005\u0002/%\u0011QPF\u0001\ba\u0006\u001c7.Y4f\u0013\ry\u0018\u0011\u0001\u0002\n)\"\u0014xn^1cY\u0016T!! \f\t\u000f\u0005\u0015\u0011N\"\u0001\u0002\b\u00051Bo\u001c\"zi\u0016\u0014UO\u001a4fe6+7o]1hKN+G/\u0006\u0002\u0002\nA!\u00111BA\t\u001b\t\tiAC\u0002\u0002\u0010\u0011\tq!\\3tg\u0006<W-\u0003\u0003\u0002\u0014\u00055!\u0001\u0006\"zi\u0016\u0014UO\u001a4fe6+7o]1hKN+G\u000fC\u0004\u0002\u0018%4\t!!\u0007\u0002\u001b!Lw\r[,bi\u0016\u0014X.\u0019:l+\u0005I\u0006\"CA\u000f\u0013F\u0005I\u0011AA\u0010\u0003m!C.Z:tS:LG\u000fJ4sK\u0006$XM\u001d\u0013eK\u001a\fW\u000f\u001c;%iU\u0011\u0011\u0011\u0005\u0016\u0004U\u0005\r2FAA\u0013!\u0011\t9#!\r\u000e\u0005\u0005%\"\u0002BA\u0016\u0003[\t\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\u0005=b#\u0001\u0006b]:|G/\u0019;j_:LA!a\r\u0002*\t\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3\t\u0013\u0005]\u0012*%A\u0005\u0002\u0005e\u0012a\u0007\u0013mKN\u001c\u0018N\\5uI\u001d\u0014X-\u0019;fe\u0012\"WMZ1vYR$S'\u0006\u0002\u0002<)\u001a\u0001'a\t\u0005\u000f\u0005}\u0002A!\u0001\u0002B\t\u0011\u0001\u000bR\t\u0004\u0003\u0006\r\u0003CA#j\u0011%\t9\u0005\u0001b\u0001\n\u0013\tI%A\bqCJ$\u0018\u000e^5p]N#\u0018\r^3t+\t\tY\u0005\u0005\u0004\u0002N\u0005M\u0013qK\u0007\u0003\u0003\u001fR1!!\u0015a\u0003%Ig\u000e^3s]\u0006d7/\u0003\u0003\u0002V\u0005=#a\u0004)beRLG/[8o'R\fG/Z:\u0011\u0007a\nI&C\u0002\u0002\\\t\u00111\u0003U1si&$\u0018n\u001c8GKR\u001c\u0007n\u0015;bi\u0016D\u0001\"a\u0018\u0001A\u0003%\u00111J\u0001\u0011a\u0006\u0014H/\u001b;j_:\u001cF/\u0019;fg\u0002B\u0011\"a\u0019\u0001\u0005\u0004%I!!\u001a\u0002!A\f'\u000f^5uS>tW*\u00199M_\u000e\\WCAA4!\u0011\tI'a\u001f\u000e\u0005\u0005-$\u0002BA7\u0003_\nQ\u0001\\8dWNTA!!\u001d\u0002t\u0005Q1m\u001c8dkJ\u0014XM\u001c;\u000b\t\u0005U\u0014qO\u0001\u0005kRLGN\u0003\u0002\u0002z\u0005!!.\u0019<b\u0013\u0011\ti(a\u001b\u0003\u001bI+WM\u001c;sC:$Hj\\2l\u0011!\t\t\t\u0001Q\u0001\n\u0005\u001d\u0014!\u00059beRLG/[8o\u001b\u0006\u0004Hj\\2lA!I\u0011Q\u0011\u0001C\u0002\u0013%\u0011qQ\u0001\u0011a\u0006\u0014H/\u001b;j_:l\u0015\r]\"p]\u0012,\"!!#\u0011\t\u0005%\u00141R\u0005\u0005\u0003\u001b\u000bYGA\u0005D_:$\u0017\u000e^5p]\"A\u0011\u0011\u0013\u0001!\u0002\u0013\tI)A\tqCJ$\u0018\u000e^5p]6\u000b\u0007oQ8oI\u0002B\u0011\"!&\u0001\u0005\u0004%I!a&\u0002\u00115,GO]5d\u0013\u0012,\"!!'\u0011\t\u0005m\u0015qT\u0007\u0003\u0003;S!!\u0019\u0003\n\t\u0005\u0005\u0016Q\u0014\u0002\u0012\u00072LWM\u001c;JI\u0006sGM\u0011:pW\u0016\u0014\b\u0002CAS\u0001\u0001\u0006I!!'\u0002\u00135,GO]5d\u0013\u0012\u0004\u0003\"CAU\u0001\t\u0007I\u0011AAV\u000311W\r^2iKJ\u001cF/\u0019;t+\t\ti\u000bE\u00029\u0003_K1!!-\u0003\u000511U\r^2iKJ\u001cF/\u0019;t\u0011!\t)\f\u0001Q\u0001\n\u00055\u0016!\u00044fi\u000eDWM]*uCR\u001c\b\u0005C\u0005\u0002:\u0002\u0011\r\u0011\"\u0001\u0002<\u0006ya-\u001a;dQ\u0016\u0014H*Y4Ti\u0006$8/\u0006\u0002\u0002>B\u0019\u0001(a0\n\u0007\u0005\u0005'AA\bGKR\u001c\u0007.\u001a:MC\u001e\u001cF/\u0019;t\u0011!\t)\r\u0001Q\u0001\n\u0005u\u0016\u0001\u00054fi\u000eDWM\u001d'bON#\u0018\r^:!\u0011\u001d\tI\r\u0001D\u0001\u0003\u0017\fA\u0003\u001d:pG\u0016\u001c8\u000fU1si&$\u0018n\u001c8ECR\fG\u0003CAg\u0003'\f).!7\u0011\u0007-\ny-C\u0002\u0002RZ\u0011A!\u00168ji\"1Q,a2A\u0002yCq!a6\u0002H\u0002\u0007\u0011,A\u0006gKR\u001c\u0007n\u00144gg\u0016$\b\u0002CAn\u0003\u000f\u0004\r!!8\u0002\u001bA\f'\u000f^5uS>tG)\u0019;b!\u0011\ty.!\u0010\u000e\u0003\u0001Aq!a9\u0001\r\u0003\t)/\u0001\fiC:$G.Z(gMN,GoT;u\u001f\u001a\u0014\u0016M\\4f)\rI\u0016q\u001d\u0005\u0007;\u0006\u0005\b\u0019\u00010\t\u000f\u0005-\bA\"\u0001\u0002n\u0006Q\u0002.\u00198eY\u0016\u0004\u0016M\u001d;ji&|gn],ji\",%O]8sgR!\u0011QZAx\u0011!\t\t0!;A\u0002\u0005M\u0018A\u00039beRLG/[8ogB!\u00110!>_\u0013\u0011\t90!\u0001\u0003\u0011%#XM]1cY\u0016Dq!a?\u0001\r#\ti0A\tck&dGMR3uG\"\u0014V-];fgR$B!a@\u0003\u0002A\u0019\u0011q\u001c \t\u0011\t\r\u0011\u0011 a\u0001\u0005\u000b\tA\u0002]1si&$\u0018n\u001c8NCB\u0004R!\u001fB\u0004\u0005\u0017IAA!\u0003\u0002\u0002\t\u00191+Z9\u0011\r-\u0012iAXA,\u0013\r\u0011yA\u0006\u0002\u0007)V\u0004H.\u001a\u001a\t\u000f\tM\u0001A\"\u0005\u0003\u0016\u0005)a-\u001a;dQR!!q\u0003B\u000e!\u0015I(q\u0001B\r!\u0019Y#Q\u00020\u0002^\"A!Q\u0004B\t\u0001\u0004\ty0\u0001\u0007gKR\u001c\u0007NU3rk\u0016\u001cH\u000fC\u0004\u0003\"\u0001!\tEa\t\u0002\u0011MDW\u000f\u001e3po:$\"!!4\t\u000f\t\u001d\u0002\u0001\"\u0011\u0003$\u00051Am\\,pe.DqAa\u000b\u0001\t\u0013\u0011i#A\nqe>\u001cWm]:GKR\u001c\u0007NU3rk\u0016\u001cH\u000f\u0006\u0003\u0002N\n=\u0002\u0002\u0003B\u000f\u0005S\u0001\r!a@\t\u000f\tM\u0002\u0001\"\u0001\u00036\u0005i\u0011\r\u001a3QCJ$\u0018\u000e^5p]N$B!!4\u00038!A!\u0011\bB\u0019\u0001\u0004\u0011Y$A\nqCJ$\u0018\u000e^5p]\u0006sGm\u00144gg\u0016$8\u000f\u0005\u0004\u0003>\t\rc,W\u0007\u0003\u0005\u007fQ1A!\u0011\u0017\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u0005\u0005\u000b\u0012yDA\u0002NCBDqA!\u0013\u0001\t\u0003\u0011Y%A\beK2\f\u0017\u0010U1si&$\u0018n\u001c8t)\u0019\tiM!\u0014\u0003P!A\u0011\u0011\u001fB$\u0001\u0004\t\u0019\u0010C\u0004\u0003R\t\u001d\u0003\u0019A-\u0002\u000b\u0011,G.Y=\t\u000f\tU\u0003\u0001\"\u0001\u0003X\u0005\u0001\"/Z7pm\u0016\u0004\u0016M\u001d;ji&|gn\u001d\u000b\u0005\u0003\u001b\u0014I\u0006\u0003\u0005\u0003\\\tM\u0003\u0019\u0001B/\u0003=!x\u000e]5d!\u0006\u0014H/\u001b;j_:\u001c\b#\u0002B\u001f\u0005?r\u0016\u0002\u0002B1\u0005\u007f\u00111aU3u\u0011\u001d\u0011)\u0007\u0001C\u0001\u0005O\na\u0002]1si&$\u0018n\u001c8D_VtG\u000fF\u0001+\u0001")
public abstract class AbstractFetcherThread
extends ShutdownableThread {
    private final BrokerEndPoint sourceBroker;
    private final int fetchBackOffMs;
    private final PartitionStates<PartitionFetchState> partitionStates;
    private final ReentrantLock partitionMapLock;
    private final Condition partitionMapCond;
    private final ClientIdAndBroker metricId;
    private final FetcherStats fetcherStats;
    private final FetcherLagStats fetcherLagStats;

    public static boolean $lessinit$greater$default$5() {
        return AbstractFetcherThread$.MODULE$.$lessinit$greater$default$5();
    }

    public static int $lessinit$greater$default$4() {
        return AbstractFetcherThread$.MODULE$.$lessinit$greater$default$4();
    }

    private PartitionStates<PartitionFetchState> partitionStates() {
        return this.partitionStates;
    }

    private ReentrantLock partitionMapLock() {
        return this.partitionMapLock;
    }

    private Condition partitionMapCond() {
        return this.partitionMapCond;
    }

    private ClientIdAndBroker metricId() {
        return this.metricId;
    }

    public FetcherStats fetcherStats() {
        return this.fetcherStats;
    }

    public FetcherLagStats fetcherLagStats() {
        return this.fetcherLagStats;
    }

    public abstract void processPartitionData(TopicPartition var1, long var2, PartitionData var4);

    public abstract long handleOffsetOutOfRange(TopicPartition var1);

    public abstract void handlePartitionsWithErrors(Iterable<TopicPartition> var1);

    public abstract FetchRequest buildFetchRequest(Seq<Tuple2<TopicPartition, PartitionFetchState>> var1);

    public abstract Seq<Tuple2<TopicPartition, PartitionData>> fetch(FetchRequest var1);

    @Override
    public void shutdown() {
        this.initiateShutdown();
        CoreUtils$.MODULE$.inLock(this.partitionMapLock(), (JFunction0.mcV.sp & Serializable & scala.Serializable)() -> this.partitionMapCond().signalAll());
        this.awaitShutdown();
        this.fetcherStats().unregister();
        this.fetcherLagStats().unregister();
    }

    @Override
    public void doWork() {
        block0: {
            FetchRequest fetchRequest = (FetchRequest)CoreUtils$.MODULE$.inLock(this.partitionMapLock(), (Function0 & Serializable & scala.Serializable)() -> {
                void var1_1;
                Object object;
                FetchRequest fetchRequest = this.buildFetchRequest((Seq<Tuple2<TopicPartition, PartitionFetchState>>)((Seq)((TraversableLike)JavaConverters$.MODULE$.asScalaBufferConverter(this.partitionStates().partitionStates()).asScala()).map((Function1 & Serializable & scala.Serializable)state -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)state.topicPartition()), state.value()), Buffer$.MODULE$.canBuildFrom())));
                if (fetchRequest.isEmpty()) {
                    this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("There are no active partitions. Back off for %d ms before sending a fetch request")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)$this.fetchBackOffMs)})));
                    object = BoxesRunTime.boxToBoolean((boolean)this.partitionMapCond().await($this.fetchBackOffMs, TimeUnit.MILLISECONDS));
                } else {
                    object = BoxedUnit.UNIT;
                }
                return var1_1;
            });
            if (fetchRequest.isEmpty()) break block0;
            this.processFetchRequest(fetchRequest);
        }
    }

    private void processFetchRequest(FetchRequest fetchRequest) {
        block3: {
            Object object;
            scala.collection.mutable.Set partitionsWithError = (scala.collection.mutable.Set)Set$.MODULE$.apply((Seq)Nil$.MODULE$);
            ObjectRef responseData = ObjectRef.create((Object)((Seq)Seq$.MODULE$.empty()));
            try {
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Issuing to broker %d of fetch request %s")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)$this.sourceBroker.id()), fetchRequest})));
                responseData.elem = this.fetch(fetchRequest);
                object = BoxedUnit.UNIT;
            }
            catch (Throwable t) {
                if (this.isRunning().get()) {
                    this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Error in fetch ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{fetchRequest})), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> t);
                    object = CoreUtils$.MODULE$.inLock(this.partitionMapLock(), (JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> {
                        ((IterableLike)JavaConverters$.MODULE$.asScalaSetConverter(this.partitionStates().partitionSet()).asScala()).foreach((Function1 & Serializable & scala.Serializable)partition -> {
                            this.updatePartitionsWithError$1(partition, partitionsWithError);
                            return BoxedUnit.UNIT;
                        });
                        return this.partitionMapCond().await($this.fetchBackOffMs, TimeUnit.MILLISECONDS);
                    });
                }
                object = BoxedUnit.UNIT;
            }
            this.fetcherStats().requestRate().mark();
            Object object2 = ((Seq)responseData.elem).nonEmpty() ? CoreUtils$.MODULE$.inLock(this.partitionMapLock(), (JFunction0.mcV.sp & Serializable & scala.Serializable)() -> ((Seq)responseData$1.elem).foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
                AbstractFetcherThread.$anonfun$processFetchRequest$7(this, fetchRequest, partitionsWithError, x0$1);
                return BoxedUnit.UNIT;
            })) : BoxedUnit.UNIT;
            if (!partitionsWithError.nonEmpty()) break block3;
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("handling partitions with error for %s")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{partitionsWithError})));
            this.handlePartitionsWithErrors((Iterable<TopicPartition>)partitionsWithError);
        }
    }

    public void addPartitions(Map<TopicPartition, Object> partitionAndOffsets) {
        this.partitionMapLock().lockInterruptibly();
        try {
            Map newPartitionToState = (Map)((TraversableLike)partitionAndOffsets.filter((Function1 & Serializable & scala.Serializable)x0$2 -> BoxesRunTime.boxToBoolean((boolean)AbstractFetcherThread.$anonfun$addPartitions$1(this, x0$2)))).map((Function1 & Serializable & scala.Serializable)x0$3 -> {
                Tuple2 tuple2 = x0$3;
                if (tuple2 == null) {
                    throw new MatchError((Object)tuple2);
                }
                TopicPartition tp = (TopicPartition)tuple2._1();
                long offset = tuple2._2$mcJ$sp();
                PartitionFetchState fetchState = PartitionTopicInfo$.MODULE$.isOffsetInvalid(offset) ? new PartitionFetchState(this.handleOffsetOutOfRange(tp)) : new PartitionFetchState(offset);
                Tuple2 tuple22 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)tp), (Object)fetchState);
                return tuple22;
            }, Map$.MODULE$.canBuildFrom());
            scala.collection.immutable.Map existingPartitionToState = ((TraversableOnce)((TraversableLike)JavaConverters$.MODULE$.asScalaBufferConverter(this.partitionStates().partitionStates()).asScala()).map((Function1 & Serializable & scala.Serializable)state -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)state.topicPartition()), state.value()), Buffer$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
            this.partitionStates().set((java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)existingPartitionToState.$plus$plus((GenTraversableOnce)newPartitionToState)).asJava());
            this.partitionMapCond().signalAll();
        }
        finally {
            this.partitionMapLock().unlock();
        }
    }

    public void delayPartitions(Iterable<TopicPartition> partitions, long delay) {
        this.partitionMapLock().lockInterruptibly();
        try {
            partitions.foreach((Function1 & Serializable & scala.Serializable)partition -> {
                AbstractFetcherThread.$anonfun$delayPartitions$1(this, delay, partition);
                return BoxedUnit.UNIT;
            });
            this.partitionMapCond().signalAll();
        }
        finally {
            this.partitionMapLock().unlock();
        }
    }

    public void removePartitions(Set<TopicPartition> topicPartitions) {
        this.partitionMapLock().lockInterruptibly();
        try {
            topicPartitions.foreach((Function1 & Serializable & scala.Serializable)topicPartition -> {
                AbstractFetcherThread.$anonfun$removePartitions$1(this, topicPartition);
                return BoxedUnit.UNIT;
            });
        }
        finally {
            this.partitionMapLock().unlock();
        }
    }

    public int partitionCount() {
        int n;
        this.partitionMapLock().lockInterruptibly();
        try {
            n = this.partitionStates().size();
        }
        finally {
            this.partitionMapLock().unlock();
        }
        return n;
    }

    private final void updatePartitionsWithError$1(TopicPartition partition, scala.collection.mutable.Set partitionsWithError$1) {
        partitionsWithError$1.$plus$eq((Object)partition);
        this.partitionStates().moveToEnd(partition);
    }

    public static final /* synthetic */ void $anonfun$processFetchRequest$8(AbstractFetcherThread $this, FetchRequest fetchRequest$1, scala.collection.mutable.Set partitionsWithError$1, TopicPartition topicPartition$1, PartitionData partitionData$1, String topic$1, int partitionId$1, PartitionFetchState currentPartitionFetchState) {
        block13: {
            if (fetchRequest$1.offset(topicPartition$1) != currentPartitionFetchState.offset()) break block13;
            Errors errors = Errors.forCode((short)partitionData$1.errorCode());
            if (Errors.NONE.equals(errors)) {
                BoxedUnit boxedUnit;
                try {
                    ByteBufferMessageSet messages = partitionData$1.toByteBufferMessageSet();
                    long newOffset = BoxesRunTime.unboxToLong((Object)messages.shallowIterator().toSeq().lastOption().map((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToLong((long)x$1.nextOffset())).getOrElse((Function0)(JFunction0.mcJ.sp & Serializable & scala.Serializable)() -> currentPartitionFetchState.offset()));
                    $this.fetcherLagStats().getAndMaybePut(topic$1, partitionId$1).lag_$eq(Math.max(0L, partitionData$1.highWatermark() - newOffset));
                    $this.processPartitionData(topicPartition$1, currentPartitionFetchState.offset(), partitionData$1);
                    int validBytes = messages.validBytes();
                    if (validBytes > 0) {
                        $this.partitionStates().updateAndMoveToEnd(topicPartition$1, (Object)new PartitionFetchState(newOffset));
                        $this.fetcherStats().byteRate().mark((long)validBytes);
                        boxedUnit = BoxedUnit.UNIT;
                    } else {
                        boxedUnit = BoxedUnit.UNIT;
                    }
                }
                catch (CorruptRecordException ime) {
                    $this.logger().error((Object)("Found invalid messages during fetch for partition [" + topic$1 + "," + partitionId$1 + "] offset " + currentPartitionFetchState.offset() + " error " + ime.getMessage()));
                    $this.updatePartitionsWithError$1(topicPartition$1, partitionsWithError$1);
                    boxedUnit = BoxedUnit.UNIT;
                }
                catch (Throwable e) {
                    throw new KafkaException(new StringOps(Predef$.MODULE$.augmentString("error processing data for partition [%s,%d] offset %d")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topic$1, BoxesRunTime.boxToInteger((int)partitionId$1), BoxesRunTime.boxToLong((long)currentPartitionFetchState.offset())})), e);
                }
                BoxedUnit boxedUnit2 = boxedUnit;
            } else if (Errors.OFFSET_OUT_OF_RANGE.equals(errors)) {
                BoxedUnit boxedUnit;
                try {
                    long newOffset = $this.handleOffsetOutOfRange(topicPartition$1);
                    $this.partitionStates().updateAndMoveToEnd(topicPartition$1, (Object)new PartitionFetchState(newOffset));
                    $this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Current offset %d for partition [%s,%d] out of range; reset offset to %d")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)currentPartitionFetchState.offset()), topic$1, BoxesRunTime.boxToInteger((int)partitionId$1), BoxesRunTime.boxToLong((long)newOffset)})));
                    boxedUnit = BoxedUnit.UNIT;
                }
                catch (Throwable e) {
                    $this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Error getting offset for partition [%s,%d] to broker %d")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topic$1, BoxesRunTime.boxToInteger((int)partitionId$1), BoxesRunTime.boxToInteger((int)$this.sourceBroker.id())})), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
                    $this.updatePartitionsWithError$1(topicPartition$1, partitionsWithError$1);
                    boxedUnit = BoxedUnit.UNIT;
                }
                BoxedUnit boxedUnit3 = boxedUnit;
            } else {
                BoxedUnit boxedUnit;
                if ($this.isRunning().get()) {
                    $this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Error for partition [%s,%d] to broker %d:%s")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topic$1, BoxesRunTime.boxToInteger((int)partitionId$1), BoxesRunTime.boxToInteger((int)$this.sourceBroker.id()), partitionData$1.exception().get()})));
                    $this.updatePartitionsWithError$1(topicPartition$1, partitionsWithError$1);
                    boxedUnit = BoxedUnit.UNIT;
                } else {
                    boxedUnit = BoxedUnit.UNIT;
                }
                BoxedUnit boxedUnit4 = boxedUnit;
            }
        }
    }

    public static final /* synthetic */ void $anonfun$processFetchRequest$7(AbstractFetcherThread $this, FetchRequest fetchRequest$1, scala.collection.mutable.Set partitionsWithError$1, Tuple2 x0$1) {
        Tuple2 tuple2 = x0$1;
        if (tuple2 == null) {
            throw new MatchError((Object)tuple2);
        }
        TopicPartition topicPartition = (TopicPartition)tuple2._1();
        PartitionData partitionData = (PartitionData)tuple2._2();
        String topic = topicPartition.topic();
        int partitionId = topicPartition.partition();
        Option$.MODULE$.apply($this.partitionStates().stateValue(topicPartition)).foreach((Function1 & Serializable & scala.Serializable)currentPartitionFetchState -> {
            AbstractFetcherThread.$anonfun$processFetchRequest$8($this, fetchRequest$1, partitionsWithError$1, topicPartition, partitionData, topic, partitionId, currentPartitionFetchState);
            return BoxedUnit.UNIT;
        });
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
    }

    public static final /* synthetic */ boolean $anonfun$addPartitions$1(AbstractFetcherThread $this, Tuple2 x0$2) {
        Tuple2 tuple2 = x0$2;
        if (tuple2 == null) {
            throw new MatchError((Object)tuple2);
        }
        TopicPartition tp = (TopicPartition)tuple2._1();
        boolean bl = !$this.partitionStates().contains(tp);
        return bl;
    }

    public static final /* synthetic */ void $anonfun$delayPartitions$2(AbstractFetcherThread $this, long delay$1, TopicPartition partition$1, PartitionFetchState currentPartitionFetchState) {
        block0: {
            if (!currentPartitionFetchState.isActive()) break block0;
            $this.partitionStates().updateAndMoveToEnd(partition$1, (Object)new PartitionFetchState(currentPartitionFetchState.offset(), new DelayedItem(delay$1)));
        }
    }

    public static final /* synthetic */ void $anonfun$delayPartitions$1(AbstractFetcherThread $this, long delay$1, TopicPartition partition) {
        Option$.MODULE$.apply($this.partitionStates().stateValue(partition)).foreach((Function1 & Serializable & scala.Serializable)currentPartitionFetchState -> {
            AbstractFetcherThread.$anonfun$delayPartitions$2($this, delay$1, partition, currentPartitionFetchState);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ void $anonfun$removePartitions$1(AbstractFetcherThread $this, TopicPartition topicPartition) {
        $this.partitionStates().remove(topicPartition);
        $this.fetcherLagStats().unregister(topicPartition.topic(), topicPartition.partition());
    }

    public AbstractFetcherThread(String name, String clientId, BrokerEndPoint sourceBroker, int fetchBackOffMs, boolean isInterruptible) {
        this.sourceBroker = sourceBroker;
        this.fetchBackOffMs = fetchBackOffMs;
        super(name, isInterruptible);
        this.partitionStates = new PartitionStates();
        this.partitionMapLock = new ReentrantLock();
        this.partitionMapCond = this.partitionMapLock().newCondition();
        this.metricId = new ClientIdAndBroker(clientId, sourceBroker.host(), sourceBroker.port());
        this.fetcherStats = new FetcherStats(this.metricId());
        this.fetcherLagStats = new FetcherLagStats(this.metricId());
    }

    public static interface FetchRequest {
        public boolean isEmpty();

        public long offset(TopicPartition var1);
    }

    public static interface PartitionData {
        public short errorCode();

        public Option<Throwable> exception();

        public ByteBufferMessageSet toByteBufferMessageSet();

        public long highWatermark();
    }
}

