/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.compact;

import java.io.IOException;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.function.ThrowingRunnable;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CompactFunction
extends ProcessFunction<CompactionPlanEvent, CompactionCommitEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(CompactFunction.class);
    private final Configuration conf;
    private transient HoodieFlinkWriteClient<?> writeClient;
    private final boolean asyncCompaction;
    private int taskID;
    private transient NonThrownExecutor executor;

    public CompactFunction(Configuration conf) {
        this.conf = conf;
        this.asyncCompaction = StreamerUtil.needsAsyncCompaction(conf);
    }

    public void open(Configuration parameters) throws Exception {
        this.taskID = this.getRuntimeContext().getIndexOfThisSubtask();
        this.writeClient = StreamerUtil.createWriteClient(this.conf, this.getRuntimeContext());
        if (this.asyncCompaction) {
            this.executor = NonThrownExecutor.builder(LOG).build();
        }
    }

    public void processElement(CompactionPlanEvent event, ProcessFunction.Context context, Collector<CompactionCommitEvent> collector) throws Exception {
        String instantTime = event.getCompactionInstantTime();
        CompactionOperation compactionOperation = event.getOperation();
        if (this.asyncCompaction) {
            this.executor.execute((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> this.doCompaction(instantTime, compactionOperation, collector, this.reloadWriteConfig())), (errMsg, t) -> collector.collect((Object)new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), this.taskID)), "Execute compaction for instant %s from task %d", instantTime, this.taskID);
        } else {
            LOG.info("Execute compaction for instant {} from task {}", (Object)instantTime, (Object)this.taskID);
            this.doCompaction(instantTime, compactionOperation, collector, this.writeClient.getConfig());
        }
    }

    private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector, HoodieWriteConfig writeConfig) throws IOException {
        HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
        List<WriteStatus> writeStatuses = compactor.compact(new HoodieFlinkCopyOnWriteTable(writeConfig, this.writeClient.getEngineContext(), this.writeClient.getHoodieTable().getMetaClient()), this.writeClient.getHoodieTable().getMetaClient(), this.writeClient.getConfig(), compactionOperation, instantTime, this.writeClient.getHoodieTable().getTaskContextSupplier());
        collector.collect((Object)new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), writeStatuses, this.taskID));
    }

    private HoodieWriteConfig reloadWriteConfig() throws Exception {
        HoodieWriteConfig writeConfig = this.writeClient.getConfig();
        CompactionUtil.setAvroSchema(writeConfig, this.writeClient.getHoodieTable().getMetaClient());
        return writeConfig;
    }

    @VisibleForTesting
    public void setExecutor(NonThrownExecutor executor) {
        this.executor = executor;
    }
}

