/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.functional;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceReadOptions$;
import org.apache.hudi.DataSourceWriteOptions$;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.JavaConversions$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.mutable.StringBuilder;
import scala.concurrent.Await$;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.Duration$;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001\tUf\u0001B\u0001\u0003\u0001-\u0011q\u0003V3tiN#(/^2ukJ,Gm\u0015;sK\u0006l\u0017N\\4\u000b\u0005\r!\u0011A\u00034v]\u000e$\u0018n\u001c8bY*\u0011QAB\u0001\u0005QV$\u0017N\u0003\u0002\b\u0011\u00051\u0011\r]1dQ\u0016T\u0011!C\u0001\u0004_J<7\u0001A\n\u0003\u00011\u0001\"!\u0004\t\u000e\u00039Q!a\u0004\u0003\u0002\u0013Q,7\u000f^;uS2\u001c\u0018BA\t\u000f\u0005QAun\u001c3jK\u000ec\u0017.\u001a8u)\u0016\u001cHOQ1tK\")1\u0003\u0001C\u0001)\u00051A(\u001b8jiz\"\u0012!\u0006\t\u0003-\u0001i\u0011A\u0001\u0005\b1\u0001\u0011\r\u0011\"\u0003\u001a\u0003\rawnZ\u000b\u00025A\u00111DH\u0007\u00029)\u0011QDB\u0001\u0006Y><GG[\u0005\u0003?q\u0011a\u0001T8hO\u0016\u0014\bBB\u0011\u0001A\u0003%!$\u0001\u0003m_\u001e\u0004\u0003bB\u0012\u0001\u0001\u0004%\t\u0001J\u0001\u0006gB\f'o[\u000b\u0002KA\u0011aEK\u0007\u0002O)\u0011\u0001&K\u0001\u0004gFd'BA\u0012\u0007\u0013\tYsE\u0001\u0007Ta\u0006\u00148nU3tg&|g\u000eC\u0004.\u0001\u0001\u0007I\u0011\u0001\u0018\u0002\u0013M\u0004\u0018M]6`I\u0015\fHCA\u00186!\t\u00014'D\u00012\u0015\u0005\u0011\u0014!B:dC2\f\u0017B\u0001\u001b2\u0005\u0011)f.\u001b;\t\u000fYb\u0013\u0011!a\u0001K\u0005\u0019\u0001\u0010J\u0019\t\ra\u0002\u0001\u0015)\u0003&\u0003\u0019\u0019\b/\u0019:lA!9!\b\u0001b\u0001\n\u0003Y\u0014AC2p[6|gn\u00149ugV\tA\b\u0005\u0003>\u0005\u0012#U\"\u0001 \u000b\u0005}\u0002\u0015!C5n[V$\u0018M\u00197f\u0015\t\t\u0015'\u0001\u0006d_2dWm\u0019;j_:L!a\u0011 \u0003\u00075\u000b\u0007\u000f\u0005\u0002F\u00156\taI\u0003\u0002H\u0011\u0006!A.\u00198h\u0015\u0005I\u0015\u0001\u00026bm\u0006L!a\u0013$\u0003\rM#(/\u001b8h\u0011\u0019i\u0005\u0001)A\u0005y\u0005Y1m\\7n_:|\u0005\u000f^:!\u0011\u0015y\u0005\u0001\"\u0011Q\u0003\u0015\u0019X\r^+q)\u0005y\u0003F\u0001(S!\t\u0019&,D\u0001U\u0015\t)f+A\u0002ba&T!a\u0016-\u0002\u000f),\b/\u001b;fe*\u0011\u0011\fC\u0001\u0006UVt\u0017\u000e^\u0005\u00037R\u0013!BQ3g_J,W)Y2i\u0011\u0015i\u0006\u0001\"\u0011Q\u0003!!X-\u0019:E_^t\u0007F\u0001/`!\t\u0019\u0006-\u0003\u0002b)\nI\u0011I\u001a;fe\u0016\u000b7\r\u001b\u0005\u0006G\u0002!\t\u0001Z\u0001\u0019S:LGo\u0015;sK\u0006l\u0017N\\4Xe&$XMR;ukJ,G#B3lgnl\bc\u00014j_5\tqM\u0003\u0002ic\u0005Q1m\u001c8dkJ\u0014XM\u001c;\n\u0005)<'A\u0002$viV\u0014X\rC\u0003mE\u0002\u0007Q.\u0001\u0004tG\",W.\u0019\t\u0003]Fl\u0011a\u001c\u0006\u0003a\u001e\nQ\u0001^=qKNL!A]8\u0003\u0015M#(/^2u)f\u0004X\rC\u0003uE\u0002\u0007Q/\u0001\u0006t_V\u00148-\u001a)bi\"\u0004\"A^=\u000f\u0005A:\u0018B\u0001=2\u0003\u0019\u0001&/\u001a3fM&\u00111J\u001f\u0006\u0003qFBQ\u0001 2A\u0002U\f\u0001\u0002Z3tiB\u000bG\u000f\u001b\u0005\u0006}\n\u0004\ra`\u0001\fQV$\u0017n\u00149uS>t7\u000fE\u0003w\u0003\u0003)X/\u0003\u0002Du\"9\u0011Q\u0001\u0001\u0005\u0002\u0005\u001d\u0011AH5oSR\u001cFO]3b[&twmU8ve\u000e,\u0017I\u001c3EKN$\b+\u0019;i)\u0019\tI!a\u0004\u0002\u0014A)\u0001'a\u0003vk&\u0019\u0011QB\u0019\u0003\rQ+\b\u000f\\33\u0011\u001d\t\t\"a\u0001A\u0002U\fQb]8ve\u000e,G)\u001b:OC6,\u0007bBA\u000b\u0003\u0007\u0001\r!^\u0001\fI\u0016\u001cH\u000fR5s\u001d\u0006lW\rC\u0004\u0002\u001a\u0001!\t!a\u0007\u0002)\u001d,Go\u00149ug^KG\u000f\u001b+bE2,G+\u001f9f)\ry\u0018Q\u0004\u0005\t\u0003?\t9\u00021\u0001\u0002\"\u0005IA/\u00192mKRK\b/\u001a\t\u0005\u0003G\ti#\u0004\u0002\u0002&)!\u0011qEA\u0015\u0003\u0015iw\u000eZ3m\u0015\r\tY\u0003B\u0001\u0007G>lWn\u001c8\n\t\u0005=\u0012Q\u0005\u0002\u0010\u0011>|G-[3UC\ndW\rV=qK\"9\u00111\u0007\u0001\u0005\u0002\u0005U\u0012!E4fi\u000ecWo\u001d;fe&twm\u00149ugRYq0a\u000e\u0002:\u0005u\u0012\u0011IA#\u0011!\ty\"!\rA\u0002\u0005\u0005\u0002bBA\u001e\u0003c\u0001\r!^\u0001\u0013SNLe\u000e\\5oK\u000ecWo\u001d;fe&tw\rC\u0004\u0002@\u0005E\u0002\u0019A;\u0002#%\u001c\u0018i]=oG\u000ecWo\u001d;fe&tw\rC\u0004\u0002D\u0005E\u0002\u0019A;\u0002'\rdWo\u001d;fe&twMT;n\u0007>lW.\u001b;\t\u0011\u0005\u001d\u0013\u0011\u0007a\u0001\u0003\u0013\n\u0001CZ5mK6\u000b\u0007PU3d_J$g*^7\u0011\u0007A\nY%C\u0002\u0002NE\u00121!\u00138u\u0011\u001d\t\t\u0006\u0001C\u0001\u0003'\n\u0011cZ3u\u0007>l\u0007/Y2uS>tw\n\u001d;t)\u0015y\u0018QKA,\u0011!\ty\"a\u0014A\u0002\u0005\u0005\u0002\u0002CA-\u0003\u001f\u0002\r!a\u0017\u0002#%\u001c\u0018i]=oG\u000e{W\u000e]1di&|g\u000eE\u00021\u0003;J1!a\u00182\u0005\u001d\u0011un\u001c7fC:Dq!a\u0019\u0001\t\u0003\t)'A\u000ftiJ,8\r^;sK\u0012\u001cFO]3b[&tw\rV3tiJ+hN\\3s)\u001dy\u0013qMA5\u0003[B\u0001\"a\b\u0002b\u0001\u0007\u0011\u0011\u0005\u0005\t\u0003W\n\t\u00071\u0001\u0002\\\u0005!\u0012\r\u001a3D_6\u0004\u0018m\u0019;j_:\u001cuN\u001c4jOND\u0001\"!\u0017\u0002b\u0001\u0007\u00111\f\u0005\b\u0003c\u0002A\u0011AA:\u0003]!Xm\u001d;TiJ,8\r^;sK\u0012\u001cFO]3b[&tw\rF\u00020\u0003kB\u0001\"a\b\u0002p\u0001\u0007\u0011\u0011\u0005\u0015\t\u0003_\nI(!#\u0002\fB!\u00111PAC\u001b\t\tiH\u0003\u0003\u0002\u0000\u0005\u0005\u0015\u0001\u00039s_ZLG-\u001a:\u000b\u0007\u0005\re+\u0001\u0004qCJ\fWn]\u0005\u0005\u0003\u000f\u000biH\u0001\u0006F]Vl7k\\;sG\u0016\fQA^1mk\u0016\u001c#!!\t)\t\u0005=\u0014q\u0012\t\u0005\u0003#\u000b\u0019*\u0004\u0002\u0002\u0002&!\u0011QSAA\u0005E\u0001\u0016M]1nKR,'/\u001b>fIR+7\u000f\u001e\u0005\b\u00033\u0003A\u0011BAN\u0003]9\u0018-\u001b;US2d\u0017\t\u001e7fCN$hjQ8n[&$8\u000f\u0006\u0007\u0002J\u0005u\u0015qVAZ\u0003o\u000bY\f\u0003\u0005\u0002 \u0006]\u0005\u0019AAQ\u0003\t17\u000f\u0005\u0003\u0002$\u0006-VBAAS\u0015\u0011\ty*a*\u000b\u0007\u0005%f!\u0001\u0004iC\u0012|w\u000e]\u0005\u0005\u0003[\u000b)K\u0001\u0006GS2,7+_:uK6Dq!!-\u0002\u0018\u0002\u0007Q/A\u0005uC\ndW\rU1uQ\"A\u0011QWAL\u0001\u0004\tI%\u0001\u0006ok6\u001cu.\\7jiND\u0001\"!/\u0002\u0018\u0002\u0007\u0011\u0011J\u0001\fi&lWm\\;u'\u0016\u001c7\u000f\u0003\u0005\u0002>\u0006]\u0005\u0019AA%\u0003U\u0019H.Z3q'\u0016\u001c7/\u00114uKJ,\u0015m\u00195Sk:Dc!a&\u0002B\u0006}\u0007#\u0002\u0019\u0002D\u0006\u001d\u0017bAAcc\t1A\u000f\u001b:poN\u0004B!!3\u0002Z:!\u00111ZAk\u001d\u0011\ti-a5\u000e\u0005\u0005='bAAi\u0015\u00051AH]8pizJ\u0011AM\u0005\u0004\u0003/\f\u0014a\u00029bG.\fw-Z\u0005\u0005\u00037\fiN\u0001\u000bJ]R,'O];qi\u0016$W\t_2faRLwN\u001c\u0006\u0004\u0003/\f\u0014G\u0002\u0010v\u0003C\u0014i!M\u0005$\u0003G\fYOa\u0001\u0002nV!\u0011Q]At+\u0005)HaBAu\u0015\t\u0007\u00111\u001f\u0002\u0002)&!\u0011Q^Ax\u0003m!C.Z:tS:LG\u000fJ4sK\u0006$XM\u001d\u0013eK\u001a\fW\u000f\u001c;%c)\u0019\u0011\u0011_\u0019\u0002\rQD'o\\<t#\u0011\t)0a?\u0011\u0007A\n90C\u0002\u0002zF\u0012qAT8uQ&tw\r\u0005\u0003\u0002~\u0006}hb\u0001\u0019\u0002V&!!\u0011AAo\u0005%!\u0006N]8xC\ndW-M\u0005$\u0005\u000b\u00119A!\u0003\u0002r:\u0019\u0001Ga\u0002\n\u0007\u0005E\u0018'M\u0003#aE\u0012YAA\u0003tG\u0006d\u0017-M\u0002'\u0003\u000fDqA!\u0005\u0001\t\u0003\u0011\u0019\"A\u0013uKN$8\u000b\u001e:vGR,(/\u001a3TiJ,\u0017-\\5oO^KG\u000f[\"mkN$XM]5oOR\u0019qF!\u0006\t\u0011\u0005}\"q\u0002a\u0001\u00037B\u0003Ba\u0004\u0003\u001a\t}!\u0011\u0005\t\u0005\u0003w\u0012Y\"\u0003\u0003\u0003\u001e\u0005u$a\u0003,bYV,7k\\;sG\u0016\f\u0001BY8pY\u0016\fgn\u001d\u0017\u0005\u0005G\u0011)#G\u0001\u00023\u0005\u0001\u0001\u0006\u0002B\b\u0003\u001fCqAa\u000b\u0001\t\u0003\u0011i#A\u0013uKN$8\u000b\u001e:vGR,(/\u001a3TiJ,\u0017-\\5oO^KG\u000f[\"p[B\f7\r^5p]R\u0019qFa\f\t\u0011\u0005e#\u0011\u0006a\u0001\u00037B\u0003B!\u000b\u0003\u001a\t}!1\u0007\u0017\u0005\u0005G\u0011)\u0003\u000b\u0003\u0003*\u0005=\u0005b\u0002B\u001d\u0001\u0011\u0005!1H\u0001+gR\u0014Xo\u0019;ve\u0016$7\u000b\u001e:fC6Lgn\u001a$peR+7\u000f^\"mkN$XM]5oOJ+hN\\3s)=y#Q\bB \u0005\u0003\u0012\u0019E!\u0012\u0003H\t-\u0003B\u0002;\u00038\u0001\u0007Q\u000f\u0003\u0004}\u0005o\u0001\r!\u001e\u0005\t\u0003?\u00119\u00041\u0001\u0002\"!A\u00111\bB\u001c\u0001\u0004\tY\u0006\u0003\u0005\u0002@\t]\u0002\u0019AA.\u0011\u001d\u0011IEa\u000eA\u0002U\f!\u0003]1si&$\u0018n\u001c8PMJ+7m\u001c:eg\"A!Q\nB\u001c\u0001\u0004\u0011y%A\u000bdQ\u0016\u001c7n\u00117vgR,'/\u001b8h%\u0016\u001cX\u000f\u001c;\u0011\u000bA\u0012\t&^\u0018\n\u0007\tM\u0013GA\u0005Gk:\u001cG/[8oc!9!q\u000b\u0001\u0005\n\te\u0013!G4fi2\u000bG/Z:u\r&dWm\u0012:pkB\u001ch)\u001b7f\u0013\u0012$BAa\u0017\u0003bA!\u0001G!\u0018v\u0013\r\u0011y&\r\u0002\u0006\u0003J\u0014\u0018-\u001f\u0005\b\u0005G\u0012)\u00061\u0001v\u0003%\u0001\u0018M\u001d;ji&|g\u000eC\u0004\u0003h\u0001!IA!\u001b\u0002E]\f\u0017\u000e\u001e+jY2D\u0015m]\"p[BdW\r^3e%\u0016\u0004H.Y2f\u0013:\u001cH/\u00198u)\u001dy#1\u000eB7\u0005_Bq!!-\u0003f\u0001\u0007Q\u000f\u0003\u0005\u0002:\n\u0015\u0004\u0019AA%\u0011!\tiL!\u001aA\u0002\u0005%\u0003F\u0002B3\u0003\u0003\u0014\u0019(\r\u0004\u001fk\nU$1P\u0019\nG\u0005\r\u00181\u001eB<\u0003[\f\u0014b\tB\u0003\u0005\u000f\u0011I(!=2\u000b\t\u0002\u0014Ga\u00032\u0007\u0019\n9\rC\u0004\u0003\u0000\u0001!IA!!\u0002\u001b1\fG/Z:u\u0013:\u001cH/\u00198u)\u001d)(1\u0011BC\u0005\u0013C\u0001\"a(\u0003~\u0001\u0007\u0011\u0011\u0015\u0005\b\u0005\u000f\u0013i\b1\u0001v\u0003!\u0011\u0017m]3QCRD\u0007b\u0002BF\u0005{\u0002\r!^\u0001\u000eS:\u001cH/\u00198u\u0003\u000e$\u0018n\u001c8\t\u0019\t=\u0005\u0001%A\u0001\u0002\u0003%\tA!%\u0002%A\u0014x\u000e^3di\u0016$GEY1tKB\u000bG\u000f\u001b\u000b\u0004\t\nM\u0005\u0002\u0003\u001c\u0003\u000e\u0006\u0005\t\u0019A\u000b\t\u0019\t]\u0005\u0001%A\u0001\u0002\u0003%\tA!'\u0002\u0019A\u0014x\u000e^3di\u0016$GEZ:\u0015\t\u0005\u0005&1\u0014\u0005\tm\tU\u0015\u0011!a\u0001+!a!q\u0014\u0001\u0011\u0002\u0003\u0005\t\u0011\"\u0001\u0003\"\u00069\u0002O]8uK\u000e$X\r\u001a\u0013tKRlW\r^1DY&,g\u000e\u001e\u000b\u0006_\t\r&Q\u0015\u0005\tm\tu\u0015\u0011!a\u0001+!Q!q\u0015BO\u0003\u0003\u0005\rA!+\u0002\u0007a$#\u0007\u0005\u0003\u0003,\nEVB\u0001BW\u0015\u0011\u0011y+!\u000b\u0002\u000bQ\f'\r\\3\n\t\tM&Q\u0016\u0002\u0016\u0011>|G-[3UC\ndW-T3uC\u000ec\u0017.\u001a8u\u0001")
public class TestStructuredStreaming
extends HoodieClientTestBase {
    private final Logger log = LogManager.getLogger(((Object)((Object)this)).getClass());
    private SparkSession spark = null;
    private final scala.collection.immutable.Map<String, String> commonOpts = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.insert.shuffle.parallelism"), (Object)"4"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.upsert.shuffle.parallelism"), (Object)"4"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.RECORDKEY_FIELD().key()), (Object)"_row_key"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.PARTITIONPATH_FIELD().key()), (Object)"partition"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.PRECOMBINE_FIELD().key()), (Object)"timestamp"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieWriteConfig.TBL_NAME.key()), (Object)"hoodie_test")}));

    public /* synthetic */ String protected$basePath(TestStructuredStreaming x$1) {
        return x$1.basePath;
    }

    public /* synthetic */ FileSystem protected$fs(TestStructuredStreaming x$1) {
        return x$1.fs;
    }

    public /* synthetic */ void protected$setmetaClient(TestStructuredStreaming x$1, HoodieTableMetaClient x$2) {
        ((HoodieClientTestHarness)x$1).metaClient = x$2;
    }

    private Logger log() {
        return this.log;
    }

    public SparkSession spark() {
        return this.spark;
    }

    public void spark_$eq(SparkSession x$1) {
        this.spark = x$1;
    }

    public scala.collection.immutable.Map<String, String> commonOpts() {
        return this.commonOpts;
    }

    @BeforeEach
    public void setUp() {
        this.initPath();
        this.initSparkContexts();
        this.spark_$eq(this.sqlContext.sparkSession());
        this.initTestDataGenerator();
        this.initFileSystem();
        this.initTimelineService();
    }

    @AfterEach
    public void tearDown() {
        this.cleanupTimelineService();
        this.cleanupSparkContexts();
        this.cleanupTestDataGenerator();
        this.cleanupFileSystem();
    }

    public Future<BoxedUnit> initStreamingWriteFuture(StructType schema, String sourcePath, String destPath, scala.collection.immutable.Map<String, String> hudiOptions) {
        Dataset streamingInput = this.spark().readStream().schema(schema).json(sourcePath);
        return Future$.MODULE$.apply((Function0)new Serializable(this, destPath, hudiOptions, streamingInput){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ TestStructuredStreaming $outer;
            private final String destPath$1;
            private final scala.collection.immutable.Map hudiOptions$1;
            private final Dataset streamingInput$1;

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                Predef$.MODULE$.println((Object)"streaming starting");
                this.streamingInput$1.writeStream().format("org.apache.hudi").options((Map)this.hudiOptions$1).trigger(Trigger.ProcessingTime((long)100L)).option("checkpointLocation", new StringBuilder().append((Object)this.$outer.protected$basePath(this.$outer)).append((Object)"/checkpoint").toString()).outputMode(OutputMode.Append()).start(this.destPath$1).awaitTermination(10000L);
                Predef$.MODULE$.println((Object)"streaming ends");
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.destPath$1 = destPath$1;
                this.hudiOptions$1 = hudiOptions$1;
                this.streamingInput$1 = streamingInput$1;
            }
        }, (ExecutionContext)ExecutionContext.Implicits$.MODULE$.global());
    }

    public Tuple2<String, String> initStreamingSourceAndDestPath(String sourceDirName, String destDirName) {
        this.fs.delete(new Path(this.basePath), true);
        String sourcePath = new StringBuilder().append((Object)this.basePath).append((Object)"/").append((Object)sourceDirName).toString();
        String destPath = new StringBuilder().append((Object)this.basePath).append((Object)"/").append((Object)destDirName).toString();
        this.fs.mkdirs(new Path(sourcePath));
        return new Tuple2((Object)sourcePath, (Object)destPath);
    }

    public scala.collection.immutable.Map<String, String> getOptsWithTableType(HoodieTableType tableType) {
        return this.commonOpts().$plus(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key()), (Object)tableType.name()));
    }

    public scala.collection.immutable.Map<String, String> getClusteringOpts(HoodieTableType tableType, String isInlineClustering, String isAsyncClustering, String clusteringNumCommit, int fileMaxRecordNum) {
        return this.getOptsWithTableType(tableType).$plus(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieClusteringConfig.INLINE_CLUSTERING.key()), (Object)isInlineClustering), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key()), (Object)clusteringNumCommit), (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.ASYNC_CLUSTERING_ENABLE().key()), (Object)isAsyncClustering), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key()), (Object)clusteringNumCommit), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key()), (Object)((Object)BoxesRunTime.boxToInteger((int)this.dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum))).toString())}));
    }

    public scala.collection.immutable.Map<String, String> getCompactionOpts(HoodieTableType tableType, boolean isAsyncCompaction) {
        return this.getOptsWithTableType(tableType).$plus(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.ASYNC_COMPACT_ENABLE().key()), (Object)((Object)BoxesRunTime.boxToBoolean((boolean)isAsyncCompaction)).toString()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key()), (Object)"1"), (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[0]));
    }

    public void structuredStreamingTestRunner(HoodieTableType tableType, boolean addCompactionConfigs, boolean isAsyncCompaction) {
        Tuple2<String, String> tuple2 = this.initStreamingSourceAndDestPath("source", "dest");
        if (tuple2 != null) {
            Tuple2 tuple22;
            String sourcePath = (String)tuple2._1();
            String destPath = (String)tuple2._2();
            Tuple2 tuple23 = tuple22 = new Tuple2((Object)sourcePath, (Object)destPath);
            String sourcePath2 = (String)tuple23._1();
            String destPath2 = (String)tuple23._2();
            List records1 = JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInserts("000", Predef$.MODULE$.int2Integer(100)))).toList();
            Dataset inputDF1 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records1, 2, ClassTag$.MODULE$.apply(String.class)));
            List records2 = JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateUpdates("001", Predef$.MODULE$.int2Integer(100)))).toList();
            Dataset inputDF2 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records2, 2, ClassTag$.MODULE$.apply(String.class)));
            long uniqueKeyCnt = inputDF2.select("_row_key", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0])).distinct().count();
            scala.collection.immutable.Map<String, String> hudiOptions = addCompactionConfigs ? this.getCompactionOpts(tableType, isAsyncCompaction) : this.getOptsWithTableType(tableType);
            Future<BoxedUnit> f1 = this.initStreamingWriteFuture(inputDF1.schema(), sourcePath2, destPath2, hudiOptions);
            Future f2 = Future$.MODULE$.apply((Function0)new Serializable(this, tableType, addCompactionConfigs, sourcePath2, destPath2, inputDF1, inputDF2, uniqueKeyCnt){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ TestStructuredStreaming $outer;
                private final HoodieTableType tableType$1;
                private final boolean addCompactionConfigs$1;
                private final String sourcePath$1;
                private final String destPath$2;
                private final Dataset inputDF1$1;
                private final Dataset inputDF2$1;
                private final long uniqueKeyCnt$1;

                public final void apply() {
                    this.apply$mcV$sp();
                }

                public void apply$mcV$sp() {
                    this.inputDF1$1.coalesce(1).write().mode(SaveMode.Append).json(this.sourcePath$1);
                    int currNumCommits = this.$outer.org$apache$hudi$functional$TestStructuredStreaming$$waitTillAtleastNCommits(this.$outer.protected$fs(this.$outer), this.destPath$2, 1, 120, 5);
                    Assertions.assertTrue((boolean)HoodieDataSourceHelpers.hasNewCommits((FileSystem)this.$outer.protected$fs(this.$outer), (String)this.destPath$2, (String)"000"));
                    String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit((FileSystem)this.$outer.protected$fs(this.$outer), (String)this.destPath$2);
                    Dataset hoodieROViewDF1 = this.$outer.spark().read().format("org.apache.hudi").load(new StringBuilder().append((Object)this.destPath$2).append((Object)"/*/*/*/*").toString());
                    Predef$.MODULE$.assert(hoodieROViewDF1.count() == 100L);
                    this.inputDF2$1.coalesce(1).write().mode(SaveMode.Append).json(this.sourcePath$1);
                    int numExpectedCommits = this.addCompactionConfigs$1 ? currNumCommits + 2 : currNumCommits + 1;
                    this.$outer.org$apache$hudi$functional$TestStructuredStreaming$$waitTillAtleastNCommits(this.$outer.protected$fs(this.$outer), this.destPath$2, numExpectedCommits, 120, 5);
                    HoodieTableType hoodieTableType = this.tableType$1;
                    HoodieTableType hoodieTableType2 = HoodieTableType.MERGE_ON_READ;
                    String commitInstantTime2 = !(hoodieTableType != null ? !hoodieTableType.equals(hoodieTableType2) : hoodieTableType2 != null) ? this.$outer.org$apache$hudi$functional$TestStructuredStreaming$$latestInstant(this.$outer.protected$fs(this.$outer), this.destPath$2, "deltacommit") : HoodieDataSourceHelpers.latestCommit((FileSystem)this.$outer.protected$fs(this.$outer), (String)this.destPath$2);
                    Assertions.assertEquals((int)numExpectedCommits, (int)HoodieDataSourceHelpers.listCommitsSince((FileSystem)this.$outer.protected$fs(this.$outer), (String)this.destPath$2, (String)"000").size());
                    Dataset hoodieROViewDF2 = this.$outer.spark().read().format("org.apache.hudi").load(new StringBuilder().append((Object)this.destPath$2).append((Object)"/*/*/*/*").toString());
                    Assertions.assertEquals((long)100L, (long)hoodieROViewDF2.count());
                    String firstCommit = (String)HoodieDataSourceHelpers.listCommitsSince((FileSystem)this.$outer.protected$fs(this.$outer), (String)this.destPath$2, (String)"000").get(0);
                    Dataset hoodieIncViewDF1 = this.$outer.spark().read().format("org.apache.hudi").option(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()).option(DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME().key(), "000").option(DataSourceReadOptions$.MODULE$.END_INSTANTTIME().key(), firstCommit).load(this.destPath$2);
                    Assertions.assertEquals((long)100L, (long)hoodieIncViewDF1.count());
                    Row[] countsPerCommit = (Row[])hoodieIncViewDF1.groupBy("_hoodie_commit_time", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0])).count().collect();
                    Assertions.assertEquals((int)1, (int)countsPerCommit.length);
                    Assertions.assertEquals((Object)firstCommit, (Object)countsPerCommit[0].get(0));
                    Dataset hoodieIncViewDF2 = this.$outer.spark().read().format("org.apache.hudi").option(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()).option(DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME().key(), commitInstantTime1).load(this.destPath$2);
                    Assertions.assertEquals((long)this.uniqueKeyCnt$1, (long)hoodieIncViewDF2.count());
                    countsPerCommit = (Row[])hoodieIncViewDF2.groupBy("_hoodie_commit_time", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0])).count().collect();
                    Assertions.assertEquals((int)1, (int)countsPerCommit.length);
                    Assertions.assertEquals((Object)commitInstantTime2, (Object)countsPerCommit[0].get(0));
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                    this.tableType$1 = tableType$1;
                    this.addCompactionConfigs$1 = addCompactionConfigs$1;
                    this.sourcePath$1 = sourcePath$1;
                    this.destPath$2 = destPath$2;
                    this.inputDF1$1 = inputDF1$1;
                    this.inputDF2$1 = inputDF2$1;
                    this.uniqueKeyCnt$1 = uniqueKeyCnt$1;
                }
            }, (ExecutionContext)ExecutionContext.Implicits$.MODULE$.global());
            Await$.MODULE$.result((Awaitable)Future$.MODULE$.sequence((TraversableOnce)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Future[]{f1, f2})), Seq$.MODULE$.canBuildFrom(), (ExecutionContext)ExecutionContext.Implicits$.MODULE$.global()), (Duration)Duration$.MODULE$.Inf());
            return;
        }
        throw new MatchError(tuple2);
    }

    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    public void testStructuredStreaming(HoodieTableType tableType) {
        this.structuredStreamingTestRunner(tableType, false, false);
    }

    /*
     * WARNING - Removed back jump from a try to a catch block - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public int org$apache$hudi$functional$TestStructuredStreaming$$waitTillAtleastNCommits(FileSystem fs, String tablePath, int numCommits, int timeoutSecs, int sleepSecsAfterEachRun) throws InterruptedException {
        long beginTime;
        long currTime = beginTime = System.currentTimeMillis();
        int timeoutMsecs = timeoutSecs * 1000;
        int numInstants = 0;
        boolean success = false;
        while (!success && currTime - beginTime < (long)timeoutMsecs) {
            try {
                HoodieTimeline timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions((FileSystem)fs, (String)tablePath);
                this.log().info((Object)new StringBuilder().append((Object)"Timeline :").append((Object)timeline.getInstants().toArray()).toString());
                if (timeline.countInstants() < numCommits) {
                }
                numInstants = timeline.countInstants();
                success = true;
            }
            finally {
                if (success) continue;
                Thread.sleep(sleepSecsAfterEachRun * 1000);
                currTime = System.currentTimeMillis();
            }
        }
        if (success) {
            return numInstants;
        }
        throw new IllegalStateException(new StringBuilder().append((Object)"Timed-out waiting for ").append((Object)BoxesRunTime.boxToInteger((int)numCommits)).append((Object)" commits to appear in ").append((Object)tablePath).toString());
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testStructuredStreamingWithClustering(boolean isAsyncClustering) {
        Tuple2<String, String> tuple2 = this.initStreamingSourceAndDestPath("source", "dest");
        if (tuple2 != null) {
            Tuple2 tuple22;
            String sourcePath = (String)tuple2._1();
            String destPath = (String)tuple2._2();
            Tuple2 tuple23 = tuple22 = new Tuple2((Object)sourcePath, (Object)destPath);
            String sourcePath2 = (String)tuple23._1();
            String destPath2 = (String)tuple23._2();
            this.structuredStreamingForTestClusteringRunner(sourcePath2, destPath2, HoodieTableType.COPY_ON_WRITE, !isAsyncClustering, isAsyncClustering, "2016/03/15", (Function1<String, BoxedUnit>)new Serializable(this){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ TestStructuredStreaming $outer;

                public final void apply(String destPath) {
                    this.$outer.org$apache$hudi$functional$TestStructuredStreaming$$checkClusteringResult$1(destPath);
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                }
            });
            return;
        }
        throw new MatchError(tuple2);
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testStructuredStreamingWithCompaction(boolean isAsyncCompaction) {
        this.structuredStreamingTestRunner(HoodieTableType.MERGE_ON_READ, true, isAsyncCompaction);
    }

    public void structuredStreamingForTestClusteringRunner(String sourcePath, String destPath, HoodieTableType tableType, boolean isInlineClustering, boolean isAsyncClustering, String partitionOfRecords, Function1<String, BoxedUnit> checkClusteringResult) {
        List records1 = JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInsertsForPartition("000", Predef$.MODULE$.int2Integer(100), partitionOfRecords))).toList();
        Dataset inputDF1 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records1, 2, ClassTag$.MODULE$.apply(String.class)));
        List records2 = JavaConversions$.MODULE$.asScalaBuffer(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInsertsForPartition("001", Predef$.MODULE$.int2Integer(100), partitionOfRecords))).toList();
        Dataset inputDF2 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records2, 2, ClassTag$.MODULE$.apply(String.class)));
        scala.collection.immutable.Map<String, String> hudiOptions = this.getClusteringOpts(tableType, ((Object)BoxesRunTime.boxToBoolean((boolean)isInlineClustering)).toString(), ((Object)BoxesRunTime.boxToBoolean((boolean)isAsyncClustering)).toString(), "2", 100);
        Future<BoxedUnit> f1 = this.initStreamingWriteFuture(inputDF1.schema(), sourcePath, destPath, hudiOptions);
        Future f2 = Future$.MODULE$.apply((Function0)new Serializable(this, sourcePath, destPath, partitionOfRecords, checkClusteringResult, inputDF1, inputDF2){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ TestStructuredStreaming $outer;
            private final String sourcePath$2;
            private final String destPath$3;
            private final String partitionOfRecords$1;
            private final Function1 checkClusteringResult$2;
            private final Dataset inputDF1$2;
            private final Dataset inputDF2$2;

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                this.inputDF1$2.coalesce(1).write().mode(SaveMode.Append).json(this.sourcePath$2);
                int currNumCommits = this.$outer.org$apache$hudi$functional$TestStructuredStreaming$$waitTillAtleastNCommits(this.$outer.protected$fs(this.$outer), this.destPath$3, 1, 120, 5);
                Assertions.assertTrue((boolean)HoodieDataSourceHelpers.hasNewCommits((FileSystem)this.$outer.protected$fs(this.$outer), (String)this.destPath$3, (String)"000"));
                this.inputDF2$2.coalesce(1).write().mode(SaveMode.Append).json(this.sourcePath$2);
                currNumCommits = this.$outer.org$apache$hudi$functional$TestStructuredStreaming$$waitTillAtleastNCommits(this.$outer.protected$fs(this.$outer), this.destPath$3, currNumCommits + 1, 120, 5);
                this.$outer.protected$setmetaClient(this.$outer, HoodieTableMetaClient.builder().setConf(this.$outer.protected$fs(this.$outer).getConf()).setBasePath(this.destPath$3).setLoadActiveTimelineOnLoad(true).build());
                this.checkClusteringResult$2.apply((Object)this.destPath$3);
                Assertions.assertEquals((int)3, (int)HoodieDataSourceHelpers.listCommitsSince((FileSystem)this.$outer.protected$fs(this.$outer), (String)this.destPath$3, (String)"000").size());
                Assertions.assertTrue((Predef$.MODULE$.refArrayOps((Object[])this.$outer.org$apache$hudi$functional$TestStructuredStreaming$$getLatestFileGroupsFileId(this.partitionOfRecords$1)).size() > 0 ? 1 : 0) != 0);
                Dataset hoodieROViewDF2 = this.$outer.spark().read().format("org.apache.hudi").load(new StringBuilder().append((Object)this.destPath$3).append((Object)"/*/*/*/*").toString());
                Assertions.assertEquals((long)200L, (long)hoodieROViewDF2.count());
                Row[] countsPerCommit = (Row[])hoodieROViewDF2.groupBy("_hoodie_commit_time", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0])).count().collect();
                Assertions.assertEquals((int)2, (int)countsPerCommit.length);
                String commitInstantTime2 = this.$outer.org$apache$hudi$functional$TestStructuredStreaming$$latestInstant(this.$outer.protected$fs(this.$outer), this.destPath$3, "commit");
                Assertions.assertEquals((Object)commitInstantTime2, (Object)((Row)Predef$.MODULE$.refArrayOps((Object[])countsPerCommit).maxBy((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final String apply(Row row) {
                        return (String)row.getAs(0);
                    }
                }, (Ordering)Ordering.String$.MODULE$)).get(0));
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.sourcePath$2 = sourcePath$2;
                this.destPath$3 = destPath$3;
                this.partitionOfRecords$1 = partitionOfRecords$1;
                this.checkClusteringResult$2 = checkClusteringResult$2;
                this.inputDF1$2 = inputDF1$2;
                this.inputDF2$2 = inputDF2$2;
            }
        }, (ExecutionContext)ExecutionContext.Implicits$.MODULE$.global());
        Await$.MODULE$.result((Awaitable)Future$.MODULE$.sequence((TraversableOnce)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Future[]{f1, f2})), Seq$.MODULE$.canBuildFrom(), (ExecutionContext)ExecutionContext.Implicits$.MODULE$.global()), (Duration)Duration$.MODULE$.Inf());
    }

    public String[] org$apache$hudi$functional$TestStructuredStreaming$$getLatestFileGroupsFileId(String partition) {
        this.getHoodieTableFileSystemView(this.metaClient, (HoodieTimeline)this.metaClient.getActiveTimeline(), HoodieTestTable.of((HoodieTableMetaClient)this.metaClient).listAllBaseFiles());
        return (String[])Predef$.MODULE$.refArrayOps(this.tableView.getLatestFileSlices(partition).toArray()).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final String apply(Object slice) {
                return ((FileSlice)slice).getFileGroupId().getFileId();
            }
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class)));
    }

    /*
     * Loose catch block
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private void waitTillHasCompletedReplaceInstant(String tablePath, int timeoutSecs, int sleepSecsAfterEachRun) throws InterruptedException {
        long beginTime;
        long currTime = beginTime = System.currentTimeMillis();
        int timeoutMsecs = timeoutSecs * 1000;
        boolean success = false;
        if (!success && currTime - beginTime < (long)timeoutMsecs) {
            this.metaClient.reloadActiveTimeline();
            int completeReplaceSize = Predef$.MODULE$.refArrayOps(this.metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray()).size();
            Predef$.MODULE$.println((Object)new StringBuilder().append((Object)"completeReplaceSize:").append((Object)BoxesRunTime.boxToInteger((int)completeReplaceSize)).toString());
            if (completeReplaceSize <= 0) {
            }
            success = true;
            {
                catch (TableNotFoundException tableNotFoundException) {
                    this.log().info((Object)"Got table not found exception. Retrying");
                }
            }
            finally {
                Thread.sleep(sleepSecsAfterEachRun * 1000);
                currTime = System.currentTimeMillis();
            }
        }
        if (success) {
            return;
        }
        throw new IllegalStateException(new StringBuilder().append((Object)"Timed-out waiting for completing replace instant appear in ").append((Object)tablePath).toString());
    }

    public String org$apache$hudi$functional$TestStructuredStreaming$$latestInstant(FileSystem fs, String basePath, String instantAction) {
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
        return ((HoodieInstant)metaClient.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet((Object[])new String[]{instantAction})).filterCompletedInstants().lastInstant().get()).getTimestamp();
    }

    public final void org$apache$hudi$functional$TestStructuredStreaming$$checkClusteringResult$1(String destPath) {
        this.waitTillHasCompletedReplaceInstant(destPath, 120, 1);
        this.metaClient.reloadActiveTimeline();
        Assertions.assertEquals((int)1, (int)Predef$.MODULE$.refArrayOps((Object[])this.org$apache$hudi$functional$TestStructuredStreaming$$getLatestFileGroupsFileId("2016/03/15")).size());
    }
}

