/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.util;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.schema.FilebasedSchemaProvider;
import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamerUtil {
    private static final Logger LOG = LoggerFactory.getLogger(StreamerUtil.class);

    public static TypedProperties appendKafkaProps(FlinkStreamerConfig config) {
        TypedProperties properties = StreamerUtil.getProps(config);
        properties.put("bootstrap.servers", config.kafkaBootstrapServers);
        properties.put("group.id", config.kafkaGroupId);
        return properties;
    }

    public static TypedProperties getProps(FlinkStreamerConfig cfg) {
        if (cfg.propsFilePath.isEmpty()) {
            return new TypedProperties();
        }
        return StreamerUtil.readConfig(HadoopConfigurations.getHadoopConf(cfg), new Path(cfg.propsFilePath), cfg.configs).getProps();
    }

    public static Schema getSourceSchema(Configuration conf) {
        if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent()) {
            return new FilebasedSchemaProvider(conf).getSourceSchema();
        }
        if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA).isPresent()) {
            String schemaStr = (String)conf.get(FlinkOptions.SOURCE_AVRO_SCHEMA);
            return new Schema.Parser().parse(schemaStr);
        }
        String errorMsg = String.format("Either option '%s' or '%s' should be specified for avro schema deserialization", FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), FlinkOptions.SOURCE_AVRO_SCHEMA.key());
        throw new HoodieException(errorMsg);
    }

    public static DFSPropertiesConfiguration readConfig(org.apache.hadoop.conf.Configuration hadoopConfig, Path cfgPath, List<String> overriddenProps) {
        DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(hadoopConfig, cfgPath);
        try {
            if (!overriddenProps.isEmpty()) {
                LOG.info("Adding overridden properties to file properties.");
                conf.addPropsFromStream(new BufferedReader(new StringReader(String.join((CharSequence)"\n", overriddenProps))));
            }
        }
        catch (IOException ioe) {
            throw new HoodieIOException("Unexpected error adding config overrides", ioe);
        }
        return conf;
    }

    public static HoodiePayloadConfig getPayloadConfig(Configuration conf) {
        return HoodiePayloadConfig.newBuilder().withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME)).withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD)).withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD)).build();
    }

    public static TypedProperties flinkConf2TypedProperties(Configuration conf) {
        Configuration flatConf = FlinkOptions.flatOptions(conf);
        Properties properties = new Properties();
        flatConf.addAllToProperties(properties);
        for (ConfigOption<?> option : FlinkOptions.optionalOptions()) {
            if (flatConf.contains(option) || !option.hasDefaultValue()) continue;
            properties.put(option.key(), option.defaultValue());
        }
        return new TypedProperties(properties);
    }

    public static void checkRequiredProperties(TypedProperties props, List<String> checkPropNames) {
        checkPropNames.forEach(prop -> Preconditions.checkState((boolean)props.containsKey(prop), (Object)("Required property " + prop + " is missing")));
    }

    public static HoodieTableMetaClient initTableIfNotExists(Configuration conf) throws IOException {
        return StreamerUtil.initTableIfNotExists(conf, HadoopConfigurations.getHadoopConf(conf));
    }

    public static HoodieTableMetaClient initTableIfNotExists(Configuration conf, org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
        String basePath = conf.getString(FlinkOptions.PATH);
        if (!StreamerUtil.tableExists(basePath, hadoopConf)) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.withPropertyBuilder().setTableCreateSchema(conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA)).setTableType(conf.getString(FlinkOptions.TABLE_TYPE)).setTableName(conf.getString(FlinkOptions.TABLE_NAME)).setDatabaseName(conf.getString(FlinkOptions.DATABASE_NAME)).setRecordKeyFields(conf.getString(FlinkOptions.RECORD_KEY_FIELD, null)).setPayloadClassName(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME)).setPreCombineField(OptionsResolver.getPreCombineField(conf)).setArchiveLogFolder(HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue()).setPartitionFields(conf.getString(FlinkOptions.PARTITION_PATH_FIELD, null)).setKeyGeneratorClassProp(conf.getOptional(FlinkOptions.KEYGEN_CLASS_NAME).orElse(SimpleAvroKeyGenerator.class.getName())).setHiveStylePartitioningEnable(conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING)).setUrlEncodePartitioning(conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING)).setCDCEnabled(conf.getBoolean(FlinkOptions.CDC_ENABLED)).setCDCSupplementalLoggingMode(conf.getString(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE)).setTimelineLayoutVersion(1).initTable(hadoopConf, basePath);
            LOG.info("Table initialized under base path {}", (Object)basePath);
            return metaClient;
        }
        LOG.info("Table [{}/{}] already exists, no need to initialize the table", (Object)basePath, (Object)conf.getString(FlinkOptions.TABLE_NAME));
        return StreamerUtil.createMetaClient(basePath, hadoopConf);
    }

    public static boolean tableExists(String basePath, org.apache.hadoop.conf.Configuration hadoopConf) {
        FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
        try {
            return fs.exists(new Path(basePath, ".hoodie"));
        }
        catch (IOException e) {
            throw new HoodieException("Error while checking whether table exists under path:" + basePath, e);
        }
    }

    public static boolean partitionExists(String tablePath, String partitionPath, org.apache.hadoop.conf.Configuration hadoopConf) {
        FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
        try {
            return fs.exists(new Path(tablePath, partitionPath));
        }
        catch (IOException e) {
            throw new HoodieException(String.format("Error while checking whether partition exists under table path [%s] and partition path [%s]", tablePath, partitionPath), e);
        }
    }

    public static String generateBucketKey(String partitionPath, String fileId) {
        return String.format("%s_%s", partitionPath, fileId);
    }

    public static HoodieTableMetaClient metaClientForReader(Configuration conf, org.apache.hadoop.conf.Configuration hadoopConf) {
        String basePath = conf.getString(FlinkOptions.PATH);
        if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING) && !StreamerUtil.tableExists(basePath, hadoopConf)) {
            return null;
        }
        return StreamerUtil.createMetaClient(basePath, hadoopConf);
    }

    public static HoodieTableMetaClient createMetaClient(String basePath, org.apache.hadoop.conf.Configuration hadoopConf) {
        return HoodieTableMetaClient.builder().setBasePath(basePath).setConf(hadoopConf).build();
    }

    public static HoodieTableMetaClient createMetaClient(Configuration conf) {
        return StreamerUtil.createMetaClient(conf.getString(FlinkOptions.PATH), HadoopConfigurations.getHadoopConf(conf));
    }

    public static Option<HoodieTableConfig> getTableConfig(String basePath, org.apache.hadoop.conf.Configuration hadoopConf) {
        FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
        Path metaPath = new Path(basePath, ".hoodie");
        try {
            if (fs.exists(metaPath)) {
                return Option.of(new HoodieTableConfig(fs, metaPath.toString(), null, null));
            }
        }
        catch (IOException e) {
            throw new HoodieIOException("Get table config error", e);
        }
        return Option.empty();
    }

    public static Option<String> medianInstantTime(String highVal, String lowVal) {
        try {
            long high = HoodieActiveTimeline.parseDateFromInstantTime(highVal).getTime();
            long low = HoodieActiveTimeline.parseDateFromInstantTime(lowVal).getTime();
            ValidationUtils.checkArgument(high > low, "Instant [" + highVal + "] should have newer timestamp than instant [" + lowVal + "]");
            long median = low + (high - low) / 2L;
            String instantTime = HoodieActiveTimeline.formatDate(new Date(median));
            if (HoodieTimeline.compareTimestamps(lowVal, HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime) || HoodieTimeline.compareTimestamps(highVal, HoodieTimeline.LESSER_THAN_OR_EQUALS, instantTime)) {
                return Option.empty();
            }
            return Option.of(instantTime);
        }
        catch (ParseException e) {
            throw new HoodieException("Get median instant time with interval [" + lowVal + ", " + highVal + "] error", e);
        }
    }

    public static long instantTimeDiffSeconds(String newInstantTime, String oldInstantTime) {
        try {
            long newTimestamp = HoodieActiveTimeline.parseDateFromInstantTime(newInstantTime).getTime();
            long oldTimestamp = HoodieActiveTimeline.parseDateFromInstantTime(oldInstantTime).getTime();
            return (newTimestamp - oldTimestamp) / 1000L;
        }
        catch (ParseException e) {
            throw new HoodieException("Get instant time diff with interval [" + oldInstantTime + ", " + newInstantTime + "] error", e);
        }
    }

    public static Option<Transformer> createTransformer(List<String> classNames) throws IOException {
        try {
            ArrayList<Transformer> transformers = new ArrayList<Transformer>();
            for (String className : Option.ofNullable(classNames).orElse(Collections.emptyList())) {
                transformers.add((Transformer)ReflectionUtils.loadClass(className));
            }
            return transformers.isEmpty() ? Option.empty() : Option.of(new ChainedTransformer(transformers));
        }
        catch (Throwable e) {
            throw new IOException("Could not load transformer class(es) " + classNames, e);
        }
    }

    public static boolean isValidFile(FileStatus fileStatus) {
        String extension = FSUtils.getFileExtension(fileStatus.getPath().toString());
        if (HoodieFileFormat.PARQUET.getFileExtension().equals(extension)) {
            return fileStatus.getLen() > (long)ParquetFileWriter.MAGIC.length;
        }
        if (HoodieFileFormat.ORC.getFileExtension().equals(extension)) {
            return fileStatus.getLen() > (long)"ORC".length();
        }
        if (HoodieFileFormat.HOODIE_LOG.getFileExtension().equals(extension)) {
            return fileStatus.getLen() > (long)HoodieLogFormat.MAGIC.length;
        }
        return fileStatus.getLen() > 0L;
    }

    public static String getLastPendingInstant(HoodieTableMetaClient metaClient) {
        return StreamerUtil.getLastPendingInstant(metaClient, true);
    }

    public static String getLastPendingInstant(HoodieTableMetaClient metaClient, boolean reloadTimeline) {
        if (reloadTimeline) {
            metaClient.reloadActiveTimeline();
        }
        return metaClient.getCommitsTimeline().filterPendingExcludingCompaction().lastInstant().map(HoodieInstant::getTimestamp).orElse(null);
    }

    public static String getLastCompletedInstant(HoodieTableMetaClient metaClient) {
        return metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(null);
    }

    public static boolean haveSuccessfulCommits(HoodieTableMetaClient metaClient) {
        return !metaClient.getCommitsTimeline().filterCompletedInstants().empty();
    }

    public static long getMaxCompactionMemoryInBytes(Configuration conf) {
        return (long)conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024L * 1024L;
    }

    public static Schema getTableAvroSchema(HoodieTableMetaClient metaClient, boolean includeMetadataFields) throws Exception {
        TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
        return schemaUtil.getTableAvroSchema(includeMetadataFields);
    }

    public static Schema getLatestTableSchema(String path, org.apache.hadoop.conf.Configuration hadoopConf) {
        if (StringUtils.isNullOrEmpty(path) || !StreamerUtil.tableExists(path, hadoopConf)) {
            return null;
        }
        try {
            HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(path, hadoopConf);
            return StreamerUtil.getTableAvroSchema(metaClient, false);
        }
        catch (Exception e) {
            LOG.warn("Error while resolving the latest table schema", (Throwable)e);
            return null;
        }
    }

    public static boolean fileExists(FileSystem fs, Path path) {
        try {
            return fs.exists(path);
        }
        catch (IOException e) {
            throw new HoodieException("Exception while checking file " + path + " existence", e);
        }
    }

    public static String getAuxiliaryPath(Configuration conf) {
        return conf.getString(FlinkOptions.PATH) + "/" + ".hoodie/.aux";
    }
}

