/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.index.bloom;

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaPairRDD;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.index.bloom.BaseHoodieBloomIndexHelper;
import org.apache.hudi.index.bloom.BloomIndexFileInfo;
import org.apache.hudi.index.bloom.BucketizedBloomCheckPartitioner;
import org.apache.hudi.index.bloom.HoodieBloomIndexCheckFunction;
import org.apache.hudi.index.bloom.HoodieMetadataBloomIndexCheckFunction;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class SparkHoodieBloomIndexHelper
extends BaseHoodieBloomIndexHelper {
    private static final Logger LOG = LogManager.getLogger(SparkHoodieBloomIndexHelper.class);
    private static final SparkHoodieBloomIndexHelper SINGLETON_INSTANCE = new SparkHoodieBloomIndexHelper();

    private SparkHoodieBloomIndexHelper() {
    }

    public static SparkHoodieBloomIndexHelper getInstance() {
        return SINGLETON_INSTANCE;
    }

    public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable, HoodiePairData<String, String> partitionRecordKeyPairs, HoodieData<Pair<String, HoodieKey>> fileComparisonPairs, Map<String, List<BloomIndexFileInfo>> partitionToFileInfo, Map<String, Long> recordsPerPartition) {
        JavaRDD keyLookupResultRDD;
        JavaRDD fileComparisonsRDD = HoodieJavaRDD.getJavaRDD(fileComparisonPairs).map((Function & Serializable)pair -> new Tuple2(pair.getLeft(), pair.getRight()));
        int inputParallelism = HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
        int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
        LOG.info((Object)("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: ${" + config.getBloomIndexParallelism() + "}"));
        if (config.getBloomIndexUseMetadata() && HoodieTableMetadataUtil.getCompletedMetadataPartitions((HoodieTableConfig)hoodieTable.getMetaClient().getTableConfig()).contains(MetadataPartitionType.BLOOM_FILTERS.getPartitionPath())) {
            JavaRDD sortedFileIdAndKeyPairs = fileComparisonsRDD.sortBy(Tuple2::_1, true, joinParallelism);
            keyLookupResultRDD = sortedFileIdAndKeyPairs.mapPartitionsWithIndex((Function2)new HoodieMetadataBloomIndexCheckFunction(hoodieTable), true);
        } else if (config.useBloomIndexBucketizedChecking()) {
            Map<String, Long> comparisonsPerFileGroup = this.computeComparisonsPerFileGroup(config, recordsPerPartition, partitionToFileInfo, (JavaRDD<Tuple2<String, HoodieKey>>)fileComparisonsRDD, context);
            BucketizedBloomCheckPartitioner partitioner = new BucketizedBloomCheckPartitioner(joinParallelism, comparisonsPerFileGroup, config.getBloomIndexKeysPerBucket());
            keyLookupResultRDD = fileComparisonsRDD.mapToPair((PairFunction & Serializable)t -> new Tuple2((Object)Pair.of((Object)t._1, (Object)((HoodieKey)t._2).getRecordKey()), t)).repartitionAndSortWithinPartitions((Partitioner)partitioner).map(Tuple2::_2).mapPartitionsWithIndex((Function2)new HoodieBloomIndexCheckFunction(hoodieTable, config), true);
        } else {
            keyLookupResultRDD = fileComparisonsRDD.sortBy(Tuple2::_1, true, joinParallelism).mapPartitionsWithIndex((Function2)new HoodieBloomIndexCheckFunction(hoodieTable, config), true);
        }
        return HoodieJavaPairRDD.of(keyLookupResultRDD.flatMap(List::iterator).filter((Function & Serializable)lr -> lr.getMatchingRecordKeys().size() > 0).flatMapToPair((PairFlatMapFunction & Serializable)lookupResult -> lookupResult.getMatchingRecordKeys().stream().map(recordKey -> new Tuple2((Object)new HoodieKey(recordKey, lookupResult.getPartitionPath()), (Object)new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId()))).collect(Collectors.toList()).iterator()));
    }

    private Map<String, Long> computeComparisonsPerFileGroup(HoodieWriteConfig config, Map<String, Long> recordsPerPartition, Map<String, List<BloomIndexFileInfo>> partitionToFileInfo, JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD, HoodieEngineContext context) {
        Map<String, Long> fileToComparisons;
        if (config.getBloomIndexPruneByRanges()) {
            context.setJobStatus(((Object)((Object)this)).getClass().getSimpleName(), "Compute all comparisons needed between records and files: " + config.getTableName());
            fileToComparisons = fileComparisonsRDD.mapToPair((PairFunction & Serializable)t -> t).countByKey();
        } else {
            fileToComparisons = new HashMap<String, Long>();
            partitionToFileInfo.forEach((key, value) -> {
                for (BloomIndexFileInfo fileInfo : value) {
                    fileToComparisons.put(fileInfo.getFileId(), (Long)recordsPerPartition.get(key));
                }
            });
        }
        return fileToComparisons;
    }
}

