/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.hive;

import com.uber.hoodie.hadoop.HoodieInputFormat;
import com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.InvalidTableException;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hive.AbstractHiveSyncHoodieClient;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.hive.HoodieHiveSyncException;
import org.apache.hudi.hive.NonPartitionedExtractor;
import org.apache.hudi.hive.SchemaDifference;
import org.apache.hudi.hive.util.ConfigUtils;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.hive.util.Parquet2SparkSchemaUtils;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
import org.apache.hudi.sync.common.AbstractSyncTool;
import org.apache.hudi.sync.common.model.Partition;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

public class HiveSyncTool
extends AbstractSyncTool
implements AutoCloseable {
    private static final Logger LOG = LogManager.getLogger(HiveSyncTool.class);
    public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
    public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
    protected HiveSyncConfig hiveSyncConfig;
    protected AbstractHiveSyncHoodieClient hoodieHiveClient;
    protected String snapshotTableName = null;
    protected Option<String> roTableName = null;

    public HiveSyncTool(TypedProperties props, Configuration conf, FileSystem fs) {
        this(new HiveSyncConfig(props), new HiveConf(conf, HiveConf.class), fs);
    }

    public HiveSyncTool(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf, FileSystem fs) {
        super(hiveSyncConfig.getProps(), (Configuration)hiveConf, fs);
        if (StringUtils.isNullOrEmpty(hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname))) {
            hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, hiveSyncConfig.metastoreUris);
        }
        hiveConf.addResource(fs.getConf());
        this.initClient(hiveSyncConfig, hiveConf);
        this.initConfig(hiveSyncConfig);
    }

    protected void initClient(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf) {
        try {
            this.hoodieHiveClient = new HoodieHiveClient(hiveSyncConfig, hiveConf, this.fs);
        }
        catch (RuntimeException e) {
            if (hiveSyncConfig.ignoreExceptions.booleanValue()) {
                LOG.error((Object)"Got runtime exception when hive syncing, but continuing as ignoreExceptions config is set ", (Throwable)e);
            }
            throw new HoodieHiveSyncException("Got runtime exception when hive syncing", e);
        }
    }

    private void initConfig(HiveSyncConfig hiveSyncConfig) {
        if (NonPartitionedExtractor.class.getName().equals(hiveSyncConfig.partitionValueExtractorClass)) {
            LOG.warn((Object)"Set partitionFields to empty, since the NonPartitionedExtractor is used");
            hiveSyncConfig.partitionFields = new ArrayList();
        }
        this.hiveSyncConfig = hiveSyncConfig;
        if (this.hoodieHiveClient != null) {
            switch (this.hoodieHiveClient.getTableType()) {
                case COPY_ON_WRITE: {
                    this.snapshotTableName = hiveSyncConfig.tableName;
                    this.roTableName = Option.empty();
                    break;
                }
                case MERGE_ON_READ: {
                    this.snapshotTableName = hiveSyncConfig.tableName + SUFFIX_SNAPSHOT_TABLE;
                    this.roTableName = hiveSyncConfig.skipROSuffix != false ? Option.of(hiveSyncConfig.tableName) : Option.of(hiveSyncConfig.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
                    break;
                }
                default: {
                    LOG.error((Object)("Unknown table type " + (Object)((Object)this.hoodieHiveClient.getTableType())));
                    throw new InvalidTableException(this.hoodieHiveClient.getBasePath());
                }
            }
        }
    }

    @Override
    public void syncHoodieTable() {
        try {
            if (this.hoodieHiveClient != null) {
                LOG.info((Object)("Syncing target hoodie table with hive table(" + this.hiveSyncConfig.tableName + "). Hive metastore URL :" + this.hiveSyncConfig.jdbcUrl + ", basePath :" + this.hiveSyncConfig.basePath));
                this.doSync();
            }
        }
        catch (RuntimeException re) {
            throw new HoodieException("Got runtime exception when hive syncing " + this.hiveSyncConfig.tableName, re);
        }
        finally {
            this.close();
        }
    }

    protected void doSync() {
        switch (this.hoodieHiveClient.getTableType()) {
            case COPY_ON_WRITE: {
                this.syncHoodieTable(this.snapshotTableName, false, false);
                break;
            }
            case MERGE_ON_READ: {
                this.syncHoodieTable(this.roTableName.get(), false, true);
                this.syncHoodieTable(this.snapshotTableName, true, false);
                break;
            }
            default: {
                LOG.error((Object)("Unknown table type " + (Object)((Object)this.hoodieHiveClient.getTableType())));
                throw new InvalidTableException(this.hoodieHiveClient.getBasePath());
            }
        }
    }

    @Override
    public void close() {
        if (this.hoodieHiveClient != null) {
            try {
                this.hoodieHiveClient.close();
            }
            catch (Exception e) {
                throw new HoodieHiveSyncException("Fail to close sync client.", e);
            }
        }
    }

    protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, boolean readAsOptimized) {
        boolean meetSyncConditions;
        LOG.info((Object)("Trying to sync hoodie table " + tableName + " with base path " + this.hoodieHiveClient.getBasePath() + " of type " + (Object)((Object)this.hoodieHiveClient.getTableType())));
        if (this.hiveSyncConfig.autoCreateDatabase.booleanValue()) {
            try {
                if (!this.hoodieHiveClient.databaseExists(this.hiveSyncConfig.databaseName)) {
                    this.hoodieHiveClient.createDatabase(this.hiveSyncConfig.databaseName);
                }
            }
            catch (Exception e) {
                LOG.warn((Object)"Unable to create database", (Throwable)e);
            }
        } else if (!this.hoodieHiveClient.databaseExists(this.hiveSyncConfig.databaseName)) {
            LOG.error((Object)("Hive database does not exist " + this.hiveSyncConfig.databaseName));
            throw new HoodieHiveSyncException("hive database does not exist " + this.hiveSyncConfig.databaseName);
        }
        boolean tableExists = this.hoodieHiveClient.tableExists(tableName);
        boolean isDropPartition = this.hoodieHiveClient.isDropPartition();
        MessageType schema = this.hoodieHiveClient.getDataSchema();
        if (this.hoodieHiveClient.isBootstrap() && this.hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ && !readAsOptimized) {
            this.hiveSyncConfig.syncAsSparkDataSourceTable = false;
        }
        boolean schemaChanged = this.syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
        LOG.info((Object)("Schema sync complete. Syncing partitions for " + tableName));
        Option<Object> lastCommitTimeSynced = Option.empty();
        if (tableExists) {
            lastCommitTimeSynced = this.hoodieHiveClient.getLastCommitTimeSynced(tableName);
        }
        LOG.info((Object)("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null")));
        List<String> writtenPartitionsSince = this.hoodieHiveClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
        LOG.info((Object)("Storage partitions scan complete. Found " + writtenPartitionsSince.size()));
        boolean partitionsChanged = this.syncPartitions(tableName, writtenPartitionsSince, isDropPartition);
        boolean bl = meetSyncConditions = schemaChanged || partitionsChanged;
        if (!this.hiveSyncConfig.isConditionalSync.booleanValue() || meetSyncConditions) {
            this.hoodieHiveClient.updateLastCommitTimeSynced(tableName);
        }
        LOG.info((Object)("Sync complete for " + tableName));
    }

    private boolean syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat, boolean readAsOptimized, MessageType schema) {
        Map<String, String> tableProperties = ConfigUtils.toMap(this.hiveSyncConfig.tableProperties);
        Map<String, String> serdeProperties = ConfigUtils.toMap(this.hiveSyncConfig.serdeProperties);
        if (this.hiveSyncConfig.syncAsSparkDataSourceTable.booleanValue()) {
            Map<String, String> sparkTableProperties = this.getSparkTableProperties(this.hiveSyncConfig.sparkSchemaLengthThreshold, schema);
            Map<String, String> sparkSerdeProperties = this.getSparkSerdeProperties(readAsOptimized);
            tableProperties.putAll(sparkTableProperties);
            serdeProperties.putAll(sparkSerdeProperties);
        }
        boolean schemaChanged = false;
        if (!tableExists) {
            LOG.info((Object)("Hive table " + tableName + " is not found. Creating it"));
            HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(this.hiveSyncConfig.baseFileFormat.toUpperCase());
            String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(baseFileFormat, useRealTimeInputFormat);
            if (baseFileFormat.equals((Object)HoodieFileFormat.PARQUET) && this.hiveSyncConfig.usePreApacheInputFormat.booleanValue()) {
                inputFormatClassName = useRealTimeInputFormat ? HoodieRealtimeInputFormat.class.getName() : HoodieInputFormat.class.getName();
            }
            String outputFormatClassName = HoodieInputFormatUtils.getOutputFormatClassName(baseFileFormat);
            String serDeFormatClassName = HoodieInputFormatUtils.getSerDeClassName(baseFileFormat);
            this.hoodieHiveClient.createTable(tableName, schema, inputFormatClassName, outputFormatClassName, serDeFormatClassName, serdeProperties, tableProperties);
            schemaChanged = true;
        } else {
            Map<String, String> tableSchema = this.hoodieHiveClient.getTableSchema(tableName);
            SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, this.hiveSyncConfig.partitionFields, this.hiveSyncConfig.supportTimestamp);
            if (!schemaDiff.isEmpty()) {
                LOG.info((Object)("Schema difference found for " + tableName));
                this.hoodieHiveClient.updateTableDefinition(tableName, schema);
                if (this.hiveSyncConfig.tableProperties != null || this.hiveSyncConfig.syncAsSparkDataSourceTable.booleanValue()) {
                    this.hoodieHiveClient.updateTableProperties(tableName, tableProperties);
                    LOG.info((Object)("Sync table properties for " + tableName + ", table properties is: " + tableProperties));
                }
                schemaChanged = true;
            } else {
                LOG.info((Object)("No Schema difference for " + tableName));
            }
        }
        if (this.hiveSyncConfig.syncComment) {
            Schema avroSchemaWithoutMetadataFields = this.hoodieHiveClient.getAvroSchemaWithoutMetadataFields();
            Map<String, String> newComments = avroSchemaWithoutMetadataFields.getFields().stream().collect(Collectors.toMap(Schema.Field::name, field -> StringUtils.isNullOrEmpty(field.doc()) ? "" : field.doc()));
            boolean allEmpty = newComments.values().stream().allMatch(StringUtils::isNullOrEmpty);
            if (!allEmpty) {
                List<FieldSchema> hiveSchema = this.hoodieHiveClient.getTableCommentUsingMetastoreClient(tableName);
                this.hoodieHiveClient.updateTableComments(tableName, hiveSchema, avroSchemaWithoutMetadataFields.getFields());
            } else {
                LOG.info((Object)String.format("No comment %s need to add", tableName));
            }
        }
        return schemaChanged;
    }

    private Map<String, String> getSparkTableProperties(int schemaLengthThreshold, MessageType schema) {
        int i;
        GroupType originGroupType = schema.asGroupType();
        List partitionNames = this.hiveSyncConfig.partitionFields;
        ArrayList<PrimitiveType> partitionCols = new ArrayList<PrimitiveType>();
        ArrayList<Type> dataCols = new ArrayList<Type>();
        HashMap<String, Object> column2Field = new HashMap<String, Object>();
        for (Type field : originGroupType.getFields()) {
            column2Field.put(field.getName(), field);
        }
        for (String partitionName : partitionNames) {
            partitionCols.add(column2Field.getOrDefault(partitionName, new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, partitionName, OriginalType.UTF8)));
        }
        for (Type field : originGroupType.getFields()) {
            if (partitionNames.contains(field.getName())) continue;
            dataCols.add(field);
        }
        ArrayList<Object> reOrderedFields = new ArrayList<Object>();
        reOrderedFields.addAll(dataCols);
        reOrderedFields.addAll(partitionCols);
        GroupType reOrderedType = new GroupType(originGroupType.getRepetition(), originGroupType.getName(), reOrderedFields);
        HashMap<String, String> sparkProperties = new HashMap<String, String>();
        sparkProperties.put("spark.sql.sources.provider", "hudi");
        if (!StringUtils.isNullOrEmpty(this.hiveSyncConfig.sparkVersion)) {
            sparkProperties.put("spark.sql.create.version", this.hiveSyncConfig.sparkVersion);
        }
        String schemaString = Parquet2SparkSchemaUtils.convertToSparkSchemaJson(reOrderedType);
        int numSchemaPart = (schemaString.length() + schemaLengthThreshold - 1) / schemaLengthThreshold;
        sparkProperties.put("spark.sql.sources.schema.numParts", String.valueOf(numSchemaPart));
        for (i = 0; i < numSchemaPart; ++i) {
            int start = i * schemaLengthThreshold;
            int end = Math.min(start + schemaLengthThreshold, schemaString.length());
            sparkProperties.put("spark.sql.sources.schema.part." + i, schemaString.substring(start, end));
        }
        if (!partitionNames.isEmpty()) {
            sparkProperties.put("spark.sql.sources.schema.numPartCols", String.valueOf(partitionNames.size()));
            for (i = 0; i < partitionNames.size(); ++i) {
                sparkProperties.put("spark.sql.sources.schema.partCol." + i, (String)partitionNames.get(i));
            }
        }
        return sparkProperties;
    }

    private Map<String, String> getSparkSerdeProperties(boolean readAsOptimized) {
        HashMap<String, String> sparkSerdeProperties = new HashMap<String, String>();
        sparkSerdeProperties.put("path", this.hiveSyncConfig.basePath);
        sparkSerdeProperties.put("hoodie.query.as.ro.table", String.valueOf(readAsOptimized));
        return sparkSerdeProperties;
    }

    private boolean syncPartitions(String tableName, List<String> writtenPartitionsSince, boolean isDropPartition) {
        boolean partitionsChanged;
        try {
            List<String> dropPartitions;
            List<String> updatePartitions;
            List<Partition> hivePartitions = this.hoodieHiveClient.getAllPartitions(tableName);
            List<AbstractSyncHoodieClient.PartitionEvent> partitionEvents = this.hoodieHiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, isDropPartition);
            List<String> newPartitions = this.filterPartitions(partitionEvents, AbstractSyncHoodieClient.PartitionEvent.PartitionEventType.ADD);
            if (!newPartitions.isEmpty()) {
                LOG.info((Object)("New Partitions " + newPartitions));
                this.hoodieHiveClient.addPartitionsToTable(tableName, newPartitions);
            }
            if (!(updatePartitions = this.filterPartitions(partitionEvents, AbstractSyncHoodieClient.PartitionEvent.PartitionEventType.UPDATE)).isEmpty()) {
                LOG.info((Object)("Changed Partitions " + updatePartitions));
                this.hoodieHiveClient.updatePartitionsToTable(tableName, updatePartitions);
            }
            if (!(dropPartitions = this.filterPartitions(partitionEvents, AbstractSyncHoodieClient.PartitionEvent.PartitionEventType.DROP)).isEmpty()) {
                LOG.info((Object)("Drop Partitions " + dropPartitions));
                this.hoodieHiveClient.dropPartitions(tableName, dropPartitions);
            }
            partitionsChanged = !updatePartitions.isEmpty() || !newPartitions.isEmpty() || !dropPartitions.isEmpty();
        }
        catch (Exception e) {
            throw new HoodieHiveSyncException("Failed to sync partitions for table " + tableName, e);
        }
        return partitionsChanged;
    }

    private List<String> filterPartitions(List<AbstractSyncHoodieClient.PartitionEvent> events, AbstractSyncHoodieClient.PartitionEvent.PartitionEventType eventType) {
        return events.stream().filter(s -> s.eventType == eventType).map(s -> s.storagePartition).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        HiveSyncConfig cfg = new HiveSyncConfig();
        JCommander cmd = new JCommander((Object)cfg, null, args);
        if (cfg.help.booleanValue() || args.length == 0) {
            cmd.usage();
            System.exit(1);
        }
        FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
        HiveConf hiveConf = new HiveConf();
        hiveConf.addResource(fs.getConf());
        new HiveSyncTool(cfg, hiveConf, fs).syncHoodieTable();
    }
}

