/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.batch;

import java.util.Collections;
import java.util.List;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.runtime.operators.dynamicfiltering.DynamicFilteringDataCollectorOperatorFactory;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

public class BatchExecDynamicFilteringDataCollector
extends ExecNodeBase<Object>
implements BatchExecNode<Object> {
    @Experimental
    private static final ConfigOption<MemorySize> TABLE_EXEC_DYNAMIC_FILTERING_THRESHOLD = ConfigOptions.key((String)"table.exec.dynamic-filtering.threshold").memoryType().defaultValue((Object)MemorySize.parse((String)"8 mb")).withDescription("If the collector collects more data than the threshold (default is 8M), an empty DynamicFilterEvent with a flag only will be sent to Coordinator, which could avoid exceeding the akka limit and out-of-memory (see " + AkkaOptions.FRAMESIZE.key() + "). Otherwise a DynamicFilterEvent with all deduplicated records will be sent to Coordinator.");
    private final List<Integer> dynamicFilteringFieldIndices;

    public BatchExecDynamicFilteringDataCollector(List<Integer> dynamicFilteringFieldIndices, ReadableConfig tableConfig, InputProperty inputProperty, RowType outputType, String description) {
        super(ExecNodeContext.newNodeId(), ExecNodeContext.newContext(BatchExecTableSourceScan.class), ExecNodeContext.newPersistedConfig(BatchExecTableSourceScan.class, tableConfig), Collections.singletonList(inputProperty), (LogicalType)outputType, description);
        this.dynamicFilteringFieldIndices = dynamicFilteringFieldIndices;
        Preconditions.checkArgument((outputType.getFieldCount() == dynamicFilteringFieldIndices.size() ? 1 : 0) != 0);
    }

    @Override
    protected Transformation<Object> translateToPlanInternal(PlannerBase planner, ExecNodeConfig config) {
        ExecEdge inputEdge = this.getInputEdges().get(0);
        Transformation<?> inputTransform = inputEdge.translateToPlan(planner);
        DynamicFilteringDataCollectorOperatorFactory factory = new DynamicFilteringDataCollectorOperatorFactory((RowType)this.getOutputType(), this.dynamicFilteringFieldIndices, config.get(TABLE_EXEC_DYNAMIC_FILTERING_THRESHOLD).getBytes());
        return ExecNodeUtil.createOneInputTransformation(inputTransform, this.createTransformationName(config), this.createTransformationDescription(config), factory, InternalTypeInfo.of((LogicalType)this.getOutputType()), 1);
    }
}

