/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.schema;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.WatermarkSpec;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.planner.calcite.FlinkContext;
import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.calcite.SqlExprToRexConverter;
import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
import org.apache.flink.table.planner.catalog.CatalogSchemaTable;
import org.apache.flink.table.planner.hint.FlinkHints;
import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.planner.sources.DynamicSourceUtils;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.TableSchemaUtils;

public final class CatalogSourceTable
extends FlinkPreparingTableBase {
    private final CatalogSchemaTable schemaTable;
    private final CatalogTable catalogTable;

    public CatalogSourceTable(RelOptSchema relOptSchema, List<String> names, RelDataType rowType, CatalogSchemaTable schemaTable, CatalogTable catalogTable) {
        super(relOptSchema, rowType, names, schemaTable.getStatistic());
        this.schemaTable = schemaTable;
        this.catalogTable = catalogTable;
    }

    @Override
    public RelNode toRel(RelOptTable.ToRelContext toRelContext) {
        RelOptCluster cluster = toRelContext.getCluster();
        List<RelHint> hints = toRelContext.getTableHints();
        FlinkContext context = ShortcutUtils.unwrapContext(cluster);
        FlinkTypeFactory typeFactory = ShortcutUtils.unwrapTypeFactory(cluster);
        FlinkRelBuilder relBuilder = FlinkRelBuilder.of(cluster, this.relOptSchema);
        Map<String, String> hintedOptions = FlinkHints.getHintedOptions(hints);
        CatalogTable catalogTable = this.createFinalCatalogTable(context, hintedOptions);
        DynamicTableSource tableSource = this.createDynamicTableSource(context, catalogTable);
        DynamicSourceUtils.prepareDynamicSource(this.schemaTable.getTableIdentifier(), catalogTable, tableSource, this.schemaTable.isStreamingMode(), context.getTableConfig());
        this.pushTableScan(relBuilder, cluster, catalogTable, tableSource, typeFactory, hints);
        TableSchema schema = catalogTable.getSchema();
        if (!TableSchemaUtils.containsPhysicalColumnsOnly((TableSchema)schema)) {
            this.pushMetadataProjection(relBuilder, typeFactory, schema);
            this.pushGeneratedProjection(context, relBuilder, schema);
        }
        if (this.schemaTable.isStreamingMode() && !schema.getWatermarkSpecs().isEmpty()) {
            this.pushWatermarkAssigner(context, relBuilder, schema);
        }
        return relBuilder.build();
    }

    private void pushWatermarkAssigner(FlinkContext context, FlinkRelBuilder relBuilder, TableSchema schema) {
        RelDataType inputRelDataType = relBuilder.peek().getRowType();
        SqlExprToRexConverterFactory factory = context.getSqlExprToRexConverterFactory();
        SqlExprToRexConverter converter = factory.create(inputRelDataType);
        WatermarkSpec watermarkSpec = (WatermarkSpec)schema.getWatermarkSpecs().get(0);
        String rowtimeColumn = watermarkSpec.getRowtimeAttribute();
        int rowtimeColumnIdx = inputRelDataType.getFieldNames().indexOf(rowtimeColumn);
        RexNode watermarkRexNode = converter.convertToRexNode(watermarkSpec.getWatermarkExpr());
        relBuilder.watermark(rowtimeColumnIdx, watermarkRexNode);
    }

    private void pushGeneratedProjection(FlinkContext context, FlinkRelBuilder relBuilder, TableSchema schema) {
        SqlExprToRexConverterFactory factory = context.getSqlExprToRexConverterFactory();
        SqlExprToRexConverter converter = factory.create(relBuilder.peek().getRowType());
        List projection = schema.getTableColumns().stream().map(c -> {
            if (c instanceof TableColumn.ComputedColumn) {
                TableColumn.ComputedColumn computedColumn = (TableColumn.ComputedColumn)c;
                return converter.convertToRexNode(computedColumn.getExpression());
            }
            return relBuilder.field(c.getName());
        }).collect(Collectors.toList());
        relBuilder.projectNamed(projection, Arrays.asList(schema.getFieldNames()), true);
    }

    private void pushMetadataProjection(FlinkRelBuilder relBuilder, FlinkTypeFactory typeFactory, TableSchema schema) {
        RexBuilder rexBuilder = relBuilder.getRexBuilder();
        List<String> fieldNames = schema.getTableColumns().stream().filter(c -> !(c instanceof TableColumn.ComputedColumn)).map(TableColumn::getName).collect(Collectors.toList());
        List fieldNodes = schema.getTableColumns().stream().filter(c -> !(c instanceof TableColumn.ComputedColumn)).map(c -> {
            RelDataType relDataType = typeFactory.createFieldTypeFromLogicalType(c.getType().getLogicalType());
            if (c instanceof TableColumn.MetadataColumn) {
                TableColumn.MetadataColumn metadataColumn = (TableColumn.MetadataColumn)c;
                String metadataKey = metadataColumn.getMetadataAlias().orElse(metadataColumn.getName());
                return rexBuilder.makeAbstractCast(relDataType, relBuilder.field(metadataKey));
            }
            return relBuilder.field(c.getName());
        }).collect(Collectors.toList());
        relBuilder.projectNamed(fieldNodes, fieldNames, true);
    }

    private void pushTableScan(FlinkRelBuilder relBuilder, RelOptCluster cluster, CatalogTable catalogTable, DynamicTableSource tableSource, FlinkTypeFactory typeFactory, List<RelHint> hints) {
        RowType producedType = DynamicSourceUtils.createProducedType(catalogTable.getSchema(), tableSource);
        RelDataType producedRelDataType = typeFactory.buildRelNodeRowType(producedType);
        TableSourceTable tableSourceTable = new TableSourceTable(this.relOptSchema, this.schemaTable.getTableIdentifier(), producedRelDataType, this.statistic, tableSource, this.schemaTable.isStreamingMode(), catalogTable, new String[0]);
        LogicalTableScan scan = LogicalTableScan.create(cluster, tableSourceTable, hints);
        relBuilder.push(scan);
    }

    private CatalogTable createFinalCatalogTable(FlinkContext context, Map<String, String> hintedOptions) {
        if (hintedOptions.isEmpty()) {
            return this.catalogTable;
        }
        Configuration config = context.getTableConfig().getConfiguration();
        if (!((Boolean)config.get(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED)).booleanValue()) {
            throw new ValidationException(String.format("The '%s' hint is allowed only when the config option '%s' is set to true.", "OPTIONS", TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED.key()));
        }
        return this.catalogTable.copy(FlinkHints.mergeTableOptions(hintedOptions, this.catalogTable.getOptions()));
    }

    private DynamicTableSource createDynamicTableSource(FlinkContext context, CatalogTable catalogTable) {
        Configuration config = context.getTableConfig().getConfiguration();
        return FactoryUtil.createTableSource((Catalog)this.schemaTable.getCatalog(), (ObjectIdentifier)this.schemaTable.getTableIdentifier(), (CatalogTable)catalogTable, (ReadableConfig)config, (ClassLoader)Thread.currentThread().getContextClassLoader(), (boolean)this.schemaTable.isTemporary());
    }
}

