/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.compact;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.data.HoodieList;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CompactionCommitSink
extends CleanFunction<CompactionCommitEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(CompactionCommitSink.class);
    private final Configuration conf;
    private transient Map<String, Map<String, CompactionCommitEvent>> commitBuffer;
    private transient Map<String, HoodieCompactionPlan> compactionPlanCache;
    private transient HoodieFlinkTable<?> table;

    public CompactionCommitSink(Configuration conf) {
        super(conf);
        this.conf = conf;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        if (this.writeClient == null) {
            this.writeClient = StreamerUtil.createWriteClient(this.conf, this.getRuntimeContext());
        }
        this.commitBuffer = new HashMap<String, Map<String, CompactionCommitEvent>>();
        this.compactionPlanCache = new HashMap<String, HoodieCompactionPlan>();
        this.table = this.writeClient.getHoodieTable();
    }

    public void invoke(CompactionCommitEvent event, SinkFunction.Context context) throws Exception {
        String instant = event.getInstant();
        this.commitBuffer.computeIfAbsent(instant, k -> new HashMap()).put(event.getFileId(), event);
        this.commitIfNecessary(instant, this.commitBuffer.get(instant).values());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void commitIfNecessary(String instant, Collection<CompactionCommitEvent> events) throws IOException {
        boolean isReady;
        HoodieCompactionPlan compactionPlan = this.compactionPlanCache.computeIfAbsent(instant, k -> {
            try {
                return CompactionUtils.getCompactionPlan(this.writeClient.getHoodieTable().getMetaClient(), instant);
            }
            catch (IOException e) {
                throw new HoodieException(e);
            }
        });
        boolean bl = isReady = compactionPlan.getOperations().size() == events.size();
        if (!isReady) {
            return;
        }
        if (events.stream().anyMatch(CompactionCommitEvent::isFailed)) {
            try {
                CompactionUtil.rollbackCompaction(this.table, instant);
            }
            finally {
                this.reset(instant);
                return;
            }
        }
        try {
            this.doCommit(instant, events);
        }
        catch (Throwable throwable) {
            LOG.error("Error while committing compaction instant: " + instant, throwable);
        }
        finally {
            this.reset(instant);
        }
    }

    private void doCommit(String instant, Collection<CompactionCommitEvent> events) throws IOException {
        List statuses = events.stream().map(CompactionCommitEvent::getWriteStatuses).flatMap(Collection::stream).collect(Collectors.toList());
        HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata(this.table, instant, HoodieList.of(statuses), this.writeClient.getConfig().getSchema());
        this.writeClient.commitCompaction(instant, metadata, Option.empty());
        if (!this.conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
            this.writeClient.clean();
        }
    }

    private void reset(String instant) {
        this.commitBuffer.remove(instant);
        this.compactionPlanCache.remove(instant);
    }
}

