package com.alibaba.ververica.connectors.common.table;

import com.alibaba.ververica.connectors.common.source.AbstractParallelSource;
import com.alibaba.ververica.connectors.common.source.resolver.ConverterOp;
import com.alibaba.ververica.connectors.common.source.resolver.RecordResolver;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/ververica/connectors/common/table/AbstractVervericaTableSource.class */
public abstract class AbstractVervericaTableSource<IN, OUT> implements StreamTableSource<OUT> {
    public static final Logger LOGGER = LoggerFactory.getLogger(AbstractVervericaTableSource.class);
    protected DescriptorProperties properties;

    public AbstractVervericaTableSource(DescriptorProperties descriptorProperties) {
        this.properties = descriptorProperties;
    }

    public TableSchema getTableSchema() {
        return TableSchemaUtils.getPhysicalSchema(this.properties.getTableSchema("schema"));
    }

    public DataType getProducedDataType() {
        TableSchema tableSchema = getTableSchema();
        DataTypes.Field[] fieldArr = new DataTypes.Field[tableSchema.getFieldDataTypes().length];
        DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
        String[] fieldNames = tableSchema.getFieldNames();
        for (int i = 0; i < fieldNames.length; i++) {
            if (fieldDataTypes[i].getLogicalType().getTypeRoot().equals(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
                fieldArr[i] = DataTypes.FIELD(fieldNames[i], fieldDataTypes[i].bridgedTo(Timestamp.class));
            } else if (fieldDataTypes[i].equals(DataTypes.DATE())) {
                fieldArr[i] = DataTypes.FIELD(fieldNames[i], fieldDataTypes[i].bridgedTo(Date.class));
            } else {
                fieldArr[i] = DataTypes.FIELD(fieldNames[i], fieldDataTypes[i]);
            }
        }
        return DataTypes.ROW(fieldArr).bridgedTo(getOutputClass());
    }

    public DataStream<OUT> getDataStream(StreamExecutionEnvironment streamExecutionEnvironment) {
        RecordResolver<IN, OUT> createRecordConverter = createRecordConverter();
        AbstractParallelSource createSourceFunction = createSourceFunction();
        DataStreamSource addSource = streamExecutionEnvironment.addSource(createSourceFunction);
        int parallelism = streamExecutionEnvironment.getParallelism();
        if (createSourceFunction instanceof AbstractParallelSource) {
            try {
                List<String> partitionList = createSourceFunction.getPartitionList();
                Preconditions.checkArgument(partitionList != null && partitionList.size() > 0, "No partition found.");
                int size = partitionList.size();
                if (parallelism > size) {
                    parallelism = size;
                }
                addSource.setParallelism(parallelism);
            } catch (Exception e) {
                LOGGER.error("Fail to get partition list from source function");
                if (!((Boolean) this.properties.getOptionalBoolean(VervericaTableOptions.IGNORE_PARTITION_FETCH_ERROR.key()).orElse(VervericaTableOptions.IGNORE_PARTITION_FETCH_ERROR.defaultValue())).booleanValue()) {
                    throw new RuntimeException("Fail to get partition list from source function", e);
                }
            }
        }
        return addSource.flatMap(new ConverterOp(createRecordConverter, createRecordConverter.getProducedType())).setParallelism(streamExecutionEnvironment.getParallelism());
    }

    public abstract RecordResolver<IN, OUT> createRecordConverter();

    public abstract SourceFunction<IN> createSourceFunction();

    public abstract boolean isBounded();

    public abstract Class<OUT> getOutputClass();
}
