/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.hudi.streaming;

import java.io.BufferedWriter;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.AvroConversionUtils$;
import org.apache.hudi.DataSourceReadOptions$;
import org.apache.hudi.IncrementalRelation;
import org.apache.hudi.MergeOnReadIncrementalRelation;
import org.apache.hudi.SparkAdapterSupport;
import org.apache.hudi.client.utils.SparkRowSerDe;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.TablePathUtils;
import org.apache.spark.internal.Logging;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder$;
import org.apache.spark.sql.execution.streaming.HDFSMetadataLog;
import org.apache.spark.sql.execution.streaming.Offset;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.hudi.SparkAdapter;
import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset;
import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset$;
import org.apache.spark.sql.hudi.streaming.HoodieStreamSource$;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001\u0005\u0005g\u0001B\u000e\u001d\u0001%B\u0001B\u0013\u0001\u0003\u0002\u0003\u0006Ia\u0013\u0005\t\u001f\u0002\u0011\t\u0011)A\u0005!\"A1\f\u0001B\u0001B\u0003%A\f\u0003\u0005f\u0001\t\u0005\t\u0015!\u0003g\u0011\u0015I\u0007\u0001\"\u0001k\u0011\u001d\t\bA1A\u0005\nIDaa\u001f\u0001!\u0002\u0013\u0019\bBCA\u0001\u0001!\u0015\r\u0011\"\u0003\u0002\u0004!Q\u0011\u0011\u0003\u0001\t\u0006\u0004%I!a\u0005\t\u0015\u0005\u0015\u0002\u0001#b\u0001\n\u0013\t9\u0003C\u0006\u00026\u0001\u0001\r\u00111A\u0005\n\u0005]\u0002bCA \u0001\u0001\u0007\t\u0019!C\u0005\u0003\u0003B1\"!\u0014\u0001\u0001\u0004\u0005\t\u0015)\u0003\u0002:!Q\u0011\u0011\u000b\u0001\t\u0006\u0004%I!a\u000e\t\u000f\u0005U\u0003\u0001\"\u0003\u0002X!9\u00111\r\u0001\u0005B\u0005\u0015\u0004bBA4\u0001\u0011\u0005\u0013\u0011\u000e\u0005\b\u0003g\u0002A\u0011IA;\u0011\u001d\ti\n\u0001C\u0005\u0003?Cq!!*\u0001\t\u0003\n9kB\u0004\u0002*rA\t!a+\u0007\rma\u0002\u0012AAW\u0011\u0019Ig\u0003\"\u0001\u00026\"I\u0011q\u0017\fC\u0002\u0013\u0005\u0011\u0011\u0018\u0005\t\u0003w3\u0002\u0015!\u0003\u0002Z!I\u0011Q\u0018\f\u0002\u0002\u0013%\u0011q\u0018\u0002\u0013\u0011>|G-[3TiJ,\u0017-\\*pkJ\u001cWM\u0003\u0002\u001e=\u0005I1\u000f\u001e:fC6Lgn\u001a\u0006\u0003?\u0001\nA\u0001[;eS*\u0011\u0011EI\u0001\u0004gFd'BA\u0012%\u0003\u0015\u0019\b/\u0019:l\u0015\t)c%\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002O\u0005\u0019qN]4\u0004\u0001M1\u0001A\u000b\u001a:\u007f\u0015\u0003\"a\u000b\u0019\u000e\u00031R!!\f\u0018\u0002\t1\fgn\u001a\u0006\u0002_\u0005!!.\u0019<b\u0013\t\tDF\u0001\u0004PE*,7\r\u001e\t\u0003g]j\u0011\u0001\u000e\u0006\u0003;UR!A\u000e\u0011\u0002\u0013\u0015DXmY;uS>t\u0017B\u0001\u001d5\u0005\u0019\u0019v.\u001e:dKB\u0011!(P\u0007\u0002w)\u0011AHI\u0001\tS:$XM\u001d8bY&\u0011ah\u000f\u0002\b\u0019><w-\u001b8h!\t\u00015)D\u0001B\u0015\u0005\u0011\u0015!B:dC2\f\u0017B\u0001#B\u00051\u0019VM]5bY&T\u0018M\u00197f!\t1\u0005*D\u0001H\u0015\tyB%\u0003\u0002J\u000f\n\u0019
2\u000b]1sW\u0006#\u0017\r\u001d;feN+\b\u000f]8si\u0006Q1/\u001d7D_:$X\r\u001f;\u0011\u00051kU\"\u0001\u0011\n\u00059\u0003#AC*R\u0019\u000e{g\u000e^3yi\u0006aQ.\u001a;bI\u0006$\u0018\rU1uQB\u0011\u0011\u000b\u0017\b\u0003%Z\u0003\"aU!\u000e\u0003QS!!\u0016\u0015\u0002\rq\u0012xn\u001c;?\u0013\t9\u0016)\u0001\u0004Qe\u0016$WMZ\u0005\u00033j\u0013aa\u0015;sS:<'BA,B\u00031\u00198\r[3nC>\u0003H/[8o!\r\u0001UlX\u0005\u0003=\u0006\u0013aa\u00149uS>t\u0007C\u00011d\u001b\u0005\t'B\u00012!\u0003\u0015!\u0018\u0010]3t\u0013\t!\u0017M\u0001\u0006TiJ,8\r\u001e+za\u0016\f!\u0002]1sC6,G/\u001a:t!\u0011\tv\r\u0015)\n\u0005!T&aA'ba\u00061A(\u001b8jiz\"Ra[7o_B\u0004\"\u0001\u001c\u0001\u000e\u0003qAQAS\u0003A\u0002-CQaT\u0003A\u0002ACQaW\u0003A\u0002qCQ!Z\u0003A\u0002\u0019\f!\u0002[1e_>\u00048i\u001c8g+\u0005\u0019\bC\u0001;z\u001b\u0005)(B\u0001<x\u0003\u0011\u0019wN\u001c4\u000b\u0005a$\u0013A\u00025bI>|\u0007/\u0003\u0002{k\ni1i\u001c8gS\u001e,(/\u0019;j_:\f1\u0002[1e_>\u00048i\u001c8gA!\u0012q! \t\u0003\u0001zL!a`!\u0003\u0013Q\u0014\u0018M\\:jK:$\u0018!\u0003;bE2,\u0007+\u0019;i+\t\t)\u0001\u0005\u0003\u0002\b\u00055QBAA\u0005\u0015\r\tYa^\u0001\u0003MNLA!a\u0004\u0002\n\t!\u0001+\u0019;i\u0003)iW\r^1DY&,g\u000e^\u000b\u0003\u0003+\u0001B!a\u0006\u0002\"5\u0011\u0011\u0011\u0004\u0006\u0005\u00037\ti\"A\u0003uC\ndWMC\u0002\u0002 
\u001d\u000baaY8n[>t\u0017\u0002BA\u0012\u00033\u0011Q\u0003S8pI&,G+\u00192mK6+G/Y\"mS\u0016tG/A\u0005uC\ndW\rV=qKV\u0011\u0011\u0011\u0006\t\u0005\u0003W\t\t$\u0004\u0002\u0002.)!\u0011qFA\u000f\u0003\u0015iw\u000eZ3m\u0013\u0011\t\u0019$!\f\u0003\u001f!{w\u000eZ5f)\u0006\u0014G.\u001a+za\u0016\f!\u0002\\1ti>3gm]3u+\t\tI\u0004E\u0002m\u0003wI1!!\u0010\u001d\u0005IAun\u001c3jKN{WO]2f\u001f\u001a47/\u001a;\u0002\u001d1\f7\u000f^(gMN,Go\u0018\u0013fcR!\u00111IA%!\r\u0001\u0015QI\u0005\u0004\u0003\u000f\n%\u0001B+oSRD\u0011\"a\u0013\r\u0003\u0003\u0005\r!!\u000f\u0002\u0007a$\u0013'A\u0006mCN$xJ\u001a4tKR\u0004\u0003FA\u0007~\u00039Ig.\u001b;jC2|eMZ:fiND#AD?\u0002\u0015\u001d,GOV3sg&|g\u000e\u0006\u0003\u0002Z\u0005}\u0003c\u0001!\u0002\\%\u0019\u0011QL!\u0003\u0007%sG\u000f\u0003\u0004\u0002b=\u0001\r\u0001U\u0001\fm\u0016\u00148/[8o\u0019&tW-\u0001\u0004tG\",W.Y\u000b\u0002?\u0006Iq-\u001a;PM\u001a\u001cX\r^\u000b\u0003\u0003W\u0002B\u0001Q/\u0002nA\u00191'a\u001c\n\u0007\u0005EDG\u0001\u0004PM\u001a\u001cX\r^\u0001\tO\u0016$()\u0019;dQR1\u0011qOAK\u00033\u0003B!!\u001f\u0002\u0010:!\u00111PAF\u001d\u0011\ti(!#\u000f\t\u0005}\u0014q\u0011\b\u0005\u0003\u0003\u000b)ID\u0002T\u0003\u0007K\u0011aJ\u0005\u0003K\u0019J!a\t\u0013\n\u0005\u0005\u0012\u0013bAAGA\u00059\u0001/Y2lC\u001e,\u0017\u0002BAI\u0003'\u0013\u0011\u0002R1uC\u001a\u0013\u0018-\\3\u000b\u0007\u00055\u0005\u0005C\u0004\u0002\u0018J\u0001\r!a\u001b\u0002\u000bM$\u0018M\u001d;\t\u000f\u0005m%\u00031\u0001\u0002n\u0005\u0019QM\u001c3\u0002\u001fM$\u0018M\u001d;D_6l\u0017\u000e\u001e+j[\u0016$2\u0001UAQ\u0011\u001d\t\u0019k\u0005a\u0001\u0003s\t1b\u001d;beR|eMZ:fi\u0006!1\u000f^8q)\t\t\u0019%\u0001\nI_>$\u0017.Z*ue\u0016\fWnU8ve\u000e,\u0007C\u00017\u0017'\u00111\u0012qV \u0011\u0007\u0001\u000b\t,C\u0002\u00024\u0006\u0013a!\u00118z%\u00164GCAAV\u0003\u001d1VIU*J\u001f:+\"!!\u0017\u0002\u0011Y+%kU%P\u001d\u0002\n1B]3bIJ+7o\u001c7wKR\t!\u0006")
public class HoodieStreamSource
implements Source,
Logging,
scala.Serializable,
SparkAdapterSupport {
    // Lazily computed table base path; filled in by tablePath$lzycompute() and guarded by bit 1 of bitmap$0.
    private Path tablePath;
    // Lazily computed Hudi meta client; filled in by metaClient$lzycompute() and guarded by bit 2 of bitmap$0.
    private HoodieTableMetaClient metaClient;
    // Lazily computed table type read from the meta client; guarded by bit 4 of bitmap$0.
    private HoodieTableType tableType;
    // Lazily computed initial offset (batch 0 of the metadata log); guarded by bitmap$trans$0.
    private transient HoodieSourceOffset initialOffsets;
    // Constructor argument; public with a mangled name because the anonymous HDFSMetadataLog subclass reads it.
    public final SQLContext org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$sqlContext;
    // Constructor argument; passed to the HDFSMetadataLog constructor as its path.
    public final String org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$metadataPath;
    // Optional user-supplied schema; schema() falls back to the table's Avro schema when it is empty.
    private final Option<StructType> schemaOption;
    // Source options; "path" is read from here and getBatch() layers the incremental-query options on top.
    private final Map<String, String> parameters;
    // Hadoop configuration obtained in the constructor from the session state.
    private final transient Configuration hadoopConf;
    // Most recent offset returned by getOffset(); null until getOffset() sets it.
    private transient HoodieSourceOffset lastOffset;
    // Lazily computed SparkAdapter; guarded by bit 8 of bitmap$0.
    private SparkAdapter sparkAdapter;
    // Backing field for the Logging trait's logger accessor/mutator pair.
    private transient Logger org$apache$spark$internal$Logging$$log_;
    // Initialisation flag for the transient lazy value initialOffsets.
    private volatile transient boolean bitmap$trans$0;
    // Initialisation flags for the non-transient lazy values (bits 1, 2, 4 and 8 as noted above).
    private volatile byte bitmap$0;

    /** Static forwarder that returns the {@code VERSION} value held by the companion module. */
    public static int VERSION() {
        final int version = HoodieStreamSource$.MODULE$.VERSION();
        return version;
    }

    /** Returns the result of the {@code Logging} trait's static {@code logName$} for this instance. */
    public String logName() {
        final String name = Logging.logName$(this);
        return name;
    }

    /** Returns the logger produced by the {@code Logging} trait's static {@code log$} for this instance. */
    public Logger log() {
        final Logger logger = Logging.log$(this);
        return logger;
    }

    /**
     * Forwards to {@code Logging.logInfo$}.
     *
     * @param msg supplier of the message, handed to the trait implementation as-is
     */
    public void logInfo(Function0<String> msg) {
        final Logging self = this;
        Logging.logInfo$(self, msg);
    }

    /**
     * Forwards to {@code Logging.logDebug$}.
     *
     * @param msg supplier of the message, handed to the trait implementation as-is
     */
    public void logDebug(Function0<String> msg) {
        final Logging self = this;
        Logging.logDebug$(self, msg);
    }

    /**
     * Forwards to {@code Logging.logTrace$}.
     *
     * @param msg supplier of the message, handed to the trait implementation as-is
     */
    public void logTrace(Function0<String> msg) {
        final Logging self = this;
        Logging.logTrace$(self, msg);
    }

    /**
     * Forwards to {@code Logging.logWarning$}.
     *
     * @param msg supplier of the message, handed to the trait implementation as-is
     */
    public void logWarning(Function0<String> msg) {
        final Logging self = this;
        Logging.logWarning$(self, msg);
    }

    /**
     * Forwards to {@code Logging.logError$}.
     *
     * @param msg supplier of the message, handed to the trait implementation as-is
     */
    public void logError(Function0<String> msg) {
        final Logging self = this;
        Logging.logError$(self, msg);
    }

    /**
     * Forwards to the three-argument {@code Logging.logInfo$}.
     *
     * @param msg       supplier of the message
     * @param throwable throwable passed along with the message
     */
    public void logInfo(Function0<String> msg, Throwable throwable) {
        final Logging self = this;
        Logging.logInfo$(self, msg, throwable);
    }

    /**
     * Forwards to the three-argument {@code Logging.logDebug$}.
     *
     * @param msg       supplier of the message
     * @param throwable throwable passed along with the message
     */
    public void logDebug(Function0<String> msg, Throwable throwable) {
        final Logging self = this;
        Logging.logDebug$(self, msg, throwable);
    }

    /**
     * Forwards to the three-argument {@code Logging.logTrace$}.
     *
     * @param msg       supplier of the message
     * @param throwable throwable passed along with the message
     */
    public void logTrace(Function0<String> msg, Throwable throwable) {
        final Logging self = this;
        Logging.logTrace$(self, msg, throwable);
    }

    /**
     * Forwards to the three-argument {@code Logging.logWarning$}.
     *
     * @param msg       supplier of the message
     * @param throwable throwable passed along with the message
     */
    public void logWarning(Function0<String> msg, Throwable throwable) {
        final Logging self = this;
        Logging.logWarning$(self, msg, throwable);
    }

    /**
     * Forwards to the three-argument {@code Logging.logError$}.
     *
     * @param msg       supplier of the message
     * @param throwable throwable passed along with the message
     */
    public void logError(Function0<String> msg, Throwable throwable) {
        final Logging self = this;
        Logging.logError$(self, msg, throwable);
    }

    /** Returns whatever {@code Logging.isTraceEnabled$} reports for this instance. */
    public boolean isTraceEnabled() {
        final boolean enabled = Logging.isTraceEnabled$(this);
        return enabled;
    }

    /**
     * Forwards to the two-argument {@code Logging.initializeLogIfNecessary$}.
     *
     * @param isInterpreter flag handed to the trait implementation unchanged
     */
    public void initializeLogIfNecessary(boolean isInterpreter) {
        final Logging self = this;
        Logging.initializeLogIfNecessary$(self, isInterpreter);
    }

    /**
     * Forwards to the three-argument {@code Logging.initializeLogIfNecessary$}.
     *
     * @param isInterpreter flag handed to the trait implementation unchanged
     * @param silent        flag handed to the trait implementation unchanged
     * @return the boolean produced by the trait implementation
     */
    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        final boolean result = Logging.initializeLogIfNecessary$(this, isInterpreter, silent);
        return result;
    }

    /** Returns the default value of the second parameter as supplied by the {@code Logging} trait. */
    public boolean initializeLogIfNecessary$default$2() {
        final boolean defaultValue = Logging.initializeLogIfNecessary$default$2$(this);
        return defaultValue;
    }

    /**
     * Forwards to the {@code Source} trait's static {@code commit$} implementation.
     *
     * @param end offset handed to the trait implementation unchanged
     */
    public void commit(Offset end) {
        final Source self = this;
        Source.commit$(self, end);
    }

    /**
     * Slow path of the lazy {@code sparkAdapter} value: under this instance's monitor,
     * obtains the adapter from {@code SparkAdapterSupport} once and records that in bit 8
     * of {@code bitmap$0}.
     *
     * @return the cached adapter
     */
    private SparkAdapter sparkAdapter$lzycompute() {
        synchronized (this) {
            final boolean initialised = (byte)(this.bitmap$0 & 8) != 0;
            if (!initialised) {
                // Store the value first, then publish the flag.
                this.sparkAdapter = SparkAdapterSupport.sparkAdapter$(this);
                this.bitmap$0 = (byte)(this.bitmap$0 | 8);
            }
        }
        return this.sparkAdapter;
    }

    /** Returns the cached adapter when bit 8 of {@code bitmap$0} is set, otherwise computes it. */
    @Override
    public SparkAdapter sparkAdapter() {
        if ((byte)(this.bitmap$0 & 8) != 0) {
            return this.sparkAdapter;
        }
        return this.sparkAdapter$lzycompute();
    }

    /** Accessor for the logger field backing the {@code Logging} trait. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        final Logger current = this.org$apache$spark$internal$Logging$$log_;
        return current;
    }

    /**
     * Mutator for the logger field backing the {@code Logging} trait.
     *
     * @param x$1 the logger to store
     */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        final Logger replacement = x$1;
        this.org$apache$spark$internal$Logging$$log_ = replacement;
    }

    /** Accessor for the Hadoop configuration captured by the constructor. */
    private Configuration hadoopConf() {
        final Configuration conf = this.hadoopConf;
        return conf;
    }

    /**
     * Slow path of the lazy {@code tablePath} value: under this instance's monitor, reads the
     * {@code "path"} option, asks {@code TablePathUtils} for the table path that contains it,
     * caches the result and records that in bit 1 of {@code bitmap$0}.
     *
     * <p>Previously a missing {@code "path"} option caused the text
     * {@code "Missing 'path' option"} to be used as the path itself, so the failure only
     * surfaced later. The option is now validated up front.
     *
     * @return the cached table path
     * @throws IllegalArgumentException if the {@code "path"} option is not present
     */
    private Path tablePath$lzycompute() {
        synchronized (this) {
            if ((byte)(this.bitmap$0 & 1) == 0) {
                Option<String> configuredPath = this.parameters.get("path");
                if (configuredPath.isEmpty()) {
                    // Fail fast with the intended message instead of treating it as a path.
                    // The flag bit stays clear, so a later access re-runs this check.
                    throw new IllegalArgumentException("Missing 'path' option");
                }
                Path path = new Path(configuredPath.get());
                FileSystem fs = path.getFileSystem(this.hadoopConf());
                this.tablePath = TablePathUtils.getTablePath(fs, path).get();
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
        }
        return this.tablePath;
    }

    /** Returns the cached table path when bit 1 of {@code bitmap$0} is set, otherwise computes it. */
    private Path tablePath() {
        if ((byte)(this.bitmap$0 & 1) != 0) {
            return this.tablePath;
        }
        return this.tablePath$lzycompute();
    }

    /**
     * Slow path of the lazy {@code metaClient} value: under this instance's monitor, builds a
     * {@code HoodieTableMetaClient} from the Hadoop configuration and the table path string,
     * caches it and records that in bit 2 of {@code bitmap$0}.
     *
     * @return the cached meta client
     */
    private HoodieTableMetaClient metaClient$lzycompute() {
        synchronized (this) {
            final boolean initialised = (byte)(this.bitmap$0 & 2) != 0;
            if (!initialised) {
                this.metaClient = HoodieTableMetaClient.builder()
                        .setConf(this.hadoopConf())
                        .setBasePath(this.tablePath().toString())
                        .build();
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
        }
        return this.metaClient;
    }

    /** Returns the cached meta client when bit 2 of {@code bitmap$0} is set, otherwise computes it. */
    private HoodieTableMetaClient metaClient() {
        if ((byte)(this.bitmap$0 & 2) != 0) {
            return this.metaClient;
        }
        return this.metaClient$lzycompute();
    }

    /**
     * Slow path of the lazy {@code tableType} value: under this instance's monitor, reads the
     * table type from the meta client, caches it and records that in bit 4 of {@code bitmap$0}.
     *
     * @return the cached table type
     */
    private HoodieTableType tableType$lzycompute() {
        synchronized (this) {
            final boolean initialised = (byte)(this.bitmap$0 & 4) != 0;
            if (!initialised) {
                this.tableType = this.metaClient().getTableType();
                this.bitmap$0 = (byte)(this.bitmap$0 | 4);
            }
        }
        return this.tableType;
    }

    /** Returns the cached table type when bit 4 of {@code bitmap$0} is set, otherwise computes it. */
    private HoodieTableType tableType() {
        if ((byte)(this.bitmap$0 & 4) != 0) {
            return this.tableType;
        }
        return this.tableType$lzycompute();
    }

    /** Accessor for the most recently stored offset; may be {@code null} if none was stored yet. */
    private HoodieSourceOffset lastOffset() {
        final HoodieSourceOffset current = this.lastOffset;
        return current;
    }

    /**
     * Mutator for the stored offset.
     *
     * @param x$1 the offset to remember
     */
    private void lastOffset_$eq(HoodieSourceOffset x$1) {
        final HoodieSourceOffset updated = x$1;
        this.lastOffset = updated;
    }

    /**
     * Slow path of the lazy {@code initialOffsets} value.
     *
     * <p>Under this instance's monitor it creates an {@code HDFSMetadataLog} rooted at the
     * metadata path, reads batch {@code 0} from it and, when that batch is absent, falls back
     * to {@code $anonfun$initialOffsets$1}, which adds {@code INIT_OFFSET} as batch {@code 0}
     * and returns it. The result is cached and {@code bitmap$trans$0} is set.
     *
     * <p>The log's on-disk format, as written by {@code serialize}, is a first line
     * {@code v<VERSION>} followed by the offset's JSON.
     *
     * @return the cached initial offset
     */
    private HoodieSourceOffset initialOffsets$lzycompute() {
        HoodieStreamSource hoodieStreamSource = this;
        synchronized (hoodieStreamSource) {
            if (!this.bitmap$trans$0) {
                HDFSMetadataLog<HoodieSourceOffset> metadataLog = new HDFSMetadataLog<HoodieSourceOffset>(this){
                    private final /* synthetic */ HoodieStreamSource $outer;

                    // Writes the version line, then the offset JSON, as UTF-8.
                    public void serialize(HoodieSourceOffset metadata, OutputStream out) {
                        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
                        writer.write(new StringBuilder(2).append("v").append(HoodieStreamSource$.MODULE$.VERSION()).append("\n").toString());
                        writer.write(metadata.json());
                        // NOTE(review): the writer is flushed but not closed here; presumably the caller owns the stream - confirm.
                        writer.flush();
                    }

                    // Reads the whole stream, validates the version line and parses the remainder as offset JSON.
                    public HoodieSourceOffset deserialize(InputStream in) {
                        String content = FileIOUtils.readAsUTFString(in);
                        int firstLineEnd = content.indexOf("\n");
                        // A newline at index 0 (empty version line) or no newline at all is rejected below.
                        if (firstLineEnd > 0) {
                            int version = this.$outer.org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$getVersion(content.substring(0, firstLineEnd));
                            // Only versions up to the one this class writes are accepted.
                            if (version > HoodieStreamSource$.MODULE$.VERSION()) {
                                throw new IllegalStateException(new StringBuilder(63).append("UnSupportVersion: max support version is: ").append(HoodieStreamSource$.MODULE$.VERSION()).append(" current version is: ").append(version).toString());
                            }
                        } else {
                            throw new IllegalStateException("Bad metadata format, failed to find the version line.");
                        }
                        return HoodieSourceOffset$.MODULE$.fromJson(content.substring(firstLineEnd + 1));
                    }
                    // Decompiler rendering of the anonymous-class constructor: captures the outer
                    // instance and passes the session, metadata path and class tag to the superclass.
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        super($outer.org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$sqlContext.sparkSession(), $outer.org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$metadataPath, ClassTag$.MODULE$.apply(HoodieSourceOffset.class));
                    }
                };
                // Use batch 0 if the log has it; otherwise the fallback adds INIT_OFFSET as batch 0.
                this.initialOffsets = (HoodieSourceOffset)((Object)metadataLog.get(0L).getOrElse(() -> HoodieStreamSource.$anonfun$initialOffsets$1((HDFSMetadataLog)metadataLog)));
                this.bitmap$trans$0 = true;
            }
        }
        return this.initialOffsets;
    }

    /** Returns the cached initial offset when {@code bitmap$trans$0} is set, otherwise computes it. */
    private HoodieSourceOffset initialOffsets() {
        if (this.bitmap$trans$0) {
            return this.initialOffsets;
        }
        return this.initialOffsets$lzycompute();
    }

    /**
     * Parses the version number out of a metadata version line of the form {@code v<number>}.
     *
     * @param versionLine the first line of a metadata log entry
     * @return the integer that follows the leading {@code v}
     * @throws IllegalStateException if the line does not start with {@code v}
     */
    public int org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$getVersion(String versionLine) {
        if (versionLine.startsWith("v")) {
            String digits = versionLine.substring(1);
            return new StringOps(Predef$.MODULE$.augmentString(digits)).toInt();
        }
        throw new IllegalStateException("Illegal version line: " + versionLine + " " + "in the streaming metadata path");
    }

    /**
     * Returns the user-supplied schema when present; otherwise resolves the table's Avro schema
     * through a {@code TableSchemaResolver} and converts it to a {@code StructType}.
     *
     * @return the schema of the rows this source produces
     */
    public StructType schema() {
        Function0 resolveFromTable = (Function0 & Serializable & scala.Serializable)() -> {
            TableSchemaResolver resolver = new TableSchemaResolver(this.metaClient());
            return AvroConversionUtils$.MODULE$.convertAvroSchemaToStructType(resolver.getTableAvroSchema());
        };
        return (StructType)this.schemaOption.getOrElse(resolveFromTable);
    }

    /**
     * Reloads the active timeline and reports the current offset.
     *
     * <p>When there are no completed commits the stored offset becomes the initial offset.
     * Otherwise the stored offset is replaced by the latest completed commit time, but only if
     * nothing was stored yet or the latest commit time compares greater than the stored one.
     *
     * @return a {@code Some} wrapping the stored offset after the update
     */
    public Option<Offset> getOffset() {
        this.metaClient().reloadActiveTimeline();
        HoodieTimeline completedCommits = this.metaClient().getActiveTimeline().getCommitsTimeline().filterCompletedInstants();

        if (completedCommits.empty()) {
            this.lastOffset_$eq(this.initialOffsets());
            return new Some((Object)this.lastOffset());
        }

        String latestCommitTime = completedCommits.lastInstant().get().getTimestamp();
        HoodieSourceOffset previous = this.lastOffset();
        boolean advanced = previous == null
                || new StringOps(Predef$.MODULE$.augmentString(latestCommitTime)).$greater((Object)previous.commitTime());
        if (advanced) {
            this.lastOffset_$eq(new HoodieSourceOffset(latestCommitTime));
        }
        return new Some((Object)this.lastOffset());
    }

    /**
     * Builds the DataFrame for one micro-batch.
     *
     * <p>The start offset defaults to the initial offset when {@code start} is empty. If start
     * and end offsets are equal, an empty DataFrame with this source's schema is returned.
     * Otherwise the source options are overlaid with the incremental query type, a begin instant
     * derived by {@code startCommitTime} and the end offset's commit time, and the scan is
     * delegated to {@code IncrementalRelation} (copy-on-write tables) or
     * {@code MergeOnReadIncrementalRelation} (merge-on-read tables).
     *
     * @param start optional offset the batch starts from
     * @param end   offset the batch ends at
     * @return the DataFrame for the requested range
     * @throws IllegalArgumentException if the table type is neither copy-on-write nor merge-on-read
     */
    public Dataset<Row> getBatch(Option<Offset> start, Offset end) {
        // Evaluate the lazy initial offset before anything else.
        this.initialOffsets();
        SQLContext sqlContext = this.org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$sqlContext;

        HoodieSourceOffset startOffset = (HoodieSourceOffset)((Object)start.map((Function1 & Serializable & scala.Serializable)offset -> HoodieSourceOffset$.MODULE$.apply((Offset)offset)).getOrElse((Function0 & Serializable & scala.Serializable)() -> this.initialOffsets()));
        HoodieSourceOffset endOffset = HoodieSourceOffset$.MODULE$.apply(end);

        // Null-safe equality of the two offsets.
        boolean sameOffset = startOffset == null ? endOffset == null : ((Object)((Object)startOffset)).equals((Object)endOffset);
        if (sameOffset) {
            return sqlContext.internalCreateDataFrame(sqlContext.sparkContext().emptyRDD(ClassTag$.MODULE$.apply(InternalRow.class)).setName("empty"), this.schema(), true);
        }

        // Options that turn the read into an incremental query over the requested range.
        Tuple2[] incrementalOverrides = new Tuple2[]{
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceReadOptions$.MODULE$.QUERY_TYPE().key()), (Object)DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME().key()), (Object)this.startCommitTime(startOffset)),
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceReadOptions$.MODULE$.END_INSTANTTIME().key()), (Object)endOffset.commitTime())
        };
        Map incParams = this.parameters.$plus$plus((GenTraversableOnce)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])incrementalOverrides)));

        RDD<Row> rows;
        HoodieTableType currentType = this.tableType();
        if (((Object)((Object)HoodieTableType.COPY_ON_WRITE)).equals((Object)currentType)) {
            // The relation yields Rows; each one is serialized through the adapter's SerDe.
            SparkRowSerDe serDe = this.sparkAdapter().createSparkRowSerDe((ExpressionEncoder<Row>)RowEncoder$.MODULE$.apply(this.schema()));
            rows = new IncrementalRelation(sqlContext, (Map<String, String>)incParams, (Option<StructType>)new Some((Object)this.schema()), this.metaClient()).buildScan().map((Function1 & Serializable & scala.Serializable)row -> serDe.serializeRow((Row)row), ClassTag$.MODULE$.apply(InternalRow.class));
        } else if (((Object)((Object)HoodieTableType.MERGE_ON_READ)).equals((Object)currentType)) {
            // Request every column of the schema and push down no filters.
            String[] requiredColumns = (String[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])this.schema().fields())).map((Function1 & Serializable & scala.Serializable)field -> field.name(), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class)));
            rows = new MergeOnReadIncrementalRelation(sqlContext, (Map<String, String>)incParams, (Option<StructType>)new Some((Object)this.schema()), this.metaClient()).buildScan(requiredColumns, (Filter[])Array$.MODULE$.empty(ClassTag$.MODULE$.apply(Filter.class)));
        } else {
            throw new IllegalArgumentException("UnSupport tableType: " + (Object)this.tableType());
        }
        return sqlContext.internalCreateDataFrame((RDD)rows, this.schema(), true);
    }

    /**
     * Derives the begin instant for an incremental query from a start offset.
     *
     * <p>The {@code INIT_OFFSET} sentinel is passed through unchanged. For any other non-null
     * offset the commit time is parsed to a date, advanced by 1000 milliseconds and formatted
     * back to an instant string.
     *
     * @param startOffset offset the batch starts from
     * @return the instant string to use as the query's begin instant
     * @throws IllegalStateException if {@code startOffset} is {@code null} and is not equal to
     *                               {@code INIT_OFFSET}
     */
    private String startCommitTime(HoodieSourceOffset startOffset) {
        HoodieSourceOffset initOffset = HoodieSourceOffset$.MODULE$.INIT_OFFSET();
        boolean isInitOffset = initOffset == null ? startOffset == null : ((Object)((Object)initOffset)).equals((Object)startOffset);
        if (isInitOffset) {
            return startOffset.commitTime();
        }
        if (startOffset == null) {
            throw new IllegalStateException("UnKnow offset type.");
        }
        String commitTime = startOffset.commitTime();
        long commitMillis = HoodieActiveTimeline.parseDateFromInstantTime(commitTime).getTime();
        return HoodieActiveTimeline.formatDate(new Date(commitMillis + 1000L));
    }

    /** Intentionally empty: this method performs no work when invoked. */
    public void stop() {
    }

    /**
     * Fallback used when the metadata log has no batch {@code 0}: adds {@code INIT_OFFSET}
     * as batch {@code 0} and returns it.
     *
     * @param metadataLog$1 the metadata log to seed
     * @return the {@code INIT_OFFSET} value
     */
    public static final /* synthetic */ HoodieSourceOffset $anonfun$initialOffsets$1(HDFSMetadataLog metadataLog$1) {
        HoodieSourceOffset$ offsets = HoodieSourceOffset$.MODULE$;
        metadataLog$1.add(0L, (Object)offsets.INIT_OFFSET());
        return offsets.INIT_OFFSET();
    }

    /**
     * Creates the streaming source.
     *
     * @param sqlContext   SQL context used to build DataFrames and to obtain the Hadoop configuration
     * @param metadataPath path handed to the metadata log that stores the initial offset
     * @param schemaOption optional schema; when empty, {@code schema()} resolves it from the table
     * @param parameters   source options; the {@code "path"} entry is read to locate the table
     */
    public HoodieStreamSource(SQLContext sqlContext, String metadataPath, Option<StructType> schemaOption, Map<String, String> parameters) {
        // Constructor arguments are stored before the trait initialisers run.
        this.org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$sqlContext = sqlContext;
        this.org$apache$spark$sql$hudi$streaming$HoodieStreamSource$$metadataPath = metadataPath;
        this.schemaOption = schemaOption;
        this.parameters = parameters;
        // Trait initialisers, in the order the compiler emitted them.
        Source.$init$((Source)this);
        Logging.$init$((Logging)this);
        SparkAdapterSupport.$init$(this);
        // A fresh Hadoop configuration derived from the session state.
        this.hadoopConf = sqlContext.sparkSession().sessionState().newHadoopConf();
    }
}

