/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.table.log;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormatReader;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.org.apache.avro.generic.GenericRecord;
import org.apache.hudi.org.apache.avro.generic.IndexedRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public abstract class AbstractHoodieLogRecordReader {
    private static final Logger LOG = LogManager.getLogger(AbstractHoodieLogRecordReader.class);
    protected final Schema readerSchema;
    private final String latestInstantTime;
    private final HoodieTableMetaClient hoodieTableMetaClient;
    private final String payloadClassFQN;
    private final String preCombineField;
    private Option<Pair<String, String>> simpleKeyGenFields = Option.empty();
    protected final List<String> logFilePaths;
    private final boolean readBlocksLazily;
    private final boolean reverseReader;
    private final int bufferSize;
    private final Option<InstantRange> instantRange;
    private final boolean withOperationField;
    private final FileSystem fs;
    private AtomicLong totalLogFiles = new AtomicLong(0L);
    private InternalSchema internalSchema;
    private final String path;
    private AtomicLong totalLogBlocks = new AtomicLong(0L);
    private AtomicLong totalLogRecords = new AtomicLong(0L);
    private AtomicLong totalRollbacks = new AtomicLong(0L);
    private AtomicLong totalCorruptBlocks = new AtomicLong(0L);
    private Deque<HoodieLogBlock> currentInstantLogBlocks = new ArrayDeque<HoodieLogBlock>();
    protected final boolean forceFullScan;
    private int totalScannedLogFiles;
    private float progress = 0.0f;
    private Option<String> partitionName;
    private boolean populateMetaFields = true;

    protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema, String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize, Option<InstantRange> instantRange, boolean withOperationField) {
        this(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, withOperationField, true, Option.empty(), InternalSchema.getEmptyInternalSchema());
    }

    protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema, String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize, Option<InstantRange> instantRange, boolean withOperationField, boolean forceFullScan, Option<String> partitionName, InternalSchema internalSchema) {
        this.readerSchema = readerSchema;
        this.latestInstantTime = latestInstantTime;
        this.hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
        HoodieTableConfig tableConfig = this.hoodieTableMetaClient.getTableConfig();
        this.payloadClassFQN = tableConfig.getPayloadClass();
        this.preCombineField = tableConfig.getPreCombineField();
        this.totalLogFiles.addAndGet(logFilePaths.size());
        this.logFilePaths = logFilePaths;
        this.reverseReader = reverseReader;
        this.readBlocksLazily = readBlocksLazily;
        this.fs = fs;
        this.bufferSize = bufferSize;
        this.instantRange = instantRange;
        this.withOperationField = withOperationField;
        this.forceFullScan = forceFullScan;
        this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
        this.path = basePath;
        if (!tableConfig.populateMetaFields()) {
            this.populateMetaFields = false;
            this.simpleKeyGenFields = Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp()));
        }
        this.partitionName = partitionName;
    }

    protected String getKeyField() {
        if (this.populateMetaFields) {
            return "_hoodie_record_key";
        }
        ValidationUtils.checkState(this.simpleKeyGenFields.isPresent());
        return this.simpleKeyGenFields.get().getKey();
    }

    public synchronized void scan() {
        this.scanInternal(Option.empty());
    }

    public synchronized void scan(List<String> keys) {
        this.scanInternal(Option.of(new KeySpec(keys, true)));
    }

    protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
        this.currentInstantLogBlocks = new ArrayDeque<HoodieLogBlock>();
        this.progress = 0.0f;
        this.totalLogFiles = new AtomicLong(0L);
        this.totalRollbacks = new AtomicLong(0L);
        this.totalCorruptBlocks = new AtomicLong(0L);
        this.totalLogBlocks = new AtomicLong(0L);
        this.totalLogRecords = new AtomicLong(0L);
        HoodieLogFormatReader logFormatReaderWrapper = null;
        HoodieTimeline commitsTimeline = this.hoodieTableMetaClient.getCommitsTimeline();
        HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
        HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();
        try {
            String keyField = this.getKeyField();
            boolean enableRecordLookups = !this.forceFullScan;
            logFormatReaderWrapper = new HoodieLogFormatReader(this.fs, this.logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()), this.readerSchema, this.readBlocksLazily, this.reverseReader, this.bufferSize, enableRecordLookups, keyField, this.internalSchema);
            HashSet<HoodieLogFile> scannedLogFiles = new HashSet<HoodieLogFile>();
            block19: while (logFormatReaderWrapper.hasNext()) {
                HoodieLogFile logFile2 = logFormatReaderWrapper.getLogFile();
                LOG.info((Object)("Scanning log file " + logFile2));
                scannedLogFiles.add(logFile2);
                this.totalLogFiles.set(scannedLogFiles.size());
                HoodieLogBlock logBlock = logFormatReaderWrapper.next();
                String instantTime = logBlock.getLogBlockHeader().get((Object)HoodieLogBlock.HeaderMetadataType.INSTANT_TIME);
                this.totalLogBlocks.incrementAndGet();
                if (logBlock.getBlockType() != HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK && !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get((Object)HoodieLogBlock.HeaderMetadataType.INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime)) break;
                if (logBlock.getBlockType() != HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK && logBlock.getBlockType() != HoodieLogBlock.HoodieLogBlockType.COMMAND_BLOCK && (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime) || inflightInstantsTimeline.containsInstant(instantTime) || this.instantRange.isPresent() && !this.instantRange.get().isInRange(instantTime))) continue;
                switch (logBlock.getBlockType()) {
                    case HFILE_DATA_BLOCK: 
                    case AVRO_DATA_BLOCK: 
                    case PARQUET_DATA_BLOCK: {
                        LOG.info((Object)("Reading a data block from file " + logFile2.getPath() + " at instant " + logBlock.getLogBlockHeader().get((Object)HoodieLogBlock.HeaderMetadataType.INSTANT_TIME)));
                        if (this.isNewInstantBlock(logBlock) && !this.readBlocksLazily) {
                            this.processQueuedBlocksForInstant(this.currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
                        }
                        this.currentInstantLogBlocks.push(logBlock);
                        continue block19;
                    }
                    case DELETE_BLOCK: {
                        LOG.info((Object)("Reading a delete block from file " + logFile2.getPath()));
                        if (this.isNewInstantBlock(logBlock) && !this.readBlocksLazily) {
                            this.processQueuedBlocksForInstant(this.currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
                        }
                        this.currentInstantLogBlocks.push(logBlock);
                        continue block19;
                    }
                    case COMMAND_BLOCK: {
                        LOG.info((Object)("Reading a command block from file " + logFile2.getPath()));
                        HoodieCommandBlock commandBlock = (HoodieCommandBlock)logBlock;
                        String targetInstantForCommandBlock = logBlock.getLogBlockHeader().get((Object)HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME);
                        switch (commandBlock.getType()) {
                            case ROLLBACK_PREVIOUS_BLOCK: {
                                int numBlocksRolledBack = 0;
                                this.totalRollbacks.incrementAndGet();
                                while (!this.currentInstantLogBlocks.isEmpty()) {
                                    HoodieLogBlock lastBlock = this.currentInstantLogBlocks.peek();
                                    if (lastBlock.getBlockType() == HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK) {
                                        LOG.info((Object)("Rolling back the last corrupted log block read in " + logFile2.getPath()));
                                        this.currentInstantLogBlocks.pop();
                                        ++numBlocksRolledBack;
                                        continue;
                                    }
                                    if (targetInstantForCommandBlock.contentEquals(lastBlock.getLogBlockHeader().get((Object)HoodieLogBlock.HeaderMetadataType.INSTANT_TIME))) {
                                        LOG.info((Object)("Rolling back the last log block read in " + logFile2.getPath()));
                                        this.currentInstantLogBlocks.pop();
                                        ++numBlocksRolledBack;
                                        continue;
                                    }
                                    if (!targetInstantForCommandBlock.contentEquals(this.currentInstantLogBlocks.peek().getLogBlockHeader().get((Object)HoodieLogBlock.HeaderMetadataType.INSTANT_TIME))) {
                                        LOG.warn((Object)("TargetInstantTime " + targetInstantForCommandBlock + " invalid or extra rollback command block in " + logFile2.getPath()));
                                        break;
                                    }
                                    LOG.warn((Object)("Unable to apply rollback command block in " + logFile2.getPath()));
                                }
                                LOG.info((Object)("Number of applied rollback blocks " + numBlocksRolledBack));
                                continue block19;
                            }
                        }
                        throw new UnsupportedOperationException("Command type not yet supported.");
                    }
                    case CORRUPT_BLOCK: {
                        LOG.info((Object)("Found a corrupt block in " + logFile2.getPath()));
                        this.totalCorruptBlocks.incrementAndGet();
                        this.currentInstantLogBlocks.push(logBlock);
                        continue block19;
                    }
                }
                throw new UnsupportedOperationException("Block type not supported yet");
            }
            if (!this.currentInstantLogBlocks.isEmpty()) {
                LOG.info((Object)"Merging the final data blocks");
                this.processQueuedBlocksForInstant(this.currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
            }
            this.progress = 1.0f;
        }
        catch (IOException e) {
            LOG.error((Object)"Got IOException when reading log file", (Throwable)e);
            throw new HoodieIOException("IOException when reading log file ", e);
        }
        catch (Exception e) {
            LOG.error((Object)"Got exception when reading log file", (Throwable)e);
            throw new HoodieException("Exception when reading log file ", e);
        }
        finally {
            try {
                if (null != logFormatReaderWrapper) {
                    logFormatReaderWrapper.close();
                }
            }
            catch (IOException ioe) {
                LOG.error((Object)"Unable to close log format reader", (Throwable)ioe);
            }
        }
    }

    private boolean isNewInstantBlock(HoodieLogBlock logBlock) {
        return this.currentInstantLogBlocks.size() > 0 && this.currentInstantLogBlocks.peek().getBlockType() != HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK && !logBlock.getLogBlockHeader().get((Object)HoodieLogBlock.HeaderMetadataType.INSTANT_TIME).contentEquals(this.currentInstantLogBlocks.peek().getLogBlockHeader().get((Object)HoodieLogBlock.HeaderMetadataType.INSTANT_TIME));
    }

    private void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws Exception {
        try (ClosableIterator<IndexedRecord> recordIterator = this.getRecordsIterator(dataBlock, keySpecOpt);){
            Option<Schema> schemaOption = this.getMergedSchema(dataBlock);
            while (recordIterator.hasNext()) {
                IndexedRecord currentRecord = (IndexedRecord)recordIterator.next();
                IndexedRecord record = schemaOption.isPresent() ? HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get(), Collections.emptyMap()) : currentRecord;
                this.processNextRecord(this.createHoodieRecord(record, this.hoodieTableMetaClient.getTableConfig(), this.payloadClassFQN, this.preCombineField, this.withOperationField, this.simpleKeyGenFields, this.partitionName));
                this.totalLogRecords.incrementAndGet();
            }
        }
    }

    private Option<Schema> getMergedSchema(HoodieDataBlock dataBlock) {
        Option<Schema> result = Option.empty();
        if (!this.internalSchema.isEmptySchema()) {
            Long currentInstantTime = Long.parseLong(dataBlock.getLogBlockHeader().get((Object)HoodieLogBlock.HeaderMetadataType.INSTANT_TIME));
            InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(currentInstantTime, this.hoodieTableMetaClient, false);
            Schema mergeSchema = AvroInternalSchemaConverter.convert(new InternalSchemaMerger(fileSchema, this.internalSchema, true, false).mergeSchema(), this.readerSchema.getName());
            result = Option.of(mergeSchema);
        }
        return result;
    }

    protected HoodieAvroRecord<?> createHoodieRecord(IndexedRecord rec, HoodieTableConfig hoodieTableConfig, String payloadClassFQN, String preCombineField, boolean withOperationField, Option<Pair<String, String>> simpleKeyGenFields, Option<String> partitionName) {
        if (this.populateMetaFields) {
            return (HoodieAvroRecord)SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord)rec, payloadClassFQN, preCombineField, withOperationField);
        }
        return (HoodieAvroRecord)SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord)rec, payloadClassFQN, preCombineField, simpleKeyGenFields.get(), withOperationField, partitionName);
    }

    protected abstract void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> var1) throws Exception;

    protected abstract void processNextDeletedRecord(DeleteRecord var1);

    private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> logBlocks, int numLogFilesSeen, Option<KeySpec> keySpecOpt) throws Exception {
        while (!logBlocks.isEmpty()) {
            LOG.info((Object)("Number of remaining logblocks to merge " + logBlocks.size()));
            HoodieLogBlock lastBlock = logBlocks.pollLast();
            switch (lastBlock.getBlockType()) {
                case AVRO_DATA_BLOCK: {
                    this.processDataBlock((HoodieAvroDataBlock)lastBlock, keySpecOpt);
                    break;
                }
                case HFILE_DATA_BLOCK: {
                    this.processDataBlock((HoodieHFileDataBlock)lastBlock, keySpecOpt);
                    break;
                }
                case PARQUET_DATA_BLOCK: {
                    this.processDataBlock((HoodieParquetDataBlock)lastBlock, keySpecOpt);
                    break;
                }
                case DELETE_BLOCK: {
                    Arrays.stream(((HoodieDeleteBlock)lastBlock).getRecordsToDelete()).forEach(this::processNextDeletedRecord);
                    break;
                }
                case CORRUPT_BLOCK: {
                    LOG.warn((Object)"Found a corrupt block which was not rolled back");
                    break;
                }
            }
        }
        this.progress = (numLogFilesSeen - 1) / this.logFilePaths.size();
    }

    private ClosableIterator<IndexedRecord> getRecordsIterator(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws IOException {
        if (keySpecOpt.isPresent()) {
            KeySpec keySpec = keySpecOpt.get();
            return dataBlock.getRecordIterator(keySpec.keys, keySpec.fullKey);
        }
        return dataBlock.getRecordIterator();
    }

    public float getProgress() {
        return this.progress;
    }

    public long getTotalLogFiles() {
        return this.totalLogFiles.get();
    }

    public long getTotalLogRecords() {
        return this.totalLogRecords.get();
    }

    public long getTotalLogBlocks() {
        return this.totalLogBlocks.get();
    }

    protected String getPayloadClassFQN() {
        return this.payloadClassFQN;
    }

    public Option<String> getPartitionName() {
        return this.partitionName;
    }

    public long getTotalRollbacks() {
        return this.totalRollbacks.get();
    }

    public long getTotalCorruptBlocks() {
        return this.totalCorruptBlocks.get();
    }

    public boolean isWithOperationField() {
        return this.withOperationField;
    }

    public static abstract class Builder {
        public abstract Builder withFileSystem(FileSystem var1);

        public abstract Builder withBasePath(String var1);

        public abstract Builder withLogFilePaths(List<String> var1);

        public abstract Builder withReaderSchema(Schema var1);

        public abstract Builder withLatestInstantTime(String var1);

        public abstract Builder withReadBlocksLazily(boolean var1);

        public abstract Builder withReverseReader(boolean var1);

        public abstract Builder withBufferSize(int var1);

        public Builder withPartition(String partitionName) {
            throw new UnsupportedOperationException();
        }

        public Builder withInstantRange(Option<InstantRange> instantRange) {
            throw new UnsupportedOperationException();
        }

        public Builder withOperationField(boolean withOperationField) {
            throw new UnsupportedOperationException();
        }

        public abstract AbstractHoodieLogRecordReader build();
    }

    protected static class KeySpec {
        private final List<String> keys;
        private final boolean fullKey;

        public KeySpec(List<String> keys, boolean fullKey) {
            this.keys = keys;
            this.fullKey = fullKey;
        }
    }
}

