package org.apache.flink.table.planner.plan.nodes.exec.common;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamFilter;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ParallelismProvider;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.OutputFormatProvider;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.connector.sink.SinkProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
import org.apache.flink.table.planner.connectors.TransformationSinkProvider;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer;
import org.apache.flink.table.runtime.operators.sink.SinkOperator;
import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

/**
 * Base exec node that wires an input {@link Transformation} of {@link RowData} to the
 * {@link DynamicTableSink} described by a {@link DynamicTableSinkSpec}.
 *
 * <p>{@link #createSinkTransformation} builds the following chain, in order:
 *
 * <ol>
 *   <li>a filter enforcing NOT NULL columns (only when the physical row type has such columns),
 *   <li>a key-by on the primary key (only when required, see {@code applyKeyBy}),
 *   <li>a {@link SinkUpsertMaterializer} operator (only when requested by the caller),
 *   <li>the transformation produced from the sink's runtime provider.
 * </ol>
 */
public abstract class CommonExecSink extends ExecNodeBase<Object> implements MultipleTransformationTranslator<Object> {
    public static final String FIELD_NAME_DYNAMIC_TABLE_SINK = "dynamicTableSink";

    /**
     * Max-parallelism argument handed to {@link KeyGroupStreamPartitioner} when keying by the
     * primary key.
     *
     * <p>NOTE(review): 128 equals Flink's
     * {@code KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM}; presumably that
     * constant was inlined at compile time - confirm before replacing the literal.
     */
    private static final int KEY_BY_MAX_PARALLELISM = 128;

    /** Specification of the target sink; serialized under {@link #FIELD_NAME_DYNAMIC_TABLE_SINK}. */
    @JsonProperty(FIELD_NAME_DYNAMIC_TABLE_SINK)
    protected final DynamicTableSinkSpec tableSinkSpec;

    /** Changelog mode used to decide whether a key-by is needed; excluded from JSON. */
    @JsonIgnore
    private final ChangelogMode changelogMode;

    /** Passed to the {@link SinkRuntimeProviderContext} when requesting the runtime provider; excluded from JSON. */
    @JsonIgnore
    private final boolean isBounded;

    /**
     * Creates the node.
     *
     * <p>The decompiler noted that this constructor was originally {@code protected}; it is
     * kept {@code public} here so that no existing caller breaks.
     *
     * @param dynamicTableSinkSpec specification of the sink to write to
     * @param changelogMode changelog mode consulted when deciding on a key-by
     * @param z stored as {@code isBounded}
     * @param i forwarded to the {@link ExecNodeBase} constructor (first argument)
     * @param list forwarded to the {@link ExecNodeBase} constructor (second argument)
     * @param logicalType forwarded to the {@link ExecNodeBase} constructor (third argument)
     * @param str forwarded to the {@link ExecNodeBase} constructor (fourth argument)
     */
    public CommonExecSink(DynamicTableSinkSpec dynamicTableSinkSpec, ChangelogMode changelogMode, boolean z, int i, List<InputProperty> list, LogicalType logicalType, String str) {
        super(i, list, logicalType, str);
        this.tableSinkSpec = dynamicTableSinkSpec;
        this.changelogMode = changelogMode;
        this.isBounded = z;
    }

    /** Returns the sink specification this node was created with. */
    public DynamicTableSinkSpec getTableSinkSpec() {
        return this.tableSinkSpec;
    }

    /**
     * Builds the complete transformation chain that ends in the table sink.
     *
     * <p>The decompiler noted that this method was originally {@code protected}; it is kept
     * {@code public} here so that no existing caller breaks.
     *
     * @param streamExecutionEnvironment environment used to build data streams and to clean
     *     sink functions
     * @param tableConfig source of the NOT NULL enforcer option and the idle state retention
     * @param transformation input producing the rows to be written
     * @param i forwarded unchanged to {@code TransformationSinkProvider.Context.of} and to the
     *     {@link SinkOperator} constructor; presumably the rowtime field index - confirm
     *     against those APIs
     * @param z whether to force a key-by and insert a {@link SinkUpsertMaterializer}
     * @return the final sink transformation
     * @throws TableException if a key-by is required but the schema has no primary key, if the
     *     sink configures a non-positive parallelism, or if the runtime provider type is not
     *     supported
     */
    public Transformation<Object> createSinkTransformation(StreamExecutionEnvironment streamExecutionEnvironment, TableConfig tableConfig, Transformation<RowData> transformation, int i, boolean z) {
        final DynamicTableSink tableSink = this.tableSinkSpec.getTableSink();
        final ResolvedSchema schema = this.tableSinkSpec.getCatalogTable().getResolvedSchema();
        final DynamicTableSink.SinkRuntimeProvider runtimeProvider =
                tableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(this.isBounded));
        final RowType physicalRowType = getPhysicalRowType(schema);
        final int[] primaryKeys = getPrimaryKeyIndices(physicalRowType, schema);
        final int sinkParallelism = deriveSinkParallelism(transformation, runtimeProvider);

        Transformation<RowData> sinkInput = applyNotNullEnforcer(transformation, tableConfig, physicalRowType);
        sinkInput = applyKeyBy(sinkInput, primaryKeys, sinkParallelism, z);
        if (z) {
            sinkInput = applyUpsertMaterialize(sinkInput, primaryKeys, sinkParallelism, tableConfig, physicalRowType);
        }

        // applySinkProvider yields Transformation<?>; the declared return type needs an
        // explicit (unchecked) cast, otherwise the method does not compile.
        @SuppressWarnings("unchecked")
        final Transformation<Object> sinkTransform =
                (Transformation<Object>) applySinkProvider(sinkInput, streamExecutionEnvironment, runtimeProvider, i, sinkParallelism);
        return sinkTransform;
    }

    /**
     * Appends a filter that applies the configured NOT NULL enforcer to all non-nullable
     * fields of {@code physicalRowType}.
     *
     * @return the input itself when the row type has no non-nullable field, otherwise a new
     *     transformation with the input's parallelism
     */
    private Transformation<RowData> applyNotNullEnforcer(Transformation<RowData> inputTransform, TableConfig tableConfig, RowType physicalRowType) {
        final ExecutionConfigOptions.NotNullEnforcer notNullEnforcer =
                tableConfig.getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER);
        final int[] notNullFieldIndices = getNotNullFieldIndices(physicalRowType);
        final String[] fieldNames = physicalRowType.getFieldNames().toArray(new String[0]);

        if (notNullFieldIndices.length <= 0) {
            // Nothing to enforce, so no extra operator is added.
            return inputTransform;
        }

        final List<String> notNullFieldNames =
                Arrays.stream(notNullFieldIndices)
                        .mapToObj(index -> fieldNames[index])
                        .collect(Collectors.toList());
        final String operatorName =
                String.format("NotNullEnforcer(fields=[%s])", String.join(", ", notNullFieldNames));
        final SinkNotNullEnforcer enforcer =
                new SinkNotNullEnforcer(notNullEnforcer, notNullFieldIndices, fieldNames);
        return new OneInputTransformation<>(
                inputTransform,
                operatorName,
                new StreamFilter<>(enforcer),
                getInputTypeInfo(),
                inputTransform.getParallelism());
    }

    /** Returns the positions of all fields of {@code physicalRowType} whose type is not nullable. */
    private int[] getNotNullFieldIndices(RowType physicalRowType) {
        return IntStream.range(0, physicalRowType.getFieldCount())
                .filter(index -> !physicalRowType.getTypeAt(index).isNullable())
                .toArray();
    }

    /**
     * Determines the parallelism of the sink.
     *
     * @return the parallelism configured through {@link ParallelismProvider} when the runtime
     *     provider implements it and supplies a value, otherwise the input's parallelism
     * @throws TableException if the configured parallelism is zero or negative
     */
    private int deriveSinkParallelism(Transformation<RowData> inputTransform, DynamicTableSink.SinkRuntimeProvider runtimeProvider) {
        final int inputParallelism = inputTransform.getParallelism();
        if (!(runtimeProvider instanceof ParallelismProvider)) {
            return inputParallelism;
        }
        final ParallelismProvider parallelismProvider = (ParallelismProvider) runtimeProvider;
        return parallelismProvider
                .getParallelism()
                .map(configured -> {
                    if (configured <= 0) {
                        throw new TableException(String.format("Invalid configured parallelism %s for table '%s'.", configured, this.tableSinkSpec.getObjectIdentifier().asSummaryString()));
                    }
                    return configured;
                })
                .orElse(inputParallelism);
    }

    /**
     * Partitions the input by primary key when needed.
     *
     * <p>The input is returned unchanged when upsert materialization is not requested and
     * either the input parallelism equals the sink parallelism or the changelog mode contains
     * only {@link RowKind#INSERT}.
     *
     * @throws TableException if partitioning is needed but {@code primaryKeys} is empty. Note
     *     that the message always talks about differing parallelism, even when the key-by was
     *     triggered by {@code upsertMaterialize}.
     */
    private Transformation<RowData> applyKeyBy(Transformation<RowData> inputTransform, int[] primaryKeys, int sinkParallelism, boolean upsertMaterialize) {
        final int inputParallelism = inputTransform.getParallelism();
        final boolean keyByNotNeeded =
                inputParallelism == sinkParallelism || this.changelogMode.containsOnly(RowKind.INSERT);
        if (keyByNotNeeded && !upsertMaterialize) {
            return inputTransform;
        }
        if (primaryKeys.length == 0) {
            throw new TableException(String.format("The sink for table '%s' has a configured parallelism of %s, while the input parallelism is %s. Since the configured parallelism is different from the input's parallelism and the changelog mode is not insert-only, a primary key is required but could not be found.", this.tableSinkSpec.getObjectIdentifier().asSummaryString(), sinkParallelism, inputParallelism));
        }
        final RowDataKeySelector keySelector = KeySelectorUtil.getRowDataSelector(primaryKeys, getInputTypeInfo());
        final KeyGroupStreamPartitioner<RowData, RowData> partitioner =
                new KeyGroupStreamPartitioner<>(keySelector, KEY_BY_MAX_PARALLELISM);
        final Transformation<RowData> partitioned = new PartitionTransformation<>(inputTransform, partitioner);
        partitioned.setParallelism(sinkParallelism);
        return partitioned;
    }

    /**
     * Appends a {@link SinkUpsertMaterializer} operator, keyed by the primary key, running
     * with the sink parallelism. Its state TTL is derived from the table config's idle state
     * retention.
     */
    private Transformation<RowData> applyUpsertMaterialize(Transformation<RowData> inputTransform, int[] primaryKeys, int sinkParallelism, TableConfig tableConfig, RowType physicalRowType) {
        final SinkUpsertMaterializer materializer =
                new SinkUpsertMaterializer(
                        StateConfigUtil.createTtlConfig(tableConfig.getIdleStateRetention().toMillis()),
                        InternalSerializers.create(physicalRowType),
                        new EqualiserCodeGenerator(physicalRowType).generateRecordEqualiser("SinkMaterializeEqualiser"));
        final OneInputTransformation<RowData, RowData> materializeTransform =
                new OneInputTransformation<>(
                        inputTransform,
                        "SinkMaterializer",
                        materializer,
                        inputTransform.getOutputType(),
                        sinkParallelism);
        final RowDataKeySelector keySelector =
                KeySelectorUtil.getRowDataSelector(primaryKeys, InternalTypeInfo.of(physicalRowType));
        materializeTransform.setStateKeySelector(keySelector);
        // getProducedType() is the real API; the decompiler had emitted a synthetic
        // "mo5591getProducedType" name that does not exist.
        materializeTransform.setStateKeyType(keySelector.getProducedType());
        return materializeTransform;
    }

    /**
     * Turns the sink's runtime provider into the final transformation, dispatching on the
     * provider's concrete type. The checks are evaluated in the order written, so a provider
     * implementing several of the interfaces is handled by the first matching branch.
     *
     * @throws TableException if the provider implements none of the supported interfaces
     */
    private Transformation<?> applySinkProvider(Transformation<RowData> inputTransform, StreamExecutionEnvironment env, DynamicTableSink.SinkRuntimeProvider runtimeProvider, int rowtimeFieldIndex, int sinkParallelism) {
        if (runtimeProvider instanceof DataStreamSinkProvider) {
            final DataStream<RowData> dataStream = new DataStream<>(env, inputTransform);
            final DataStreamSinkProvider provider = (DataStreamSinkProvider) runtimeProvider;
            return provider.consumeDataStream(dataStream).getTransformation();
        }
        if (runtimeProvider instanceof TransformationSinkProvider) {
            final TransformationSinkProvider provider = (TransformationSinkProvider) runtimeProvider;
            return provider.createTransformation(TransformationSinkProvider.Context.of(inputTransform, rowtimeFieldIndex));
        }
        if (runtimeProvider instanceof SinkFunctionProvider) {
            final SinkFunction<RowData> sinkFunction = ((SinkFunctionProvider) runtimeProvider).createSinkFunction();
            return createSinkFunctionTransformation(sinkFunction, env, inputTransform, rowtimeFieldIndex, sinkParallelism);
        }
        if (runtimeProvider instanceof OutputFormatProvider) {
            final SinkFunction<RowData> sinkFunction =
                    new OutputFormatSinkFunction<>(((OutputFormatProvider) runtimeProvider).createOutputFormat());
            return createSinkFunctionTransformation(sinkFunction, env, inputTransform, rowtimeFieldIndex, sinkParallelism);
        }
        if (runtimeProvider instanceof SinkProvider) {
            return new SinkTransformation<>(inputTransform, ((SinkProvider) runtimeProvider).createSink(), getDescription(), sinkParallelism);
        }
        throw new TableException("Unsupported sink runtime provider.");
    }

    /**
     * Wraps a {@link SinkFunction} in a {@link SinkOperator} and returns a legacy sink
     * transformation named after this node's description.
     *
     * <p>If the function is {@link InputTypeConfigurable}, it is told the input type before
     * the transformation is created.
     */
    private Transformation<?> createSinkFunctionTransformation(SinkFunction<RowData> sinkFunction, StreamExecutionEnvironment env, Transformation<RowData> inputTransform, int rowtimeFieldIndex, int sinkParallelism) {
        final SinkOperator operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex);
        if (sinkFunction instanceof InputTypeConfigurable) {
            ((InputTypeConfigurable) sinkFunction).setInputType(getInputTypeInfo(), env.getConfig());
        }
        return new LegacySinkTransformation<>(inputTransform, getDescription(), SimpleOperatorFactory.of(operator), sinkParallelism);
    }

    /** Returns the type information derived from the output type of this node's first input edge. */
    private InternalTypeInfo<RowData> getInputTypeInfo() {
        return InternalTypeInfo.of(getInputEdges().get(0).getOutputType());
    }

    /**
     * Resolves the schema's primary key columns to field positions in {@code sinkRowType}.
     *
     * @return the field indices in primary-key column order, or an empty array when the schema
     *     declares no primary key
     */
    private int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema schema) {
        return schema.getPrimaryKey()
                .map(primaryKey -> primaryKey.getColumns().stream().mapToInt(sinkRowType::getFieldIndex).toArray())
                .orElse(new int[0]);
    }

    /** Returns the logical type of the schema's physical row data type, cast to a row type. */
    private RowType getPhysicalRowType(ResolvedSchema schema) {
        return (RowType) schema.toPhysicalRowDataType().getLogicalType();
    }
}
