/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.metadata;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieMetadataMetrics;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/**
 * Flink flavour of {@link HoodieBackedTableMetadataWriter}.
 *
 * <p>Records destined for the metadata table are materialised as an in-memory
 * {@link List}, tagged with the file slice they map to, and written through a
 * short-lived {@link HoodieFlinkWriteClient} as a delta commit.
 */
public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter {
    private static final Logger LOG = LogManager.getLogger(FlinkHoodieBackedTableMetadataWriter.class);

    /** Timeline action name used for every commit this writer makes to the metadata table. */
    private static final String DELTA_COMMIT_ACTION = "deltacommit";

    /**
     * Creates a writer with no action metadata and no in-flight instant.
     *
     * @param conf        Hadoop configuration handed to the base writer
     * @param writeConfig write configuration handed to the base writer
     * @param context     engine context handed to the base writer
     * @return a new metadata writer
     */
    public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig, HoodieEngineContext context) {
        return FlinkHoodieBackedTableMetadataWriter.create(conf, writeConfig, context, Option.empty());
    }

    /**
     * Creates a writer with the given action metadata and no in-flight instant.
     *
     * @param conf           Hadoop configuration handed to the base writer
     * @param writeConfig    write configuration handed to the base writer
     * @param context        engine context handed to the base writer
     * @param actionMetadata optional action metadata handed to the base writer
     * @param <T>            type of the action metadata
     * @return a new metadata writer
     */
    public static <T extends SpecificRecordBase> HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig, HoodieEngineContext context, Option<T> actionMetadata) {
        return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata, Option.empty());
    }

    /**
     * Creates a writer with the given action metadata and in-flight instant timestamp.
     *
     * @param conf                     Hadoop configuration handed to the base writer
     * @param writeConfig              write configuration handed to the base writer
     * @param context                  engine context handed to the base writer
     * @param actionMetadata           optional action metadata handed to the base writer
     * @param inFlightInstantTimestamp optional in-flight instant timestamp handed to the base writer
     * @param <T>                      type of the action metadata
     * @return a new metadata writer
     */
    public static <T extends SpecificRecordBase> HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig, HoodieEngineContext context, Option<T> actionMetadata, Option<String> inFlightInstantTimestamp) {
        return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata, inFlightInstantTimestamp);
    }

    /**
     * Package-private constructor; all arguments are forwarded unchanged to the superclass.
     * Use one of the {@code create} factories from outside the package.
     */
    <T extends SpecificRecordBase> FlinkHoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext, Option<T> actionMetadata, Option<String> inFlightInstantTimestamp) {
        super(hadoopConf, writeConfig, engineContext, actionMetadata, inFlightInstantTimestamp);
    }

    /**
     * Sets {@code metrics}: a {@link HoodieMetadataMetrics} backed by the
     * {@code "HoodieMetadata"} registry when metrics are switched on in the metadata
     * write config, otherwise an empty option.
     */
    protected void initRegistry() {
        if (this.metadataWriteConfig.isMetricsOn()) {
            Registry registry = Registry.getRegistry("HoodieMetadata");
            // No widening cast here: the option must carry HoodieMetadataMetrics so that
            // commit() can call updateSizeMetrics on its content.
            this.metrics = Option.of(new HoodieMetadataMetrics(registry));
        } else {
            this.metrics = Option.empty();
        }
    }

    /**
     * Bootstraps the metadata table if the writer is enabled.
     *
     * <p>An {@link IOException} from bootstrapping is logged and disables the writer
     * instead of propagating to the caller.
     *
     * @param engineContext            engine context passed to the bootstrap step
     * @param actionMetadata           optional action metadata passed to the bootstrap step
     * @param inflightInstantTimestamp optional in-flight instant passed to the bootstrap step
     * @param <T>                      type of the action metadata
     */
    protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext engineContext, Option<T> actionMetadata, Option<String> inflightInstantTimestamp) {
        if (!this.enabled) {
            return;
        }
        try {
            this.bootstrapIfNeeded(engineContext, this.dataMetaClient, actionMetadata, inflightInstantTimestamp);
        } catch (IOException e) {
            // Deliberate best-effort: a failed bootstrap turns the writer off rather than failing the caller.
            LOG.error("Failed to initialize metadata table. Disabling the writer.", e);
            this.enabled = false;
        }
    }

    /**
     * Writes the given records to one metadata partition as a delta commit at {@code instantTime}.
     *
     * @param hoodieDataRecords       records to write; {@code get()} must yield a {@code List<HoodieRecord>}
     * @param partitionName           metadata partition whose latest file slices receive the records
     * @param instantTime             instant time of the delta commit
     * @param canTriggerTableService  when {@code true}, compaction, cleaning and archival are
     *                                attempted after the commit
     * @throws IllegalStateException   presumably, via {@code ValidationUtils.checkState}, when the
     *                                 writer is not enabled (exception type not visible here)
     * @throws HoodieMetadataException if any returned write status reports errors
     */
    protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partitionName, String instantTime, boolean canTriggerTableService) {
        ValidationUtils.checkState(this.enabled, "Metadata table cannot be committed to as it is not enabled");

        // get() is declared loosely; this writer treats the payload as an in-memory list.
        @SuppressWarnings("unchecked")
        List<HoodieRecord> records = (List<HoodieRecord>) hoodieDataRecords.get();
        List<HoodieRecord> recordList = this.prepRecords(records, partitionName, 1);

        try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(this.engineContext, this.metadataWriteConfig)) {
            boolean alreadyCompleted = this.metadataMetaClient.getActiveTimeline()
                    .filterCompletedInstants()
                    .containsInstant(instantTime);
            if (!alreadyCompleted) {
                // First time this instant is applied to the metadata table.
                writeClient.startCommitWithTime(instantTime);
                this.metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(DELTA_COMMIT_ACTION, instantTime);
            } else {
                // The instant is already completed on the metadata timeline. Remove its
                // completed instant file and reload the timeline before writing again.
                // NOTE(review): presumably this is a re-attempted commit - confirm with callers.
                HoodieInstant alreadyCompletedInstant = this.metadataMetaClient.getActiveTimeline()
                        .filterCompletedInstants()
                        .filter(entry -> entry.getTimestamp().equals(instantTime))
                        .lastInstant()
                        .get();
                HoodieActiveTimeline.deleteInstantFile(this.metadataMetaClient.getFs(), this.metadataMetaClient.getMetaPath(), alreadyCompletedInstant);
                this.metadataMetaClient.reloadActiveTimeline();
            }

            // Nothing to upsert for an empty batch, but the (empty) commit is still made below.
            List<WriteStatus> statuses = records.isEmpty()
                    ? Collections.emptyList()
                    : writeClient.upsertPreppedRecords(recordList, instantTime);
            statuses.forEach(writeStatus -> {
                if (writeStatus.hasErrors()) {
                    throw new HoodieMetadataException("Failed to commit metadata table records at instant " + instantTime);
                }
            });

            // The commit is issued explicitly here rather than left to the write client.
            writeClient.commit(instantTime, statuses, Option.empty(), DELTA_COMMIT_ACTION, Collections.emptyMap());
            this.metadataMetaClient.reloadActiveTimeline();

            if (canTriggerTableService) {
                this.compactIfNecessary(writeClient, instantTime);
                this.cleanIfNecessary(writeClient, instantTime);
                writeClient.archive();
            }
        }

        // Refresh size metrics once the write client has been closed.
        this.metrics.ifPresent(m -> m.updateSizeMetrics(this.metadataMetaClient, this.metadata));
    }

    /**
     * Tags each record with the location of the file slice its record key maps to.
     *
     * <p>The records are modified in place and returned in a new list, in the same order.
     *
     * @param records       records to tag
     * @param partitionName metadata partition whose latest file slices are looked up
     * @param numFileGroups number of file groups the partition is required to have
     * @return the same record instances, each with its current location set
     */
    private List<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int numFileGroups) {
        List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(this.metadataMetaClient, partitionName);
        ValidationUtils.checkArgument(fileSlices.size() == numFileGroups,
                String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));

        return records.stream().map(r -> {
            int fileGroupIndex = HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), numFileGroups);
            FileSlice slice = fileSlices.get(fileGroupIndex);
            // "I" for an empty slice, "U" otherwise.
            // NOTE(review): presumably insert/update markers understood by the Flink write path - confirm.
            String instantTime = slice.isEmpty() ? "I" : "U";
            r.setCurrentLocation(new HoodieRecordLocation(instantTime, slice.getFileId()));
            return r;
        }).collect(Collectors.toList());
    }
}

