/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.compact;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkTables;

public class CompactionPlanOperator
extends AbstractStreamOperator<CompactionPlanEvent>
implements OneInputStreamOperator<Object, CompactionPlanEvent> {
    private final Configuration conf;
    private transient HoodieFlinkTable table;

    public CompactionPlanOperator(Configuration conf) {
        this.conf = conf;
    }

    public void open() throws Exception {
        super.open();
        this.table = FlinkTables.createTable(this.conf, (RuntimeContext)this.getRuntimeContext());
        CompactionUtil.rollbackCompaction(this.table);
    }

    public void processElement(StreamRecord<Object> streamRecord) {
    }

    public void notifyCheckpointComplete(long checkpointId) {
        try {
            this.table.getMetaClient().reloadActiveTimeline();
            this.scheduleCompaction(this.table, checkpointId);
        }
        catch (Throwable throwable) {
            LOG.error("Error while scheduling compaction plan for checkpoint: " + checkpointId, throwable);
        }
    }

    private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) throws IOException {
        Option<HoodieInstant> firstRequested = table.getActiveTimeline().filterPendingCompactionTimeline().filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).firstInstant();
        if (!firstRequested.isPresent()) {
            LOG.info("No compaction plan for checkpoint " + checkpointId);
            return;
        }
        String compactionInstantTime = firstRequested.get().getTimestamp();
        HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(table.getMetaClient(), compactionInstantTime);
        if (compactionPlan == null || compactionPlan.getOperations() == null || compactionPlan.getOperations().isEmpty()) {
            LOG.info("Empty compaction plan for instant " + compactionInstantTime);
        } else {
            HoodieInstant instant2 = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
            table.getActiveTimeline().transitionCompactionRequestedToInflight(instant2);
            table.getMetaClient().reloadActiveTimeline();
            List operations = compactionPlan.getOperations().stream().map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
            LOG.info("Execute compaction plan for instant {} as {} file groups", (Object)compactionInstantTime, (Object)operations.size());
            WriteMarkersFactory.get(table.getConfig().getMarkersType(), table, compactionInstantTime).deleteMarkerDir(table.getContext(), table.getConfig().getMarkersDeleteParallelism());
            for (CompactionOperation operation : operations) {
                this.output.collect((Object)new StreamRecord((Object)new CompactionPlanEvent(compactionInstantTime, operation)));
            }
        }
    }

    @VisibleForTesting
    public void setOutput(Output<StreamRecord<CompactionPlanEvent>> output) {
        this.output = output;
    }
}

