/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.BaseHoodieTableServiceClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.util.FlinkClientUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flink variant of {@link BaseHoodieTableServiceClient} whose write results are
 * {@code List<WriteStatus>}.
 *
 * <p>Both commit paths in this class ({@link #completeCompaction} and {@link #completeClustering})
 * follow the same sequence: inside a transaction they finalize the write and update the table
 * metadata via {@link #writeTableMetadata}, then complete the inflight instant; afterwards they
 * delete the marker directory and, when the corresponding timer is set, report commit metrics.
 *
 * @param <T> record payload type used in the {@link HoodieTable} signature of
 *            {@link #completeClustering}
 */
public class HoodieFlinkTableServiceClient<T>
extends BaseHoodieTableServiceClient<List<WriteStatus>> {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkTableServiceClient.class);

    // NOTE(review): this field is never read or assigned in this class; writeTableMetadata()
    // creates (and closes) its own writer on every call. Confirm whether it can be dropped.
    private HoodieBackedTableMetadataWriter metadataWriter;

    /**
     * Creates the client by delegating to the base class constructor.
     *
     * @param context      engine context; cast to {@link HoodieFlinkEngineContext} when tables are created
     * @param clientConfig write configuration for this client
     */
    protected HoodieFlinkTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
        super(context, clientConfig);
    }

    /**
     * Runs the compaction for the given instant on a freshly created table and commits it.
     *
     * <p>The result is always committed: {@code shouldComplete} is not consulted by this
     * implementation.
     *
     * @param compactionInstantTime instant time of the compaction to execute
     * @param shouldComplete        ignored by this implementation
     * @return the write metadata produced by the table's compaction
     */
    @Override
    protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
        HoodieWriteMetadata<List<WriteStatus>> compactionMetadata = this.getHoodieTable().compact(this.context, compactionInstantTime);
        this.commitCompaction(compactionInstantTime, compactionMetadata.getCommitMetadata().get(), Option.empty());
        return compactionMetadata;
    }

    /**
     * Adds the optional extra metadata entries to {@code metadata} and completes the compaction.
     *
     * @param compactionInstantTime instant time of the compaction being committed
     * @param metadata              commit metadata of the compaction; mutated when extra metadata is present
     * @param extraMetadata         optional key/value pairs to attach to the commit metadata
     */
    @Override
    public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
        extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
        this.completeCompaction(metadata, this.getHoodieTable(), compactionInstantTime);
    }

    /**
     * Commits a compaction: finalizes the write, updates the table metadata and completes the
     * inflight compaction inside a transaction, then deletes the marker directory and reports
     * metrics when the compaction timer is set.
     *
     * @param metadata             commit metadata of the compaction
     * @param table                table the compaction ran on
     * @param compactionCommitTime instant time of the compaction
     * @throws HoodieCommitException if the instant time cannot be parsed while reporting metrics
     */
    @Override
    protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime) {
        this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + this.config.getTableName());
        List<HoodieWriteStat> writeStats = metadata.getWriteStats();
        HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime);
        try {
            this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
            this.finalizeWrite(table, compactionCommitTime, writeStats);
            this.writeTableMetadata(table, compactionCommitTime, compactionInstant.getAction(), metadata);
            LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
            CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
        } finally {
            // Always end the transaction, whether or not the commit steps succeeded.
            this.txnManager.endTransaction(Option.of(compactionInstant));
        }
        WriteMarkersFactory.get(this.config.getMarkersType(), table, compactionCommitTime).quietDeleteMarkerDir(this.context, this.config.getMarkersDeleteParallelism());
        if (this.compactionTimer != null) {
            long durationInMs = this.metrics.getDurationInMs(this.compactionTimer.stop());
            this.reportTableServiceMetrics(durationInMs, compactionCommitTime, metadata, "compaction", "compaction");
        }
        LOG.info("Compacted successfully on commit {}", compactionCommitTime);
    }

    /**
     * Commits a clustering: rejects it when any write stat reports write errors, otherwise
     * finalizes the write, updates the table metadata and transitions the inflight replace commit
     * to complete inside a transaction, then deletes the marker directory and reports metrics
     * when the clustering timer is set.
     *
     * @param metadata             replace-commit metadata of the clustering
     * @param table                table the clustering ran on
     * @param clusteringCommitTime instant time of the clustering
     * @throws HoodieClusteringException if any write stat has write errors, or if an
     *                                   {@link IOException} occurs while committing
     * @throws HoodieCommitException     if the instant time cannot be parsed while reporting metrics
     */
    protected void completeClustering(HoodieReplaceCommitMetadata metadata, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, String clusteringCommitTime) {
        this.context.setJobStatus(this.getClass().getSimpleName(), "Collect clustering write status and commit clustering");
        HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, "replacecommit", clusteringCommitTime);
        // Flatten the per-partition write stats into a single list.
        List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().values().stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
        if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0L) {
            String failedFileIds = writeStats.stream()
                    .filter(s -> s.getTotalWriteErrors() > 0L)
                    .map(HoodieWriteStat::getFileId)
                    .collect(Collectors.joining(","));
            throw new HoodieClusteringException("Clustering failed to write to files:" + failedFileIds);
        }
        try {
            this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
            this.finalizeWrite(table, clusteringCommitTime, writeStats);
            this.writeTableMetadata(table, clusteringCommitTime, clusteringInstant.getAction(), metadata);
            LOG.info("Committing Clustering {} finished with result {}.", clusteringCommitTime, metadata);
            table.getActiveTimeline().transitionReplaceInflightToComplete(HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime), Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            throw new HoodieClusteringException("Failed to commit " + table.getMetaClient().getBasePath() + " at time " + clusteringCommitTime, e);
        } finally {
            // Always end the transaction, whether or not the commit steps succeeded.
            this.txnManager.endTransaction(Option.of(clusteringInstant));
        }
        WriteMarkersFactory.get(this.config.getMarkersType(), table, clusteringCommitTime).quietDeleteMarkerDir(this.context, this.config.getMarkersDeleteParallelism());
        if (this.clusteringTimer != null) {
            long durationInMs = this.metrics.getDurationInMs(this.clusteringTimer.stop());
            this.reportTableServiceMetrics(durationInMs, clusteringCommitTime, metadata, "replacecommit", "clustering");
        }
        LOG.info("Clustering successfully on commit {}", clusteringCommitTime);
    }

    /**
     * Reports commit metrics for a finished table service, using the instant time as the commit
     * timestamp.
     *
     * @param durationInMs duration of the table service in milliseconds
     * @param instantTime  instant time of the table service; parsed into a date for the metrics
     * @param metadata     commit metadata to report
     * @param actionType   action type passed to the metrics
     * @param operation    operation name used only in the error message
     * @throws HoodieCommitException if {@code instantTime} cannot be parsed
     */
    private void reportTableServiceMetrics(long durationInMs, String instantTime, HoodieCommitMetadata metadata, String actionType, String operation) {
        try {
            this.metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(instantTime).getTime(), durationInMs, metadata, actionType);
        } catch (ParseException e) {
            throw new HoodieCommitException("Commit time is not of valid format. Failed to commit " + operation + " " + this.config.getBasePath() + " at time " + instantTime, e);
        }
    }

    /**
     * Not implemented in this client.
     *
     * @param clusteringInstant ignored
     * @param shouldComplete    ignored
     * @return always {@code null}; callers must not dereference the result
     */
    @Override
    public HoodieWriteMetadata<List<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
        return null;
    }

    /**
     * Creates a Flink table for the given config using this client's engine context.
     *
     * @param config     write configuration of the table to create
     * @param hadoopConf not used by this implementation
     * @return the created table
     */
    @Override
    protected HoodieTable<?, ?, ?, ?> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
        return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext)this.context);
    }

    /**
     * Creates a Flink table from this client's config and engine context.
     *
     * @return the table returned by {@code HoodieFlinkTable.create} for this call
     */
    public HoodieFlinkTable<?> getHoodieTable() {
        return HoodieFlinkTable.create(this.config, (HoodieFlinkEngineContext)this.context);
    }

    /**
     * Applies the commit metadata to the table metadata through a metadata writer that is created
     * for this call and closed afterwards.
     *
     * @param table       not used by this implementation; a table from {@link #getHoodieTable()} is consulted instead
     * @param instantTime instant time of the commit being recorded
     * @param actionType  action type of the commit
     * @param metadata    commit metadata to apply
     * @throws HoodieException if updating or closing the metadata writer fails
     */
    @Override
    public void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
        // Local is deliberately not named "metadataWriter" so it does not shadow the field.
        try (HoodieBackedTableMetadataWriter writer = this.initMetadataWriter()) {
            writer.update(metadata, instantTime, this.getHoodieTable().isTableServiceAction(actionType, instantTime));
        } catch (Exception e) {
            throw new HoodieException("Failed to update metadata", e);
        }
    }

    /**
     * Creates a new metadata writer from the Flink Hadoop configuration, this client's config and
     * the default Flink engine context.
     */
    private HoodieBackedTableMetadataWriter initMetadataWriter() {
        return (HoodieBackedTableMetadataWriter)FlinkHoodieBackedTableMetadataWriter.create(FlinkClientUtil.getHadoopConf(), this.config, HoodieFlinkEngineContext.DEFAULT);
    }

    /**
     * Initializes the metadata table when it is enabled in the config, otherwise asks the table to
     * delete it if applicable.
     *
     * <p>When enabled, a metadata writer is created and immediately closed while holding the lock
     * manager's lock, after which the table is asked to delete metadata indexes if necessary.
     *
     * @throws HoodieException if locking, or creating/closing the metadata writer, fails
     */
    public void initMetadataTable() {
        HoodieFlinkTable<?> table = this.getHoodieTable();
        if (this.config.isMetadataTableEnabled()) {
            try {
                // NOTE(review): lock() sits inside the try, so unlock() in the finally block also
                // runs when lock() itself throws. Kept as-is; confirm unlock() tolerates that.
                this.txnManager.getLockManager().lock();
                this.initMetadataWriter().close();
            } catch (Exception e) {
                throw new HoodieException("Failed to initialize metadata table", e);
            } finally {
                this.txnManager.getLockManager().unlock();
            }
            table.deleteMetadataIndexIfNecessary();
        } else {
            table.maybeDeleteMetadataTable();
        }
    }
}

