/*
 * Decompiled with CFR 0.152.
 */
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncConfigHolder;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

/**
 * Sample Spark Structured Streaming application that writes to a Hudi table.
 *
 * <p>{@link #run()} drives two consecutive streaming phases against the same table path: an
 * upsert stream fed with generated inserts and updates, followed by a delete stream on a fresh
 * Spark session. While each stream runs, a second thread drops JSON input into the stream's
 * source folder, waits for the expected number of completed commits and validates record counts.
 *
 * <p>All options are bound from the command line by JCommander through the {@code @Parameter}
 * fields below.
 */
public class HoodieJavaStreamingApp {
    private static final Logger LOG = LogManager.getLogger(HoodieJavaStreamingApp.class);

    /** Upper bound, in seconds, for each wait on the table timeline. */
    private static final int COMMIT_WAIT_TIMEOUT_SECS = 180;
    /** Pause, in seconds, between two polls of the table timeline. */
    private static final int COMMIT_POLL_INTERVAL_SECS = 3;

    @Parameter(names={"--table-path", "-p"}, description="path for Hoodie sample table")
    private String tablePath = "/tmp/hoodie/streaming/sample-table";
    @Parameter(names={"--streaming-source-path", "-ssp"}, description="path for streaming source file folder")
    private String streamingSourcePath = "/tmp/hoodie/streaming/source";
    @Parameter(names={"--streaming-checkpointing-path", "-scp"}, description="path for streaming checking pointing folder")
    private String streamingCheckpointingPath = "/tmp/hoodie/streaming/checkpoint";
    @Parameter(names={"--streaming-duration-in-ms", "-sdm"}, description="time in millisecond for the streaming duration")
    private Long streamingDurationInMs = 15000L;
    @Parameter(names={"--table-name", "-n"}, description="table name for Hoodie sample table")
    private String tableName = "hoodie_test";
    @Parameter(names={"--table-type", "-t"}, description="One of COPY_ON_WRITE or MERGE_ON_READ")
    private String tableType = HoodieTableType.MERGE_ON_READ.name();
    @Parameter(names={"--hive-sync", "-hv"}, description="Enable syncing to hive")
    private Boolean enableHiveSync = false;
    @Parameter(names={"--hive-db", "-hd"}, description="hive database")
    private String hiveDB = "default";
    @Parameter(names={"--hive-table", "-ht"}, description="hive table")
    private String hiveTable = "hoodie_sample_test";
    @Parameter(names={"--hive-user", "-hu"}, description="hive username")
    private String hiveUser = "hive";
    @Parameter(names={"--hive-password", "-hp"}, description="hive password")
    private String hivePass = "hive";
    @Parameter(names={"--hive-url", "-hl"}, description="hive JDBC URL")
    private String hiveJdbcUrl = "jdbc:hive2://localhost:10000";
    @Parameter(names={"--use-multi-partition-keys", "-mp"}, description="Use Multiple Partition Keys")
    private Boolean useMultiPartitionKeys = false;
    @Parameter(names={"--help", "-h"}, help=true)
    public Boolean help = false;

    /**
     * Command-line entry point. Parses the arguments, runs the application and always terminates
     * the JVM through {@link System#exit(int)}: status 1 after printing usage, 0 on success and
     * -1 when {@link #run()} fails.
     *
     * @param args command-line arguments, see the {@code @Parameter} fields
     */
    public static void main(String[] args) throws Exception {
        HoodieJavaStreamingApp cli = new HoodieJavaStreamingApp();
        JCommander cmd = new JCommander(cli, null, args);
        if (cli.help) {
            cmd.usage();
            System.exit(1);
        }
        int errStatus = 0;
        try {
            cli.run();
        } catch (Throwable t) {
            // Throwable rather than Exception: the finally block below exits the JVM, so an Error
            // that is not caught here would be dropped silently and reported as exit status 0.
            LOG.error("Got error running app ", t);
            errStatus = -1;
        } finally {
            System.exit(errStatus);
        }
    }

    /**
     * Runs the upsert stream followed by the delete stream and validates the table after each.
     *
     * <p>The source, checkpoint and table paths are deleted recursively before the first phase.
     * Each phase uses its own Spark session, which is closed when the phase ends, whether it
     * succeeded or not.
     *
     * @throws Exception if either streaming task, the ingestion validation or the timeline check fails
     */
    public void run() throws Exception {
        // Produced by the upsert phase and still needed by the delete phase after the first
        // session has been closed.
        final FileSystem fs;
        final HoodieTestDataGenerator dataGen;
        final Dataset<Row> inputDF1;
        final String srcPath;
        final int numInitialCommits;

        SparkSession spark = createSparkSession();
        try {
            JavaSparkContext jssc = new JavaSparkContext(spark.sparkContext());

            // Start from a clean slate: drop leftovers of earlier runs and recreate the source folder.
            fs = FileSystem.get(jssc.hadoopConfiguration());
            fs.delete(new Path(streamingSourcePath), true);
            fs.delete(new Path(streamingCheckpointingPath), true);
            fs.delete(new Path(tablePath), true);
            fs.mkdirs(new Path(streamingSourcePath));

            // 100 inserts at instant "001", then updates for all of them at instant "002".
            dataGen = new HoodieTestDataGenerator();
            List<String> records1 = RawTripTestPayload.recordsToStrings(dataGen.generateInserts("001", 100));
            inputDF1 = spark.read().json(jssc.parallelize(records1, 2));
            List<String> records2 = RawTripTestPayload.recordsToStrings(dataGen.generateUpdatesForAllRecords("002"));
            Dataset<Row> inputDF2 = spark.read().json(jssc.parallelize(records2, 2));

            String ckptPath = streamingCheckpointingPath + "/stream1";
            srcPath = streamingSourcePath + "/stream1";
            fs.mkdirs(new Path(ckptPath));
            fs.mkdirs(new Path(srcPath));

            Dataset<Row> streamingInput = spark.readStream().schema(inputDF1.schema()).json(srcPath + "/*");

            // One thread runs the streaming query, the other feeds it input and validates the result.
            ExecutorService executor = Executors.newFixedThreadPool(2);
            try {
                Future<Void> streamFuture = executor.submit(() -> {
                    LOG.info("===== Streaming Starting =====");
                    stream(streamingInput, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL(), ckptPath);
                    LOG.info("===== Streaming Ends =====");
                    return null;
                });
                Future<Integer> showFuture = executor.submit(() -> {
                    LOG.info("===== Showing Starting =====");
                    int numCommits = addInputAndValidateIngestion(spark, fs, srcPath, 0, 100, inputDF1, inputDF2, true);
                    LOG.info("===== Showing Ends =====");
                    return numCommits;
                });
                streamFuture.get();
                numInitialCommits = showFuture.get();
            } finally {
                executor.shutdownNow();
            }

            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(jssc.hadoopConfiguration())
                .setBasePath(tablePath)
                .build();
            long commitInstants = metaClient.getActiveTimeline().getCommitTimeline().getInstants().count();
            if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
                // MERGE_ON_READ: exactly one instant is expected on the commit timeline.
                ValidationUtils.checkArgument(commitInstants == 1);
            } else {
                ValidationUtils.checkArgument(commitInstants >= 1);
            }
        } finally {
            // Closing here, and not only on the success path, keeps a failed phase from leaking the
            // session. NOTE(review): presumably the delete stream needs a fresh session - confirm.
            spark.close();
        }

        SparkSession newSpark = createSparkSession();
        try {
            JavaSparkContext newJssc = new JavaSparkContext(newSpark.sparkContext());
            String ckptPath2 = streamingCheckpointingPath + "/stream2";
            // NOTE(review): this nests the second source folder under ".../stream1/stream2", unlike
            // ckptPath2 which hangs off the checkpoint root - kept as is, confirm it is intended.
            String srcPath2 = srcPath + "/stream2";
            fs.mkdirs(new Path(ckptPath2));
            fs.mkdirs(new Path(srcPath2));

            Dataset<Row> delStreamingInput = newSpark.readStream().schema(inputDF1.schema()).json(srcPath2 + "/*");
            List<String> deletes = RawTripTestPayload.recordsToStrings(dataGen.generateUniqueUpdates("002", 20));
            Dataset<Row> inputDF3 = newSpark.read().json(newJssc.parallelize(deletes, 2));

            ExecutorService executor = Executors.newFixedThreadPool(2);
            try {
                Future<Void> streamFuture = executor.submit(() -> {
                    LOG.info("===== Streaming Starting =====");
                    stream(delStreamingInput, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL(), ckptPath2);
                    LOG.info("===== Streaming Ends =====");
                    return null;
                });
                Future<Void> showFuture = executor.submit(() -> {
                    LOG.info("===== Showing Starting =====");
                    // 20 of the 100 records are sent through the delete stream, so 80 must remain.
                    addInputAndValidateIngestion(newSpark, fs, srcPath2, numInitialCommits, 80, inputDF3, null, false);
                    LOG.info("===== Showing Ends =====");
                    return null;
                });
                streamFuture.get();
                showFuture.get();
            } finally {
                // shutdownNow, as in the first phase, so a failure in one task interrupts the other.
                executor.shutdownNow();
            }
        } finally {
            newSpark.close();
        }
    }

    /** Builds (or reuses) the single-threaded local Spark session used by both phases. */
    private static SparkSession createSparkSession() {
        return SparkSession.builder()
            .appName("Hoodie Spark Streaming APP")
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .master("local[1]")
            .getOrCreate();
    }

    /**
     * Polls the table timeline until it holds at least {@code numCommits} completed instants.
     *
     * <p>A missing table is tolerated and retried. The pause between polls sits in a
     * {@code finally} block, so it also elapses once after the successful poll.
     *
     * @param fs file system holding the table
     * @param numCommits minimum number of completed instants to wait for
     * @param timeoutSecs overall time budget in seconds
     * @param sleepSecsAfterEachRun pause after every poll in seconds
     * @throws InterruptedException if the thread is interrupted while pausing
     * @throws IllegalStateException if the instants do not show up within the time budget
     */
    private void waitTillNCommits(FileSystem fs, int numCommits, int timeoutSecs, int sleepSecsAfterEachRun) throws InterruptedException {
        long beginTime = System.currentTimeMillis();
        long currTime = beginTime;
        // Multiply as long: an int product would overflow for large second counts.
        long timeoutMsecs = timeoutSecs * 1000L;
        long sleepMsecs = sleepSecsAfterEachRun * 1000L;
        while (currTime - beginTime < timeoutMsecs) {
            try {
                HoodieTimeline timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, tablePath);
                LOG.info("Timeline :" + timeline.getInstants().collect(Collectors.toList()));
                if (timeline.countInstants() >= numCommits) {
                    return;
                }
                HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                    .setConf(fs.getConf())
                    .setBasePath(tablePath)
                    .setLoadActiveTimelineOnLoad(true)
                    .build();
                System.out.println("Instants :" + metaClient.getActiveTimeline().getInstants().collect(Collectors.toList()));
            } catch (TableNotFoundException te) {
                LOG.info("Got table not found exception. Retrying");
            } finally {
                Thread.sleep(sleepMsecs);
                currTime = System.currentTimeMillis();
            }
        }
        throw new IllegalStateException("Timedout waiting for " + numCommits + " commits to appear in " + tablePath);
    }

    /**
     * Appends input to the streaming source folder, waits for the resulting commits and validates
     * the table content.
     *
     * @param spark session used to read the table back
     * @param fs file system holding the table
     * @param srcPath folder watched by the streaming query; input is appended here as JSON
     * @param initialCommits number of instants already completed before this call
     * @param expRecords number of records the table must contain afterwards
     * @param inputDF1 first batch of input
     * @param inputDF2 optional second batch, may be {@code null}
     * @param instantTimeValidation whether to also check that {@code expRecords} records carry the
     *        latest commit time
     * @return the number of completed instants this method waited for
     * @throws Exception if waiting times out or a validation fails
     */
    public int addInputAndValidateIngestion(SparkSession spark, FileSystem fs, String srcPath, int initialCommits, int expRecords, Dataset<Row> inputDF1, Dataset<Row> inputDF2, boolean instantTimeValidation) throws Exception {
        boolean mergeOnRead = tableType.equals(HoodieTableType.MERGE_ON_READ.name());

        // First batch.
        inputDF1.coalesce(1).write().mode(SaveMode.Append).json(srcPath);
        int numExpCommits = initialCommits + 1;
        waitTillNCommits(fs, numExpCommits, COMMIT_WAIT_TIMEOUT_SECS, COMMIT_POLL_INTERVAL_SECS);
        String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
        LOG.info("First commit at instant time :" + commitInstantTime1);

        // Optional second batch.
        String commitInstantTime2 = commitInstantTime1;
        if (inputDF2 != null) {
            inputDF2.write().mode(SaveMode.Append).json(srcPath);
            Thread.sleep(3000L);
            numExpCommits++;
            waitTillNCommits(fs, numExpCommits, COMMIT_WAIT_TIMEOUT_SECS, COMMIT_POLL_INTERVAL_SECS);
            commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
            LOG.info("Second commit at instant time :" + commitInstantTime2);
        }

        if (mergeOnRead) {
            // One more instant is awaited when a second batch was written.
            if (inputDF2 != null) {
                numExpCommits++;
            }
            waitTillNCommits(fs, numExpCommits, COMMIT_WAIT_TIMEOUT_SECS, COMMIT_POLL_INTERVAL_SECS);
            commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
            LOG.info("Compaction commit at instant time :" + commitInstantTime2);
        }

        // Read the table back and expose it to SQL.
        Dataset<Row> hoodieROViewDF = spark.read().format("hudi").load(tablePath + "/*/*/*/*");
        // createOrReplaceTempView is the non-deprecated equivalent of registerTempTable.
        hoodieROViewDF.createOrReplaceTempView("hoodie_ro");
        spark.sql("describe hoodie_ro").show();
        spark.sql("select fare.amount, begin_lon, begin_lat, timestamp from hoodie_ro where fare.amount > 2.0").show();

        if (instantTimeValidation) {
            System.out.println("Showing all records. Latest Instant Time =" + commitInstantTime2);
            spark.sql("select * from hoodie_ro").show(200, false);
            long numRecordsAtInstant2 = spark.sql("select * from hoodie_ro where _hoodie_commit_time = " + commitInstantTime2).count();
            ValidationUtils.checkArgument(numRecordsAtInstant2 == expRecords, "Expecting " + expRecords + " records, Got " + numRecordsAtInstant2);
        }

        long numRecords = spark.sql("select * from hoodie_ro").count();
        ValidationUtils.checkArgument(numRecords == expRecords, "Expecting " + expRecords + " records, Got " + numRecords);

        if (tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) {
            // Incremental query starting at the first commit of this call.
            Dataset<Row> hoodieIncViewDF = spark.read().format("hudi")
                .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
                .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), commitInstantTime1)
                .load(tablePath);
            LOG.info("You will only see records from : " + commitInstantTime2);
            hoodieIncViewDF.groupBy(hoodieIncViewDF.col("_hoodie_commit_time")).count().show();
        }
        return numExpCommits;
    }

    /**
     * Starts a streaming write of {@code streamingInput} into the Hudi table and blocks for up to
     * the configured streaming duration.
     *
     * <p>The query is deliberately not stopped when the wait times out: in {@link #run()} the
     * validation thread may still be waiting for commits at that point, and the query ends when
     * its Spark session is closed.
     *
     * @param streamingInput streaming source to write
     * @param operationType Hudi write operation to apply
     * @param checkpointLocation checkpoint folder of the streaming query
     * @throws Exception if the query cannot be started or terminates with an error while waiting
     */
    public void stream(Dataset<Row> streamingInput, String operationType, String checkpointLocation) throws Exception {
        DataStreamWriter<Row> writer = streamingInput.writeStream()
            .format("org.apache.hudi")
            .option("hoodie.insert.shuffle.parallelism", "2")
            .option("hoodie.upsert.shuffle.parallelism", "2")
            .option("hoodie.delete.shuffle.parallelism", "2")
            .option(DataSourceWriteOptions.OPERATION().key(), operationType)
            .option(DataSourceWriteOptions.TABLE_TYPE().key(), tableType)
            .option(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key")
            .option(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition")
            .option(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp")
            .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1")
            .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key(), "true")
            .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true")
            .option(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA.key(), "false")
            .option(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH().key(), "true")
            .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
            .option("checkpointLocation", checkpointLocation)
            .outputMode(OutputMode.Append());
        // Use the returned writer instead of relying on the argument being mutated in place.
        writer = updateHiveSyncConfig(writer);
        StreamingQuery query = writer.trigger(Trigger.ProcessingTime(500L)).start(tablePath);
        query.awaitTermination(streamingDurationInMs);
    }

    /**
     * Adds the Hive sync options to {@code writer} when Hive sync is enabled.
     *
     * @param writer writer to configure
     * @return the configured writer; {@code writer} itself when Hive sync is disabled
     */
    private DataStreamWriter<Row> updateHiveSyncConfig(DataStreamWriter<Row> writer) {
        if (!enableHiveSync) {
            return writer;
        }
        LOG.info("Enabling Hive sync to " + hiveJdbcUrl);
        DataStreamWriter<Row> synced = writer
            .option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), hiveTable)
            .option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), hiveDB)
            .option(HiveSyncConfigHolder.HIVE_URL.key(), hiveJdbcUrl)
            .option(HiveSyncConfigHolder.HIVE_USER.key(), hiveUser)
            .option(HiveSyncConfigHolder.HIVE_PASS.key(), hivePass)
            .option(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key(), "true");
        if (useMultiPartitionKeys) {
            return synced
                .option(HiveSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "year,month,day")
                .option(HiveSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), MultiPartKeysValueExtractor.class.getCanonicalName());
        }
        return synced
            .option(HiveSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "dateStr")
            .option(HiveSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), SlashEncodedDayPartitionValueExtractor.class.getCanonicalName());
    }
}

