/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.common;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamFilter;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ParallelismProvider;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.OutputFormatProvider;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.connector.sink.SinkProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.connectors.TransformationSinkProvider;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.sinks.TableSinkUtils;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer;
import org.apache.flink.table.runtime.operators.sink.SinkOperator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;

public abstract class CommonExecSink
extends ExecNodeBase<Object>
implements MultipleTransformationTranslator<Object> {
    public static final String FIELD_NAME_DYNAMIC_TABLE_SINK = "dynamicTableSink";
    @JsonProperty(value="dynamicTableSink")
    protected final DynamicTableSinkSpec tableSinkSpec;
    @JsonIgnore
    private final ChangelogMode changelogMode;
    @JsonIgnore
    private final boolean isBounded;

    protected CommonExecSink(DynamicTableSinkSpec tableSinkSpec, ChangelogMode changelogMode, boolean isBounded, int id, List<InputProperty> inputProperties, LogicalType outputType, String description) {
        super(id, inputProperties, outputType, description);
        this.tableSinkSpec = tableSinkSpec;
        this.changelogMode = changelogMode;
        this.isBounded = isBounded;
    }

    public DynamicTableSinkSpec getTableSinkSpec() {
        return this.tableSinkSpec;
    }

    protected Transformation<Object> createSinkTransformation(StreamExecutionEnvironment env, TableConfig tableConfig, Transformation<RowData> inputTransform, int rowtimeFieldIndex) {
        DynamicTableSink tableSink = this.tableSinkSpec.getTableSink();
        DynamicTableSink.SinkRuntimeProvider runtimeProvider = tableSink.getSinkRuntimeProvider((DynamicTableSink.Context)new SinkRuntimeProviderContext(this.isBounded));
        TableSchema tableSchema = this.tableSinkSpec.getCatalogTable().getSchema();
        inputTransform = this.applyNotNullEnforcer(tableConfig, tableSchema, inputTransform);
        if (runtimeProvider instanceof DataStreamSinkProvider) {
            if (runtimeProvider instanceof ParallelismProvider) {
                throw new TableException("`DataStreamSinkProvider` is not allowed to work with `ParallelismProvider`, please see document of `ParallelismProvider`");
            }
            DataStream dataStream = new DataStream(env, inputTransform);
            DataStreamSinkProvider provider = (DataStreamSinkProvider)runtimeProvider;
            return provider.consumeDataStream(dataStream).getTransformation();
        }
        if (runtimeProvider instanceof TransformationSinkProvider) {
            TransformationSinkProvider provider = (TransformationSinkProvider)runtimeProvider;
            return provider.createTransformation(TransformationSinkProvider.Context.of(inputTransform, rowtimeFieldIndex));
        }
        Preconditions.checkArgument((boolean)(runtimeProvider instanceof ParallelismProvider), (String)"%s should implement ParallelismProvider interface.", (Object[])new Object[]{runtimeProvider.getClass().getName()});
        int inputParallelism = inputTransform.getParallelism();
        int sinkParallelism = this.deriveSinkParallelism((ParallelismProvider)runtimeProvider, inputParallelism);
        inputTransform = this.applyKeyByForDifferentParallelism(tableSchema, inputTransform, inputParallelism, sinkParallelism);
        if (runtimeProvider instanceof SinkFunctionProvider) {
            SinkFunction sinkFunction = ((SinkFunctionProvider)runtimeProvider).createSinkFunction();
            return this.createSinkFunctionTransformation((SinkFunction<RowData>)sinkFunction, env, inputTransform, rowtimeFieldIndex, sinkParallelism);
        }
        if (runtimeProvider instanceof OutputFormatProvider) {
            OutputFormat outputFormat = ((OutputFormatProvider)runtimeProvider).createOutputFormat();
            OutputFormatSinkFunction sinkFunction = new OutputFormatSinkFunction(outputFormat);
            return this.createSinkFunctionTransformation((SinkFunction<RowData>)sinkFunction, env, inputTransform, rowtimeFieldIndex, sinkParallelism);
        }
        if (runtimeProvider instanceof SinkProvider) {
            return new SinkTransformation(inputTransform, ((SinkProvider)runtimeProvider).createSink(), this.getDescription(), sinkParallelism);
        }
        throw new TableException("This should not happen.");
    }

    private Transformation<RowData> applyNotNullEnforcer(TableConfig config, TableSchema tableSchema, Transformation<RowData> inputTransform) {
        ExecutionConfigOptions.NotNullEnforcer notNullEnforcer = (ExecutionConfigOptions.NotNullEnforcer)config.getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER);
        int[] notNullFieldIndices = TableSinkUtils.getNotNullFieldIndices(tableSchema);
        String[] fieldNames = ((RowType)tableSchema.toPhysicalRowDataType().getLogicalType()).getFieldNames().toArray(new String[0]);
        if (notNullFieldIndices.length > 0) {
            SinkNotNullEnforcer enforcer = new SinkNotNullEnforcer(notNullEnforcer, notNullFieldIndices, fieldNames);
            List notNullFieldNames = Arrays.stream(notNullFieldIndices).mapToObj(idx -> fieldNames[idx]).collect(Collectors.toList());
            String operatorName = String.format("NotNullEnforcer(fields=[%s])", String.join((CharSequence)", ", notNullFieldNames));
            return new OneInputTransformation(inputTransform, operatorName, (OneInputStreamOperator)new StreamFilter((FilterFunction)enforcer), this.getInputTypeInfo(), inputTransform.getParallelism());
        }
        return inputTransform;
    }

    private int deriveSinkParallelism(ParallelismProvider parallelismProvider, int inputParallelism) {
        Optional parallelismOptional = parallelismProvider.getParallelism();
        if (parallelismOptional.isPresent()) {
            int sinkParallelism = (Integer)parallelismOptional.get();
            if (sinkParallelism <= 0) {
                throw new TableException(String.format("Table: %s configured sink parallelism: %s should not be less than zero or equal to zero", this.tableSinkSpec.getObjectIdentifier().asSummaryString(), sinkParallelism));
            }
            return sinkParallelism;
        }
        return inputParallelism;
    }

    private Transformation<RowData> applyKeyByForDifferentParallelism(TableSchema tableSchema, Transformation<RowData> inputTransform, int inputParallelism, int sinkParallelism) {
        int[] primaryKeys = TableSchemaUtils.getPrimaryKeyIndices((TableSchema)tableSchema);
        if (inputParallelism == sinkParallelism || this.changelogMode.containsOnly(RowKind.INSERT)) {
            return inputTransform;
        }
        if (primaryKeys.length == 0) {
            throw new TableException(String.format("Table: %s configured sink parallelism is: %s, while the input parallelism is: %s. Since configured parallelism is different from input parallelism and the changelog mode contains [%s], which is not INSERT_ONLY mode, primary key is required but no primary key is found", this.tableSinkSpec.getObjectIdentifier().asSummaryString(), sinkParallelism, inputParallelism, this.changelogMode.getContainedKinds().stream().map(Enum::toString).collect(Collectors.joining(","))));
        }
        RowDataKeySelector selector = KeySelectorUtil.getRowDataSelector(primaryKeys, this.getInputTypeInfo());
        KeyGroupStreamPartitioner partitioner = new KeyGroupStreamPartitioner((KeySelector)selector, 128);
        PartitionTransformation partitionedTransform = new PartitionTransformation(inputTransform, (StreamPartitioner)partitioner);
        partitionedTransform.setParallelism(sinkParallelism);
        return partitionedTransform;
    }

    private Transformation<Object> createSinkFunctionTransformation(SinkFunction<RowData> sinkFunction, StreamExecutionEnvironment env, Transformation<RowData> inputTransformation, int rowtimeFieldIndex, int sinkParallelism) {
        SinkOperator operator = new SinkOperator((SinkFunction)env.clean(sinkFunction), rowtimeFieldIndex);
        if (sinkFunction instanceof InputTypeConfigurable) {
            ((InputTypeConfigurable)sinkFunction).setInputType(this.getInputTypeInfo(), env.getConfig());
        }
        return new LegacySinkTransformation(inputTransformation, this.getDescription(), (StreamOperatorFactory)SimpleOperatorFactory.of((StreamOperator)operator), sinkParallelism);
    }

    private InternalTypeInfo<RowData> getInputTypeInfo() {
        return InternalTypeInfo.of((LogicalType)this.getInputEdges().get(0).getOutputType());
    }
}

