/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.clustering;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sink function that collects {@link ClusteringCommitEvent}s and commits a clustering
 * instant once an event has been received for every input group of its clustering plan.
 *
 * <p>Events are buffered per instant time. When the number of buffered events equals the
 * number of input groups in the plan, the instant is either rolled back (if any event is
 * flagged as failed) or committed. In both cases the buffered state of that instant is
 * dropped afterwards.
 */
public class ClusteringCommitSink
extends CleanFunction<ClusteringCommitEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(ClusteringCommitSink.class);

    /** Job configuration, used to create the write client and the meta client. */
    private final Configuration conf;

    /** Table handle obtained from the write client in {@link #open(Configuration)}. */
    private transient HoodieFlinkTable<?> table;

    /** Buffered commit events, keyed by clustering instant time. */
    private transient Map<String, List<ClusteringCommitEvent>> commitBuffer;

    /**
     * Clustering plans keyed by instant time. The plan is looked up once per instant
     * instead of once per received event; entries are evicted in {@link #reset(String)}.
     */
    private transient Map<String, HoodieClusteringPlan> clusteringPlanCache;

    /**
     * Creates the sink.
     *
     * @param conf the configuration handed to the parent function and kept for later lookups
     */
    public ClusteringCommitSink(Configuration conf) {
        super(conf);
        this.conf = conf;
    }

    /**
     * Initializes the write client (if the parent did not create one), the per-instant
     * buffers and the table handle.
     *
     * @param parameters the configuration passed through to the parent {@code open}
     * @throws Exception if the parent initialization fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        if (this.writeClient == null) {
            this.writeClient = FlinkWriteClients.createWriteClient(this.conf, this.getRuntimeContext());
        }
        this.commitBuffer = new HashMap<>();
        this.clusteringPlanCache = new HashMap<>();
        this.table = this.writeClient.getHoodieTable();
    }

    /**
     * Buffers the event under its instant and attempts to finish that instant.
     *
     * @param event   the clustering result event to buffer
     * @param context the sink context (unused)
     * @throws Exception if the clustering plan cannot be resolved or the rollback fails
     */
    @Override
    public void invoke(ClusteringCommitEvent event, SinkFunction.Context context) throws Exception {
        String instant = event.getInstant();
        List<ClusteringCommitEvent> events = this.commitBuffer.computeIfAbsent(instant, k -> new ArrayList<>());
        events.add(event);
        this.commitIfNecessary(instant, events);
    }

    /**
     * Commits or rolls back the instant when events for all input groups have arrived;
     * returns without side effects otherwise.
     *
     * @param instant the clustering instant time
     * @param events  all events buffered so far for {@code instant}
     * @throws HoodieClusteringException if no clustering plan exists for {@code instant}
     */
    private void commitIfNecessary(String instant, List<ClusteringCommitEvent> events) {
        HoodieClusteringPlan clusteringPlan = this.clusteringPlanCache.computeIfAbsent(instant, this::loadClusteringPlan);
        boolean isReady = clusteringPlan.getInputGroups().size() == events.size();
        if (!isReady) {
            return;
        }
        if (events.stream().anyMatch(ClusteringCommitEvent::isFailed)) {
            try {
                ClusteringUtil.rollbackClustering(this.table, this.writeClient, instant);
            }
            finally {
                // Drop the buffered events even when the rollback throws.
                this.reset(instant);
            }
            return;
        }
        try {
            this.doCommit(instant, clusteringPlan, events);
        }
        catch (Throwable throwable) {
            // Deliberately fail-safe: a failed commit is logged and does not fail the sink.
            LOG.error("Error while committing clustering instant: {}", instant, throwable);
        }
        finally {
            this.reset(instant);
        }
    }

    /**
     * Reads the clustering plan of the given instant through a freshly created meta client.
     *
     * @param instant the clustering instant time
     * @return the clustering plan of the instant
     * @throws HoodieClusteringException if no plan is found for the instant
     */
    private HoodieClusteringPlan loadClusteringPlan(String instant) {
        HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(instant);
        Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption =
            ClusteringUtils.getClusteringPlan(StreamerUtil.createMetaClient(this.conf), clusteringInstant);
        if (!clusteringPlanOption.isPresent()) {
            throw new HoodieClusteringException("No clustering plan found for instant " + instant);
        }
        return clusteringPlanOption.get().getRight();
    }

    /**
     * Builds the write metadata from all buffered events, completes the clustering table
     * service for the instant and runs an inline clean when async cleaning is disabled.
     *
     * @param instant        the clustering instant time
     * @param clusteringPlan the plan the events belong to
     * @param events         the complete set of events for the instant
     * @throws HoodieClusteringException if the events carry no write status at all
     */
    private void doCommit(String instant, HoodieClusteringPlan clusteringPlan, List<ClusteringCommitEvent> events) {
        List<WriteStatus> statuses = events.stream()
            .map(ClusteringCommitEvent::getWriteStatuses)
            .flatMap(Collection::stream)
            .collect(Collectors.toList());
        HoodieWriteMetadata<List<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
        writeMetadata.setWriteStatuses(statuses);
        writeMetadata.setWriteStats(statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()));
        writeMetadata.setPartitionToReplaceFileIds(ClusteringCommitSink.getPartitionToReplacedFileIds(clusteringPlan, writeMetadata));
        ClusteringCommitSink.validateWriteResult(clusteringPlan, instant, writeMetadata);
        if (!writeMetadata.getCommitMetadata().isPresent()) {
            HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(
                writeMetadata.getWriteStats().get(),
                writeMetadata.getPartitionToReplaceFileIds(),
                Option.empty(),
                WriteOperationType.CLUSTER,
                this.writeClient.getConfig().getSchema(),
                "replacecommit");
            writeMetadata.setCommitMetadata(Option.of(commitMetadata));
        }
        // Reload the timeline before completing the table service.
        this.table.getMetaClient().reloadActiveTimeline();
        this.writeClient.completeTableService(TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), this.table, instant);
        if (!this.conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
            LOG.info("Running inline clean");
            this.writeClient.clean();
        }
    }

    /**
     * Drops all state held for the given instant (buffered events and cached plan).
     *
     * @param instant the clustering instant time
     */
    private void reset(String instant) {
        this.commitBuffer.remove(instant);
        this.clusteringPlanCache.remove(instant);
    }

    /**
     * Fails when the clustering produced no write status.
     *
     * @param clusteringPlan the plan, used only for the error message
     * @param instantTime    the clustering instant time
     * @param writeMetadata  the metadata whose write statuses are checked
     * @throws HoodieClusteringException if the write status list is empty
     */
    private static void validateWriteResult(HoodieClusteringPlan clusteringPlan, String instantTime, HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
        if (writeMetadata.getWriteStatuses().isEmpty()) {
            throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least " + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum() + " write statuses");
        }
    }

    /**
     * Computes, per partition path, the file ids of the plan's file groups that are not
     * among the file groups named by the write stats.
     *
     * @param clusteringPlan the plan listing the input file groups
     * @param writeMetadata  the metadata whose write stats name the written file groups
     * @return partition path to replaced file ids
     */
    private static Map<String, List<String>> getPartitionToReplacedFileIds(HoodieClusteringPlan clusteringPlan, HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
        Set<HoodieFileGroupId> newFilesWritten = writeMetadata.getWriteStats().get().stream()
            .map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId()))
            .collect(Collectors.toSet());
        return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
            .filter(fg -> !newFilesWritten.contains(fg))
            .collect(Collectors.groupingBy(HoodieFileGroupId::getPartitionPath, Collectors.mapping(HoodieFileGroupId::getFileId, Collectors.toList())));
    }
}

