/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.Serializable;
import java.net.SocketTimeoutException;
import java.util.LinkedHashMap;
import kafka.admin.AdminUtils$;
import kafka.api.KAFKA_0_10_0_IV0$;
import kafka.api.KAFKA_0_10_1_IV1$;
import kafka.api.KAFKA_0_10_1_IV2$;
import kafka.api.KAFKA_0_9_0$;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Replica;
import kafka.common.KafkaStorageException;
import kafka.common.TopicAndPartition;
import kafka.log.Log;
import kafka.log.LogConfig$;
import kafka.message.ByteBufferMessageSet;
import kafka.server.AbstractFetcherThread;
import kafka.server.ConfigType$;
import kafka.server.KafkaConfig;
import kafka.server.LogOffsetMetadata;
import kafka.server.LogOffsetMetadata$;
import kafka.server.PartitionFetchState;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.ReplicationQuotaManager;
import kafka.utils.NetworkClientBlockingOps$;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.MetadataUpdater;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.LoginType;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.utils.Time;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.MapLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;

@ScalaSignature(bytes="\u0006\u0001\tEg\u0001B\u0001\u0003\u0001\u001d\u0011ACU3qY&\u001c\u0017MR3uG\",'\u000f\u00165sK\u0006$'BA\u0002\u0005\u0003\u0019\u0019XM\u001d<fe*\tQ!A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001A\u0001CA\u0005\u000b\u001b\u0005\u0011\u0011BA\u0006\u0003\u0005U\t%m\u001d;sC\u000e$h)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012D\u0011\"\u0004\u0001\u0003\u0002\u0003\u0006IAD\u000e\u0002\t9\fW.\u001a\t\u0003\u001faq!\u0001\u0005\f\u0011\u0005E!R\"\u0001\n\u000b\u0005M1\u0011A\u0002\u001fs_>$hHC\u0001\u0016\u0003\u0015\u00198-\u00197b\u0013\t9B#\u0001\u0004Qe\u0016$WMZ\u0005\u00033i\u0011aa\u0015;sS:<'BA\f\u0015\u0013\tiA$\u0003\u0002\u001e=\t\u00112\u000b[;uI><h.\u00192mKRC'/Z1e\u0015\tyB!A\u0003vi&d7\u000f\u0003\u0005\"\u0001\t\u0005\t\u0015!\u0003#\u0003%1W\r^2iKJLE\r\u0005\u0002$I5\tA#\u0003\u0002&)\t\u0019\u0011J\u001c;\t\u0011\u001d\u0002!\u0011!Q\u0001\n!\nAb]8ve\u000e,'I]8lKJ\u0004\"!\u000b\u0017\u000e\u0003)R!a\u000b\u0003\u0002\u000f\rdWo\u001d;fe&\u0011QF\u000b\u0002\u000f\u0005J|7.\u001a:F]\u0012\u0004v.\u001b8u\u0011!y\u0003A!A!\u0002\u0013\u0001\u0014\u0001\u00042s_.,'oQ8oM&<\u0007CA\u00052\u0013\t\u0011$AA\u0006LC\u001a\\\u0017mQ8oM&<\u0007\u0002\u0003\u001b\u0001\u0005\u0003\u0005\u000b\u0011B\u001b\u0002\u0015I,\u0007\u000f\\5dC6;'\u000f\u0005\u0002\nm%\u0011qG\u0001\u0002\u000f%\u0016\u0004H.[2b\u001b\u0006t\u0017mZ3s\u0011!I\u0004A!A!\u0002\u0013Q\u0014aB7fiJL7m\u001d\t\u0003w\u0011k\u0011\u0001\u0010\u0006\u0003suR!AP \u0002\r\r|W.\\8o\u0015\t)\u0001I\u0003\u0002B\u0005\u00061\u0011\r]1dQ\u0016T\u0011aQ\u0001\u0004_J<\u0017BA#=\u0005\u001diU\r\u001e:jGND\u0001b\u0012\u0001\u0003\u0002\u0003\u0006I\u0001S\u0001\u0005i&lW\r\u0005\u0002J\u00176\t!J\u0003\u0002 {%\u0011AJ\u0013\u0002\u0005)&lW\r\u0003\u0005O\u0001\t\u0005\t\u0015!\u0003P\u0003\u0015\tXo\u001c;b!\tI\u0001+\u0003\u0002R\u0005\t9\"+\u001a9mS\u000e\fG/[8o#V|G/Y'b]\u0006<WM\u001d\u0005\u0006'\u0002!\t\u0001V\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0013U3v\u000bW-[7rk\u0006CA\u0005\u0001\u0011\u0015i!\u000b1\u0001\u000f\u0011\u0015\t#\u000b1\u0001#\u0011\u00159#\u000b1\u0001)\u0011\u0015y#\u000b1\u00011\u0011\u0015!$\u000b1\u00016\u0011\u0015I$\u000b1\u0001;\u0011\u00159%\u000b1\u0001I\u0011\u0015q%\u000b1\u0001P\u000b\u0011y\u0006\u0001\u00011\u0003\u0007I+\u0015\u000b\u0005\u0002bY:\u0011\u0011BY\u0004\u0006G\nA\t\u0001Z\u0001\u0015%\u0016\u0004H.[2b\r\u0016$8\r[3s)\"\u0014X-\u00193\u0011\u0005%)g!B\u0001\u0003\u0011\u000317CA3h!\t\u0019\u0003.\u0003\u0002j)\t1\u0011I\\=SK\u001aDQaU3\u0005\u0002-$\u0012\u0001\u001a\u0004\u0006[\u0016\u0004!A\u001c\u0002\r\r\u0016$8\r\u001b*fcV,7\u000f^\n\u0004Y\u001e|\u0007C\u00019t\u001d\tI\u0011/\u0003\u0002s\u0005\u0005)\u0012IY:ue\u0006\u001cGOR3uG\",'\u000f\u00165sK\u0006$\u0017BA7u\u0015\t\u0011(\u0001\u0003\u0005wY\n\u0015\r\u0011\"\u0001x\u0003))h\u000eZ3sYfLgnZ\u000b\u0002qB\u0011\u0011\u0010`\u0007\u0002u*\u001110P\u0001\te\u0016\fX/Z:ug&\u0011QN\u001f\u0005\t}2\u0014\t\u0011)A\u0005q\u0006YQO\u001c3fe2L\u0018N\\4!\u0011\u0019\u0019F\u000e\"\u0001\u0002\u0002Q!\u00111AA\u0004!\r\t)\u0001\\\u0007\u0002K\")ao a\u0001q\"9\u00111\u00027\u0005\u0002\u00055\u0011aB5t\u000b6\u0004H/_\u000b\u0003\u0003\u001f\u00012aIA\t\u0013\r\t\u0019\u0002\u0006\u0002\b\u0005>|G.Z1o\u0011\u001d\t9\u0002\u001cC\u0001\u00033\taa\u001c4gg\u0016$H\u0003BA\u000e\u0003C\u00012aIA\u000f\u0013\r\ty\u0002\u0006\u0002\u0005\u0019>tw\r\u0003\u0005\u0002$\u0005U\u0001\u0019AA\u0013\u00039!x\u000e]5d!\u0006\u0014H/\u001b;j_:\u0004B!a\n\u0002*5\tQ(C\u0002\u0002,u\u0012a\u0002V8qS\u000e\u0004\u0016M\u001d;ji&|gNB\u0004\u00020\u0015\u0004!!!\r\u0003\u001bA\u000b'\u000f^5uS>tG)\u0019;b'\u0015\ticZA\u001a!\r\u0001\u0018QG\u0005\u0004\u0003_!\bB\u0003<\u0002.\t\u0015\r\u0011\"\u0001\u0002:U\u0011\u00111\b\t\u0005\u0003{\t\u0019ED\u0002z\u0003\u007fI1!!\u0011{\u000351U\r^2i%\u0016\u001c\bo\u001c8tK&!\u0011qFA#\u0015\r\t\tE\u001f\u0005\u000b}\u00065\"\u0011!Q\u0001\n\u0005m\u0002bB*\u0002.\u0011\u0005\u00111\n\u000b\u0005\u0003\u001b\ny\u0005\u0005\u0003\u0002\u0006\u00055\u0002b\u0002<\u0002J\u0001\u0007\u00111\b\u0005\t\u0003'\ni\u0003\"\u0001\u0002V\u0005IQM\u001d:pe\u000e{G-Z\u000b\u0003\u0003/\u00022aIA-\u0013\r\tY\u0006\u0006\u0002\u0006'\"|'\u000f\u001e\u0005\t\u0003?\ni\u0003\"\u0001\u0002b\u00051Bo\u001c\"zi\u0016\u0014UO\u001a4fe6+7o]1hKN+G/\u0006\u0002\u0002dA!\u0011QMA6\u001b\t\t9GC\u0002\u0002j\u0011\tq!\\3tg\u0006<W-\u0003\u0003\u0002n\u0005\u001d$\u0001\u0006\"zi\u0016\u0014UO\u001a4fe6+7o]1hKN+G\u000f\u0003\u0005\u0002r\u00055B\u0011AA:\u00035A\u0017n\u001a5XCR,'/\\1sWV\u0011\u00111\u0004\u0005\t\u0003o\ni\u0003\"\u0001\u0002z\u0005IQ\r_2faRLwN\\\u000b\u0003\u0003w\u0002RaIA?\u0003\u0003K1!a \u0015\u0005\u0019y\u0005\u000f^5p]B!\u00111QAG\u001d\u0011\t))!#\u000f\u0007E\t9)C\u0001\u0016\u0013\r\tY\tF\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\ty)!%\u0003\u0013QC'o\\<bE2,'bAAF)\u00151\u0011Q\u0013\u0001\u0001\u0003/\u0013!\u0001\u0015#\u0011\u0007\u0005\fi\u0003C\u0005\u0002\u001c\u0002\u0011\r\u0011\"\u0003\u0002V\u0005\u0019b-\u001a;dQJ+\u0017/^3tiZ+'o]5p]\"A\u0011q\u0014\u0001!\u0002\u0013\t9&\u0001\u000bgKR\u001c\u0007NU3rk\u0016\u001cHOV3sg&|g\u000e\t\u0005\n\u0003G\u0003!\u0019!C\u0005\u0003K\u000bQb]8dW\u0016$H+[7f_V$X#\u0001\u0012\t\u000f\u0005%\u0006\u0001)A\u0005E\u0005q1o\\2lKR$\u0016.\\3pkR\u0004\u0003\"CAW\u0001\t\u0007I\u0011BAS\u0003%\u0011X\r\u001d7jG\u0006LE\rC\u0004\u00022\u0002\u0001\u000b\u0011\u0002\u0012\u0002\u0015I,\u0007\u000f\\5dC&#\u0007\u0005C\u0005\u00026\u0002\u0011\r\u0011\"\u0003\u00028\u00069Q.\u0019=XC&$XCAA]!\u0011\tY,!2\u000e\u0005\u0005u&\u0002BA`\u0003\u0003\fA\u0001\\1oO*\u0011\u00111Y\u0001\u0005U\u00064\u0018-\u0003\u0003\u0002H\u0006u&aB%oi\u0016<WM\u001d\u0005\t\u0003\u0017\u0004\u0001\u0015!\u0003\u0002:\u0006AQ.\u0019=XC&$\b\u0005C\u0005\u0002P\u0002\u0011\r\u0011\"\u0003\u00028\u0006AQ.\u001b8CsR,7\u000f\u0003\u0005\u0002T\u0002\u0001\u000b\u0011BA]\u0003%i\u0017N\u001c\"zi\u0016\u001c\b\u0005C\u0005\u0002X\u0002\u0011\r\u0011\"\u0003\u00028\u0006AQ.\u0019=CsR,7\u000f\u0003\u0005\u0002\\\u0002\u0001\u000b\u0011BA]\u0003%i\u0017\r\u001f\"zi\u0016\u001c\b\u0005C\u0005\u0002`\u0002\u0011\r\u0011\"\u0003\u00028\u0006Ia-\u001a;dQNK'0\u001a\u0005\t\u0003G\u0004\u0001\u0015!\u0003\u0002:\u0006Qa-\u001a;dQNK'0\u001a\u0011\t\u000f\u0005\u001d\b\u0001\"\u0003\u0002j\u0006A1\r\\5f]RLE-F\u0001\u000f\u0011%\ti\u000f\u0001b\u0001\n\u0013\ty/\u0001\u0006t_V\u00148-\u001a(pI\u0016,\"!!=\u0011\t\u0005\u001d\u00121_\u0005\u0004\u0003kl$\u0001\u0002(pI\u0016D\u0001\"!?\u0001A\u0003%\u0011\u0011_\u0001\fg>,(oY3O_\u0012,\u0007\u0005C\u0005\u0002~\u0002\u0011\r\u0011\"\u0003\u0002\u0000\u0006ia.\u001a;x_J\\7\t\\5f]R,\"A!\u0001\u0011\t\t\r!\u0011B\u0007\u0003\u0005\u000bQ1Aa\u0002@\u0003\u001d\u0019G.[3oiNLAAa\u0003\u0003\u0006\tia*\u001a;x_J\\7\t\\5f]RD\u0001Ba\u0004\u0001A\u0003%!\u0011A\u0001\u000f]\u0016$xo\u001c:l\u00072LWM\u001c;!\u0011\u001d\u0011\u0019\u0002\u0001C!\u0005+\t\u0001b\u001d5vi\u0012|wO\u001c\u000b\u0003\u0005/\u00012a\tB\r\u0013\r\u0011Y\u0002\u0006\u0002\u0005+:LG\u000fC\u0004\u0003 \u0001!\tA!\t\u0002)A\u0014xnY3tgB\u000b'\u000f^5uS>tG)\u0019;b)!\u00119Ba\t\u0003&\t%\u0002\u0002CA\u0012\u0005;\u0001\r!!\n\t\u0011\t\u001d\"Q\u0004a\u0001\u00037\t1BZ3uG\"|eMZ:fi\"A!1\u0006B\u000f\u0001\u0004\t9*A\u0007qCJ$\u0018\u000e^5p]\u0012\u000bG/\u0019\u0005\b\u0005_\u0001A\u0011\u0001B\u0019\u0003mi\u0017-\u001f2f/\u0006\u0014h.\u00134NKN\u001c\u0018mZ3Pm\u0016\u00148/\u001b>fIR1!q\u0003B\u001a\u0005oA\u0001B!\u000e\u0003.\u0001\u0007\u00111M\u0001\u000b[\u0016\u001c8/Y4f'\u0016$\b\u0002CA\u0012\u0005[\u0001\r!!\n\t\u000f\tm\u0002\u0001\"\u0001\u0003>\u00051\u0002.\u00198eY\u0016|eMZ:fi>+Ho\u00144SC:<W\r\u0006\u0003\u0002\u001c\t}\u0002\u0002CA\u0012\u0005s\u0001\r!!\n\t\u000f\t\r\u0003\u0001\"\u0001\u0003F\u0005Q\u0002.\u00198eY\u0016\u0004\u0016M\u001d;ji&|gn],ji\",%O]8sgR!!q\u0003B$\u0011!\u0011IE!\u0011A\u0002\t-\u0013A\u00039beRLG/[8ogB1\u00111\u0011B'\u0003KIAAa\u0014\u0002\u0012\nA\u0011\n^3sC\ndW\rC\u0004\u0003T\u0001!\tB!\u0016\u0002\u000b\u0019,Go\u00195\u0015\t\t]#1\r\t\u0007\u0003\u0007\u0013IF!\u0018\n\t\tm\u0013\u0011\u0013\u0002\u0004'\u0016\f\bcB\u0012\u0003`\u0005\u0015\u0012qS\u0005\u0004\u0005C\"\"A\u0002+va2,'\u0007C\u0004\u0003f\tE\u0003\u0019\u00011\u0002\u0019\u0019,Go\u00195SKF,Xm\u001d;\t\u000f\t%\u0004\u0001\"\u0003\u0003l\u0005Y1/\u001a8e%\u0016\fX/Z:u)!\u0011iGa\u001d\u0003\u0004\n%\u0005\u0003\u0002B\u0002\u0005_JAA!\u001d\u0003\u0006\tq1\t\\5f]R\u0014Vm\u001d9p]N,\u0007\u0002\u0003B;\u0005O\u0002\rAa\u001e\u0002\r\u0005\u0004\u0018nS3z!\u0011\u0011IHa \u000e\u0005\tm$b\u0001B?{\u0005A\u0001O]8u_\u000e|G.\u0003\u0003\u0003\u0002\nm$aB!qS.+\u0017p\u001d\u0005\t\u0005\u000b\u00139\u00071\u0001\u0003\b\u0006Q\u0011\r]5WKJ\u001c\u0018n\u001c8\u0011\u000b\r\ni(a\u0016\t\u0011\t-%q\ra\u0001\u0005\u001b\u000bqA]3rk\u0016\u001cH\u000fE\u0002z\u0005\u001fK1A!%{\u0005=\t%m\u001d;sC\u000e$(+Z9vKN$\bb\u0002BK\u0001\u0011%!qS\u0001\u0017K\u0006\u0014H.[3ti>\u0013H*\u0019;fgR|eMZ:fiRA\u00111\u0004BM\u00057\u0013y\n\u0003\u0005\u0002$\tM\u0005\u0019AA\u0013\u0011!\u0011iJa%A\u0002\u0005m\u0011\u0001E3be2LWm\u001d;Pe2\u000bG/Z:u\u0011\u001d\u0011\tKa%A\u0002\t\n!bY8ogVlWM]%e\u0011\u001d\u0011)\u000b\u0001C\t\u0005O\u000b\u0011CY;jY\u00124U\r^2i%\u0016\fX/Z:u)\r\u0001'\u0011\u0016\u0005\t\u0005W\u0013\u0019\u000b1\u0001\u0003.\u0006a\u0001/\u0019:uSRLwN\\'baB1\u00111\u0011B-\u0005_\u0003ra\tB0\u0003K\u0011\t\fE\u0002\n\u0005gK1A!.\u0003\u0005M\u0001\u0016M\u001d;ji&|gNR3uG\"\u001cF/\u0019;f\u0011\u001d\u0011I\f\u0001C\u0005\u0005w\u000bac\u001d5pk2$gi\u001c7m_^,'\u000f\u00165s_R$H.\u001a\u000b\u0007\u0003\u001f\u0011iL!2\t\u000f9\u00139\f1\u0001\u0003@B\u0019\u0011B!1\n\u0007\t\r'A\u0001\u0007SKBd\u0017nY1Rk>$\u0018\r\u0003\u0005\u0002$\t]\u0006\u0019\u0001Bd!\u0011\u0011IM!4\u000e\u0005\t-'B\u0001 \u0005\u0013\u0011\u0011yMa3\u0003#Q{\u0007/[2B]\u0012\u0004\u0016M\u001d;ji&|g\u000e")
public class ReplicaFetcherThread
extends AbstractFetcherThread {
    private final BrokerEndPoint sourceBroker;
    private final KafkaConfig brokerConfig;
    private final ReplicaManager replicaMgr;
    private final Time time;
    private final ReplicationQuotaManager quota;
    private final short fetchRequestVersion;
    private final int socketTimeout;
    private final int replicaId;
    private final Integer maxWait;
    private final Integer minBytes;
    private final Integer maxBytes;
    private final Integer fetchSize;
    private final Node sourceNode;
    private final NetworkClient networkClient;

    private short fetchRequestVersion() {
        return this.fetchRequestVersion;
    }

    private int socketTimeout() {
        return this.socketTimeout;
    }

    private int replicaId() {
        return this.replicaId;
    }

    private Integer maxWait() {
        return this.maxWait;
    }

    private Integer minBytes() {
        return this.minBytes;
    }

    private Integer maxBytes() {
        return this.maxBytes;
    }

    private Integer fetchSize() {
        return this.fetchSize;
    }

    private String clientId() {
        return super.name();
    }

    private Node sourceNode() {
        return this.sourceNode;
    }

    private NetworkClient networkClient() {
        return this.networkClient;
    }

    @Override
    public void shutdown() {
        super.shutdown();
        this.networkClient().close();
    }

    public void processPartitionData(TopicPartition topicPartition, long fetchOffset, PartitionData partitionData) {
        try {
            String topic = topicPartition.topic();
            int partitionId = topicPartition.partition();
            Replica replica = (Replica)this.replicaMgr.getReplica(topic, partitionId, this.replicaMgr.getReplica$default$3()).get();
            ByteBufferMessageSet messageSet = partitionData.toByteBufferMessageSet();
            this.maybeWarnIfMessageOversized(messageSet, topicPartition);
            if (fetchOffset != replica.logEndOffset().messageOffset()) {
                throw new RuntimeException(new StringOps(Predef$.MODULE$.augmentString("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topicPartition, BoxesRunTime.boxToLong((long)fetchOffset), BoxesRunTime.boxToLong((long)replica.logEndOffset().messageOffset())})));
            }
            if (this.logger().isTraceEnabled()) {
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)replica.brokerId()), BoxesRunTime.boxToLong((long)replica.logEndOffset().messageOffset()), topicPartition, BoxesRunTime.boxToInteger((int)messageSet.sizeInBytes()), BoxesRunTime.boxToLong((long)partitionData.highWatermark())})));
            }
            ((Log)replica.log().get()).append(messageSet, false);
            if (this.logger().isTraceEnabled()) {
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)replica.brokerId()), BoxesRunTime.boxToLong((long)replica.logEndOffset().messageOffset()), BoxesRunTime.boxToInteger((int)messageSet.sizeInBytes()), topicPartition})));
            }
            long followerHighWatermark = RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(replica.logEndOffset().messageOffset()), partitionData.highWatermark());
            replica.highWatermark_$eq(new LogOffsetMetadata(followerHighWatermark, LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3()));
            if (this.logger().isTraceEnabled()) {
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Follower %d set replica high watermark for partition [%s,%d] to %s")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)replica.brokerId()), topic, BoxesRunTime.boxToInteger((int)partitionId), BoxesRunTime.boxToLong((long)followerHighWatermark)})));
            }
            if (this.quota.isThrottled(new TopicAndPartition(topic, partitionId))) {
                this.quota.record(messageSet.sizeInBytes());
            }
        }
        catch (KafkaStorageException e) {
            this.fatal((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Disk error while replicating data for ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topicPartition})), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
            Runtime.getRuntime().halt(1);
        }
    }

    public void maybeWarnIfMessageOversized(ByteBufferMessageSet messageSet, TopicPartition topicPartition) {
        block0: {
            if (this.fetchRequestVersion() > 2 || messageSet.sizeInBytes() <= 0 || messageSet.validBytes() > 0) break block0;
            this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Replication is failing due to a message that is greater than replica.fetch.max.bytes for partition ", ". "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topicPartition})) + "This generally occurs when the max.message.bytes has been overridden to exceed this value and a suitably large " + "message has also been sent. To fix this problem increase replica.fetch.max.bytes in your broker config to be " + "equal or larger than your settings for max.message.bytes, both at a broker and topic level.");
        }
    }

    @Override
    public long handleOffsetOutOfRange(TopicPartition topicPartition) {
        long l;
        TopicAndPartition topicAndPartition = new TopicAndPartition(topicPartition.topic(), topicPartition.partition());
        Replica replica = (Replica)this.replicaMgr.getReplica(topicPartition.topic(), topicPartition.partition(), this.replicaMgr.getReplica$default$3()).get();
        long leaderEndOffset = this.earliestOrLatestOffset(topicPartition, -1L, this.brokerConfig.brokerId());
        if (leaderEndOffset < replica.logEndOffset().messageOffset()) {
            if (!Predef$.MODULE$.Boolean2boolean(LogConfig$.MODULE$.fromProps(this.brokerConfig.originals(), AdminUtils$.MODULE$.fetchEntityConfig(this.replicaMgr.zkUtils(), ConfigType$.MODULE$.Topic(), topicPartition.topic())).uncleanLeaderElectionEnable())) {
                this.fatal((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Exiting because log truncation is not allowed for partition %s,")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topicPartition})) + new StringOps(Predef$.MODULE$.augmentString(" Current leader %d's latest offset %d is less than replica %d's latest offset %d")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)$this.sourceBroker.id()), BoxesRunTime.boxToLong((long)leaderEndOffset), BoxesRunTime.boxToInteger((int)$this.brokerConfig.brokerId()), BoxesRunTime.boxToLong((long)replica.logEndOffset().messageOffset())})));
                System.exit(1);
            }
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)$this.brokerConfig.brokerId()), topicPartition, BoxesRunTime.boxToLong((long)replica.logEndOffset().messageOffset()), BoxesRunTime.boxToInteger((int)$this.sourceBroker.id()), BoxesRunTime.boxToLong((long)leaderEndOffset)})));
            this.replicaMgr.logManager().truncateTo((Map<TopicAndPartition, Object>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topicAndPartition), (Object)BoxesRunTime.boxToLong((long)leaderEndOffset))}))));
            l = leaderEndOffset;
        } else {
            long leaderStartOffset = this.earliestOrLatestOffset(topicPartition, -2L, this.brokerConfig.brokerId());
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)$this.brokerConfig.brokerId()), topicPartition, BoxesRunTime.boxToLong((long)replica.logEndOffset().messageOffset()), BoxesRunTime.boxToInteger((int)$this.sourceBroker.id()), BoxesRunTime.boxToLong((long)leaderStartOffset)})));
            long offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset().messageOffset());
            if (leaderStartOffset > replica.logEndOffset().messageOffset()) {
                this.replicaMgr.logManager().truncateFullyAndStartAt(topicAndPartition, leaderStartOffset);
            }
            l = offsetToFetch;
        }
        return l;
    }

    @Override
    public void handlePartitionsWithErrors(Iterable<TopicPartition> partitions) {
        this.delayPartitions(partitions, Predef$.MODULE$.Integer2int(this.brokerConfig.replicaFetchBackoffMs()));
    }

    public Seq<Tuple2<TopicPartition, PartitionData>> fetch(FetchRequest fetchRequest) {
        ClientResponse clientResponse = this.sendRequest(ApiKeys.FETCH, (Option<Object>)new Some((Object)BoxesRunTime.boxToShort((short)this.fetchRequestVersion())), (AbstractRequest)fetchRequest.underlying());
        return (Seq)((scala.collection.mutable.MapLike)JavaConverters$.MODULE$.mapAsScalaMapConverter((java.util.Map)new FetchResponse(clientResponse.responseBody()).responseData()).asScala()).toSeq().map((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            TopicPartition key = (TopicPartition)tuple2._1();
            FetchResponse.PartitionData value = (FetchResponse.PartitionData)tuple2._2();
            Tuple2 tuple22 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)key), (Object)new PartitionData(value));
            return tuple22;
        }, Seq$.MODULE$.canBuildFrom());
    }

    private ClientResponse sendRequest(ApiKeys apiKey, Option<Object> apiVersion, AbstractRequest request) {
        ClientResponse clientResponse;
        RequestHeader header = (RequestHeader)apiVersion.fold((Function0 & Serializable & scala.Serializable)() -> this.networkClient().nextRequestHeader(apiKey), (Function1 & Serializable & scala.Serializable)x$1 -> ReplicaFetcherThread.$anonfun$sendRequest$2(this, apiKey, BoxesRunTime.unboxToShort((Object)x$1)));
        try {
            if (!NetworkClientBlockingOps$.MODULE$.blockingReady$extension(NetworkClientBlockingOps$.MODULE$.networkClientBlockingOps(this.networkClient()), this.sourceNode(), this.socketTimeout(), this.time)) {
                throw new SocketTimeoutException(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Failed to connect within ", " ms"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.socketTimeout())})));
            }
            RequestSend send = new RequestSend(((Object)BoxesRunTime.boxToInteger((int)this.sourceBroker.id())).toString(), header, request.toStruct());
            ClientRequest clientRequest = new ClientRequest(this.time.milliseconds(), true, send, null);
            clientResponse = NetworkClientBlockingOps$.MODULE$.blockingSendAndReceive$extension(NetworkClientBlockingOps$.MODULE$.networkClientBlockingOps(this.networkClient()), clientRequest, this.time);
        }
        catch (Throwable e) {
            this.networkClient().close(((Object)BoxesRunTime.boxToInteger((int)this.sourceBroker.id())).toString());
            throw e;
        }
        return clientResponse;
    }

    private long earliestOrLatestOffset(TopicPartition topicPartition, long earliestOrLatest, int consumerId) {
        Tuple2 tuple2;
        Tuple2 tuple22;
        if (this.brokerConfig.interBrokerProtocolVersion().$greater$eq(KAFKA_0_10_1_IV2$.MODULE$)) {
            Map partitions = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topicPartition), (Object)earliestOrLatest)}));
            tuple22 = new Tuple2((Object)new ListOffsetRequest((java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter(partitions).asJava(), consumerId), (Object)BoxesRunTime.boxToInteger((int)1));
        } else {
            Map partitions = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topicPartition), (Object)new ListOffsetRequest.PartitionData(earliestOrLatest, 1))}));
            tuple22 = tuple2 = new Tuple2((Object)new ListOffsetRequest(consumerId, (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter(partitions).asJava()), (Object)BoxesRunTime.boxToInteger((int)0));
        }
        if (tuple2 == null) {
            throw new MatchError((Object)tuple2);
        }
        ListOffsetRequest request = (ListOffsetRequest)tuple2._1();
        int apiVersion = tuple2._2$mcI$sp();
        Tuple2 tuple23 = new Tuple2((Object)request, (Object)BoxesRunTime.boxToInteger((int)apiVersion));
        Tuple2 tuple24 = tuple23;
        ListOffsetRequest request2 = (ListOffsetRequest)tuple24._1();
        int apiVersion2 = tuple24._2$mcI$sp();
        ClientResponse clientResponse = this.sendRequest(ApiKeys.LIST_OFFSETS, (Option<Object>)new Some((Object)BoxesRunTime.boxToShort((short)((short)apiVersion2))), (AbstractRequest)request2);
        ListOffsetResponse response = new ListOffsetResponse(clientResponse.responseBody());
        ListOffsetResponse.PartitionData partitionData = (ListOffsetResponse.PartitionData)response.responseData().get(topicPartition);
        Errors errors = Errors.forCode((short)partitionData.errorCode);
        if (!Errors.NONE.equals(errors)) {
            throw errors.exception();
        }
        long l = this.brokerConfig.interBrokerProtocolVersion().$greater$eq(KAFKA_0_10_1_IV2$.MODULE$) ? Predef$.MODULE$.Long2long(partitionData.offset) : Predef$.MODULE$.Long2long((Long)partitionData.offsets.get(0));
        return l;
    }

    @Override
    public FetchRequest buildFetchRequest(Seq<Tuple2<TopicPartition, PartitionFetchState>> partitionMap) {
        LinkedHashMap requestMap = new LinkedHashMap();
        partitionMap.foreach((Function1 & Serializable & scala.Serializable)x0$2 -> {
            Tuple2 tuple2 = x0$2;
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            TopicPartition topicPartition = (TopicPartition)tuple2._1();
            PartitionFetchState partitionFetchState = (PartitionFetchState)tuple2._2();
            TopicAndPartition topicAndPartition = new TopicAndPartition(topicPartition.topic(), topicPartition.partition());
            BoxedUnit boxedUnit = partitionFetchState.isActive() && !this.shouldFollowerThrottle($this.quota, topicAndPartition) ? requestMap.put(topicPartition, new FetchRequest.PartitionData(partitionFetchState.offset(), Predef$.MODULE$.Integer2int(this.fetchSize()))) : BoxedUnit.UNIT;
            return boxedUnit;
        });
        org.apache.kafka.common.requests.FetchRequest request = this.fetchRequestVersion() >= 3 ? org.apache.kafka.common.requests.FetchRequest.fromReplica((int)this.replicaId(), (int)Predef$.MODULE$.Integer2int(this.maxWait()), (int)Predef$.MODULE$.Integer2int(this.minBytes()), (int)Predef$.MODULE$.Integer2int(this.maxBytes()), requestMap) : org.apache.kafka.common.requests.FetchRequest.fromReplica((int)this.replicaId(), (int)Predef$.MODULE$.Integer2int(this.maxWait()), (int)Predef$.MODULE$.Integer2int(this.minBytes()), requestMap);
        return new FetchRequest(request);
    }

    private boolean shouldFollowerThrottle(ReplicaQuota quota, TopicAndPartition topicPartition) {
        boolean isReplicaInSync = this.fetcherLagStats().isReplicaInSync(topicPartition.topic(), topicPartition.partition());
        return quota.isThrottled(topicPartition) && quota.isQuotaExceeded() && !isReplicaInSync;
    }

    public static final /* synthetic */ RequestHeader $anonfun$sendRequest$2(ReplicaFetcherThread $this, ApiKeys apiKey$1, short x$1) {
        return $this.networkClient().nextRequestHeader(apiKey$1, x$1);
    }

    public ReplicaFetcherThread(String name, int fetcherId, BrokerEndPoint sourceBroker, KafkaConfig brokerConfig, ReplicaManager replicaMgr, Metrics metrics, Time time, ReplicationQuotaManager quota) {
        this.sourceBroker = sourceBroker;
        this.brokerConfig = brokerConfig;
        this.replicaMgr = replicaMgr;
        this.time = time;
        this.quota = quota;
        super(name, name, sourceBroker, Predef$.MODULE$.Integer2int(brokerConfig.replicaFetchBackoffMs()), false);
        this.fetchRequestVersion = (short)(brokerConfig.interBrokerProtocolVersion().$greater$eq(KAFKA_0_10_1_IV1$.MODULE$) ? 3 : (brokerConfig.interBrokerProtocolVersion().$greater$eq(KAFKA_0_10_0_IV0$.MODULE$) ? 2 : (brokerConfig.interBrokerProtocolVersion().$greater$eq(KAFKA_0_9_0$.MODULE$) ? 1 : 0)));
        this.socketTimeout = Predef$.MODULE$.Integer2int(brokerConfig.replicaSocketTimeoutMs());
        this.replicaId = brokerConfig.brokerId();
        this.maxWait = brokerConfig.replicaFetchWaitMaxMs();
        this.minBytes = brokerConfig.replicaFetchMinBytes();
        this.maxBytes = brokerConfig.replicaFetchResponseMaxBytes();
        this.fetchSize = brokerConfig.replicaFetchMaxBytes();
        this.sourceNode = new Node(sourceBroker.id(), sourceBroker.host(), sourceBroker.port());
        ChannelBuilder channelBuilder = ChannelBuilders.create((SecurityProtocol)brokerConfig.interBrokerSecurityProtocol(), (Mode)Mode.CLIENT, (LoginType)LoginType.SERVER, (java.util.Map)brokerConfig.values(), (String)brokerConfig.saslMechanismInterBrokerProtocol(), (boolean)brokerConfig.saslInterBrokerHandshakeRequestEnable());
        Selector selector = new Selector(-1, Predef$.MODULE$.Long2long(brokerConfig.connectionsMaxIdleMs()), metrics, time, "replica-fetcher", (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"broker-id"), (Object)((Object)BoxesRunTime.boxToInteger((int)sourceBroker.id())).toString()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"fetcher-id"), (Object)((Object)BoxesRunTime.boxToInteger((int)fetcherId)).toString())}))).asJava(), false, channelBuilder);
        this.networkClient = new NetworkClient((Selectable)selector, (MetadataUpdater)new ManualMetadataUpdater(), this.clientId(), 1, 0L, -1, Predef$.MODULE$.Integer2int(brokerConfig.replicaSocketReceiveBufferBytes()), Predef$.MODULE$.Integer2int(brokerConfig.requestTimeoutMs()), time);
    }

    public static class FetchRequest
    implements AbstractFetcherThread.FetchRequest {
        private final org.apache.kafka.common.requests.FetchRequest underlying;

        public org.apache.kafka.common.requests.FetchRequest underlying() {
            return this.underlying;
        }

        @Override
        public boolean isEmpty() {
            return this.underlying().fetchData().isEmpty();
        }

        @Override
        public long offset(TopicPartition topicPartition) {
            return ((FetchRequest.PartitionData)((MapLike)JavaConverters$.MODULE$.mapAsScalaMapConverter((java.util.Map)this.underlying().fetchData()).asScala()).apply((Object)topicPartition)).offset;
        }

        public FetchRequest(org.apache.kafka.common.requests.FetchRequest underlying) {
            this.underlying = underlying;
        }
    }

    public static class PartitionData
    implements AbstractFetcherThread.PartitionData {
        private final FetchResponse.PartitionData underlying;

        public FetchResponse.PartitionData underlying() {
            return this.underlying;
        }

        @Override
        public short errorCode() {
            return this.underlying().errorCode;
        }

        @Override
        public ByteBufferMessageSet toByteBufferMessageSet() {
            return new ByteBufferMessageSet(this.underlying().recordSet);
        }

        @Override
        public long highWatermark() {
            return this.underlying().highWatermark;
        }

        @Override
        public Option<Throwable> exception() {
            Errors errors = Errors.forCode((short)this.errorCode());
            Object object = Errors.NONE.equals(errors) ? None$.MODULE$ : new Some((Object)errors.exception());
            return object;
        }

        public PartitionData(FetchResponse.PartitionData underlying) {
            this.underlying = underlying;
        }
    }
}

