/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.Serializable;
import kafka.api.FetchResponsePartitionData;
import kafka.api.PartitionFetchInfo;
import kafka.cluster.Replica;
import kafka.common.TopicAndPartition;
import kafka.server.DelayedFetchMetrics$;
import kafka.server.DelayedOperation;
import kafka.server.FetchMetadata;
import kafka.server.FetchPartitionStatus;
import kafka.server.LogOffsetMetadata;
import kafka.server.LogOffsetMetadata$;
import kafka.server.LogReadResult;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import org.apache.kafka.common.errors.NotLeaderForPartitionException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.StringOps;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.IntRef;
import scala.runtime.NonLocalReturnControl;

@ScalaSignature(bytes="\u0006\u0001U3A!\u0001\u0002\u0001\u000f\taA)\u001a7bs\u0016$g)\u001a;dQ*\u00111\u0001B\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003\u0015\tQa[1gW\u0006\u001c\u0001a\u0005\u0002\u0001\u0011A\u0011\u0011BC\u0007\u0002\u0005%\u00111B\u0001\u0002\u0011\t\u0016d\u0017-_3e\u001fB,'/\u0019;j_:D\u0011\"\u0004\u0001\u0003\u0002\u0003\u0006IA\u0004\u000b\u0002\u000f\u0011,G.Y=NgB\u0011qBE\u0007\u0002!)\t\u0011#A\u0003tG\u0006d\u0017-\u0003\u0002\u0014!\t!Aj\u001c8h\u0013\ti!\u0002\u0003\u0005\u0017\u0001\t\u0005\t\u0015!\u0003\u0018\u000351W\r^2i\u001b\u0016$\u0018\rZ1uCB\u0011\u0011\u0002G\u0005\u00033\t\u0011QBR3uG\"lU\r^1eCR\f\u0007\u0002C\u000e\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u000f\u0002\u001dI,\u0007\u000f\\5dC6\u000bg.Y4feB\u0011\u0011\"H\u0005\u0003=\t\u0011aBU3qY&\u001c\u0017-T1oC\u001e,'\u000f\u0003\u0005!\u0001\t\u0005\t\u0015!\u0003\"\u0003\u0015\tXo\u001c;b!\tI!%\u0003\u0002$\u0005\ta!+\u001a9mS\u000e\f\u0017+^8uC\"AQ\u0005\u0001B\u0001B\u0003%a%\u0001\tsKN\u0004xN\\:f\u0007\u0006dGNY1dWB!qbJ\u0015?\u0013\tA\u0003CA\u0005Gk:\u001cG/[8ocA\u0019!&L\u0018\u000e\u0003-R!\u0001\f\t\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0002/W\t\u00191+Z9\u0011\t=\u0001$\u0007O\u0005\u0003cA\u0011a\u0001V;qY\u0016\u0014\u0004CA\u001a7\u001b\u0005!$BA\u001b\u0005\u0003\u0019\u0019w.\\7p]&\u0011q\u0007\u000e\u0002\u0012)>\u0004\u0018nY!oIB\u000b'\u000f^5uS>t\u0007CA\u001d=\u001b\u0005Q$BA\u001e\u0005\u0003\r\t\u0007/[\u0005\u0003{i\u0012!DR3uG\"\u0014Vm\u001d9p]N,\u0007+\u0019:uSRLwN\u001c#bi\u0006\u0004\"aD 
\n\u0005\u0001\u0003\"\u0001B+oSRDQA\u0011\u0001\u0005\u0002\r\u000ba\u0001P5oSRtDC\u0002#F\r\u001eC\u0015\n\u0005\u0002\n\u0001!)Q\"\u0011a\u0001\u001d!)a#\u0011a\u0001/!)1$\u0011a\u00019!)\u0001%\u0011a\u0001C!)Q%\u0011a\u0001M!)1\n\u0001C!\u0019\u0006YAO]=D_6\u0004H.\u001a;f)\u0005i\u0005CA\bO\u0013\ty\u0005CA\u0004C_>dW-\u00198\t\u000bE\u0003A\u0011\t*\u0002\u0019=tW\t\u001f9je\u0006$\u0018n\u001c8\u0015\u0003yBQ\u0001\u0016\u0001\u0005BI\u000b!b\u001c8D_6\u0004H.\u001a;f\u0001")
/**
 * A delayed fetch operation: it holds a fetch request until the partitions it
 * reads from have accumulated enough bytes, or until one of the early-exit
 * conditions checked in {@link #tryComplete()} is met.
 *
 * <p>This is Java decompiled from Scala. Early exits from the per-partition
 * loop are modelled with {@link NonLocalReturnControl}: the loop body throws
 * it carrying the boolean result, and {@link #tryComplete()} catches it.
 */
public class DelayedFetch
extends DelayedOperation {
    /** Describes the fetch request (min/max bytes, replica id, per-partition status). */
    private final FetchMetadata fetchMetadata;
    /** Used to look up the local leader replica and to read from the local log. */
    private final ReplicaManager replicaManager;
    /** Quota consulted to decide whether a partition's bytes count as throttled. */
    private final ReplicaQuota quota;
    /** Invoked from {@link #onComplete()} with the per-partition response data. */
    private final Function1<Seq<Tuple2<TopicAndPartition, FetchResponsePartitionData>>, BoxedUnit> responseCallback;

    /**
     * Checks every partition of the fetch and completes the operation when possible.
     *
     * <p>The per-partition check may finish the whole method early by throwing a
     * {@link NonLocalReturnControl} keyed with a marker object created here. If no
     * partition triggers an early exit, the operation is force-completed when either
     * the unthrottled bytes alone reach {@code fetchMinBytes}, or the unthrottled
     * plus throttled bytes reach it while the quota is not exceeded.
     *
     * @return the result of {@code forceComplete()} when a completion condition was
     *         met, {@code false} otherwise
     */
    @Override
    public boolean tryComplete() {
        // Identity marker that lets us recognise our own non-local returns.
        final Object nonLocalReturnKey = new Object();
        try {
            IntRef accumulatedSize = IntRef.create(0);
            IntRef accumulatedThrottledSize = IntRef.create(0);
            this.fetchMetadata.fetchPartitionStatus().foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
                DelayedFetch.$anonfun$tryComplete$1(this, accumulatedSize, accumulatedThrottledSize, nonLocalReturnKey, (Tuple2)x0$1);
                return BoxedUnit.UNIT;
            });
            // Short-circuit order matters: the quota is only queried when the
            // unthrottled bytes alone are insufficient but the combined bytes suffice.
            boolean satisfied = accumulatedSize.elem >= this.fetchMetadata.fetchMinBytes()
                    || (accumulatedSize.elem + accumulatedThrottledSize.elem >= this.fetchMetadata.fetchMinBytes()
                            && !this.quota.isQuotaExceeded());
            return satisfied && this.forceComplete();
        }
        catch (NonLocalReturnControl ex) {
            if (ex.key() == nonLocalReturnKey) {
                // Early exit raised by our own loop body: hand back the carried result
                // instead of letting the control exception escape to the caller.
                return ex.value$mcZ$sp();
            }
            // Not ours (raised for some enclosing scope): keep propagating.
            throw ex;
        }
    }

    /**
     * Marks the expired-request meter matching the origin of the fetch
     * (follower or consumer).
     */
    @Override
    public void onExpiration() {
        if (this.fetchMetadata.isFromFollower()) {
            DelayedFetchMetrics$.MODULE$.followerExpiredRequestMeter().mark();
        } else {
            DelayedFetchMetrics$.MODULE$.consumerExpiredRequestMeter().mark();
        }
    }

    /**
     * Reads from the local log for every partition of the fetch, converts each
     * read result into a {@link FetchResponsePartitionData} and passes the
     * resulting sequence to the response callback.
     */
    @Override
    public void onComplete() {
        Seq<Tuple2<TopicAndPartition, LogReadResult>> logReadResults = this.replicaManager.readFromLocalLog(this.fetchMetadata.replicaId(), this.fetchMetadata.fetchOnlyLeader(), this.fetchMetadata.fetchOnlyCommitted(), this.fetchMetadata.fetchMaxBytes(), this.fetchMetadata.hardMaxBytesLimit(), (Seq<Tuple2<TopicAndPartition, PartitionFetchInfo>>)((Seq)this.fetchMetadata.fetchPartitionStatus().map((Function1 & Serializable & scala.Serializable)x0$2 -> {
            // (partition, status) -> (partition, status.fetchInfo)
            Tuple2 entry = (Tuple2)x0$2;
            if (entry == null) {
                throw new MatchError((Object)entry);
            }
            TopicAndPartition tp = (TopicAndPartition)entry._1();
            FetchPartitionStatus status = (FetchPartitionStatus)entry._2();
            return Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)tp), (Object)status.fetchInfo());
        }, Seq$.MODULE$.canBuildFrom())), this.quota);
        Seq fetchPartitionData = (Seq)logReadResults.map((Function1 & Serializable & scala.Serializable)x0$3 -> {
            // (partition, readResult) -> (partition, response data built from the read result)
            Tuple2 entry = (Tuple2)x0$3;
            if (entry == null) {
                throw new MatchError((Object)entry);
            }
            TopicAndPartition tp = (TopicAndPartition)entry._1();
            LogReadResult result = (LogReadResult)entry._2();
            return Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)tp), (Object)new FetchResponsePartitionData(result.errorCode(), result.hw(), result.info().messageSet()));
        }, Seq$.MODULE$.canBuildFrom());
        this.responseCallback.apply((Object)fetchPartitionData);
    }

    /**
     * Body of the per-partition loop in {@link #tryComplete()}.
     *
     * <p>For a partition whose fetch offset is known and differs from the end offset:
     * <ul>
     *   <li>end offset on an older segment than the fetch offset: force-complete and exit early;</li>
     *   <li>fetch offset on an older segment than the end offset: force-complete and exit
     *       early unless the leader should throttle this replica;</li>
     *   <li>fetch offset behind the end offset: add the available bytes (capped by the
     *       partition fetch size) to exactly one of the two accumulators, depending on
     *       whether the partition is throttled.</li>
     * </ul>
     * An {@link UnknownTopicOrPartitionException} or {@link NotLeaderForPartitionException}
     * raised while inspecting the partition also force-completes and exits early.
     *
     * @param $this                     the enclosing operation
     * @param accumulatedSize$1         running total of bytes available on unthrottled partitions
     * @param accumulatedThrottledSize$1 running total of bytes available on throttled partitions
     * @param nonLocalReturnKey1$1      key identifying early exits that belong to {@code tryComplete}
     * @param x0$1                      the (partition, fetch status) pair; must not be {@code null}
     * @throws MatchError if {@code x0$1} is {@code null}
     */
    public static final /* synthetic */ void $anonfun$tryComplete$1(DelayedFetch $this, IntRef accumulatedSize$1, IntRef accumulatedThrottledSize$1, Object nonLocalReturnKey1$1, Tuple2 x0$1) {
        if (x0$1 == null) {
            throw new MatchError((Object)x0$1);
        }
        TopicAndPartition topicAndPartition = (TopicAndPartition)x0$1._1();
        FetchPartitionStatus fetchStatus = (FetchPartitionStatus)x0$1._2();
        LogOffsetMetadata fetchOffset = fetchStatus.startOffsetMetadata();
        try {
            LogOffsetMetadata unknownOffset = LogOffsetMetadata$.MODULE$.UnknownOffsetMetadata();
            // Null-safe "fetchOffset != unknownOffset".
            boolean offsetKnown = fetchOffset == null ? unknownOffset != null : !((Object)fetchOffset).equals(unknownOffset);
            if (!offsetKnown) {
                return;
            }
            Replica replica = $this.replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic(), topicAndPartition.partition());
            LogOffsetMetadata endOffset = $this.fetchMetadata.fetchOnlyCommitted() ? replica.highWatermark() : replica.logEndOffset();
            if (endOffset.messageOffset() == fetchOffset.messageOffset()) {
                // Nothing new for this partition; it contributes no bytes.
                return;
            }
            // The three cases below are mutually exclusive. In particular a throttled
            // "older segment" fetch must NOT fall through into byte accumulation.
            if (endOffset.onOlderSegment(fetchOffset)) {
                $this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Satisfying fetch %s since it is fetching later segments of partition %s.")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{$this.fetchMetadata, topicAndPartition})));
                throw new NonLocalReturnControl.mcZ.sp(nonLocalReturnKey1$1, $this.forceComplete());
            } else if (fetchOffset.onOlderSegment(endOffset)) {
                $this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Satisfying fetch %s immediately since it is fetching older segments.")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{$this.fetchMetadata})));
                // Do not force-complete when the leader should throttle this replica.
                if (!$this.replicaManager.shouldLeaderThrottle($this.quota, topicAndPartition, $this.fetchMetadata.replicaId())) {
                    throw new NonLocalReturnControl.mcZ.sp(nonLocalReturnKey1$1, $this.forceComplete());
                }
            } else if (fetchOffset.messageOffset() < endOffset.messageOffset()) {
                // The partition fetch size is the upper bound on what this partition can contribute.
                int bytesAvailable = package$.MODULE$.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo().fetchSize());
                // Count the bytes exactly once: tryComplete() sums both accumulators,
                // so adding to both would double-count throttled partitions.
                if ($this.quota.isThrottled(topicAndPartition)) {
                    accumulatedThrottledSize$1.elem += bytesAvailable;
                } else {
                    accumulatedSize$1.elem += bytesAvailable;
                }
            }
        }
        catch (UnknownTopicOrPartitionException utpe) {
            $this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Broker no longer know of %s, satisfy %s immediately")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topicAndPartition, $this.fetchMetadata})));
            throw new NonLocalReturnControl.mcZ.sp(nonLocalReturnKey1$1, $this.forceComplete());
        }
        catch (NotLeaderForPartitionException nle) {
            $this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Broker is no longer the leader of %s, satisfy %s immediately")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topicAndPartition, $this.fetchMetadata})));
            throw new NonLocalReturnControl.mcZ.sp(nonLocalReturnKey1$1, $this.forceComplete());
        }
    }

    /**
     * Creates a delayed fetch.
     *
     * @param delayMs          delay passed to the {@link DelayedOperation} superclass
     * @param fetchMetadata    description of the fetch request
     * @param replicaManager   replica manager used to inspect and read partitions
     * @param quota            replication quota consulted for throttling decisions
     * @param responseCallback callback invoked with the response data on completion
     */
    public DelayedFetch(long delayMs, FetchMetadata fetchMetadata, ReplicaManager replicaManager, ReplicaQuota quota, Function1<Seq<Tuple2<TopicAndPartition, FetchResponsePartitionData>>, BoxedUnit> responseCallback) {
        // The superclass constructor call must be the first statement for Java
        // compilers without flexible constructor bodies.
        // NOTE(review): assumes the DelayedOperation constructor does not call
        // overridable methods that read the fields below - confirm.
        super(delayMs);
        this.fetchMetadata = fetchMetadata;
        this.replicaManager = replicaManager;
        this.quota = quota;
        this.responseCallback = responseCallback;
    }
}

