package org.apache.paimon.flink.action;

import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.RowKind;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.fs.Path;
import org.apache.paimon.mergetree.compact.aggregate.FieldListaggAgg;
import org.apache.paimon.shade.org.antlr.v4.runtime.tree.xpath.XPath;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

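/**
 * Flink action "merge-into", which simulates the SQL "MERGE INTO" syntax against a primary-keyed
 * target table: the configured merge actions are translated into Flink SQL queries whose results
 * are unioned and written to the target table in one batch sink.
 *
 * <p>Decompiled; loaded from: input_file:org/apache/paimon/flink/action/MergeIntoAction.class
 */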
public class MergeIntoAction extends ActionBase {
    private static final Logger LOG = LoggerFactory.getLogger(MergeIntoAction.class);
    // Primary key columns of the target table; the constructor rejects tables where this is empty.
    private final List<String> primaryKeys;
    // One converter per target-table field, in field order; used to turn external row values
    // into internal data structures when building the output stream.
    private final List<DataStructureConverter<Object, Object>> converters;
    // Field names of the target table's row type, in field order.
    private final List<String> targetFieldNames;

    // Optional alias for the target table; when set, a temporary view with this name is created.
    @Nullable
    private String targetAlias;
    // Name of the source table, possibly qualified with '.'-separated parts.
    private String sourceTable;

    // Optional SQL statements executed, in order, before the merge queries are built.
    @Nullable
    private String[] sourceSqls;
    // Condition relating target and source rows; used as the JOIN ... ON / NOT EXISTS predicate.
    private String mergeCondition;
    // The five flags below record which merge actions were requested via the with* methods.
    private boolean matchedUpsert;
    private boolean notMatchedUpsert;
    private boolean matchedDelete;
    private boolean notMatchedDelete;
    private boolean insert;

    // Extra filter for the matched-upsert action (appended as a WHERE clause).
    @Nullable
    private String matchedUpsertCondition;

    // Either "col=expr,..." assignments or the wildcard for the matched-upsert action.
    @Nullable
    private String matchedUpsertSet;

    // Extra filter for the not-matched-by-source-upsert action (appended with AND).
    @Nullable
    private String notMatchedBySourceUpsertCondition;

    // "col=expr,..." assignments for the not-matched-by-source-upsert action; wildcard is rejected by validate().
    @Nullable
    private String notMatchedBySourceUpsertSet;

    // Extra filter for the matched-delete action (appended as a WHERE clause).
    @Nullable
    private String matchedDeleteCondition;

    // Extra filter for the not-matched-by-source-delete action (appended with AND).
    @Nullable
    private String notMatchedBySourceDeleteCondition;

    // Extra filter for the not-matched-insert action (appended with AND).
    @Nullable
    private String notMatchedInsertCondition;

    // Select list used verbatim as the inserted values for the not-matched-insert action.
    @Nullable
    private String notMatchedInsertValues;

    MergeIntoAction(String str, String str2, String str3) {
        super(str, str2, str3);
        if (!(this.table instanceof FileStoreTable)) {
            throw new UnsupportedOperationException(String.format("Only FileStoreTable supports merge-into action. The table type is '%s'.", this.table.getClass().getName()));
        }
        changeIgnoreMergeEngine();
        this.primaryKeys = ((FileStoreTable) this.table).schema().primaryKeys();
        if (this.primaryKeys.isEmpty()) {
            throw new UnsupportedOperationException("merge-into action doesn't support table with no primary keys defined.");
        }
        this.converters = (List) this.table.rowType().getFieldTypes().stream().map(LogicalTypeConversion::toLogicalType).map(TypeConversions::fromLogicalToDataType).map(DataStructureConverters::getConverter).collect(Collectors.toList());
        this.targetFieldNames = (List) this.table.rowType().getFields().stream().map((v0) -> {
            return v0.name();
        }).collect(Collectors.toList());
    }

    /**
     * Sets the alias under which the target table is referenced in the merge queries.
     *
     * @param str the target table alias
     * @return this action, for chaining
     */
    public MergeIntoAction withTargetAlias(String str) {
        targetAlias = str;
        return this;
    }

    /**
     * Sets the name of the source table the target is merged with.
     *
     * @param str the source table name; may be qualified with '.'-separated parts
     * @return this action, for chaining
     */
    public MergeIntoAction withSourceTable(String str) {
        sourceTable = str;
        return this;
    }

    /**
     * Sets the SQL statements to execute before the merge queries are built.
     *
     * @param strArr the statements, executed in the given order
     * @return this action, for chaining
     */
    public MergeIntoAction withSourceSqls(String... strArr) {
        sourceSqls = strArr;
        return this;
    }

    /**
     * Sets the condition that relates target rows to source rows.
     *
     * @param str the merge condition, inserted verbatim into the generated SQL
     * @return this action, for chaining
     */
    public MergeIntoAction withMergeCondition(String str) {
        mergeCondition = str;
        return this;
    }

    /**
     * Enables the matched-upsert action.
     *
     * @param str optional extra condition for matched rows; may be {@code null}
     * @param str2 the set clause describing the changes to apply
     * @return this action, for chaining
     */
    public MergeIntoAction withMatchedUpsert(@Nullable String str, String str2) {
        matchedUpsertSet = str2;
        matchedUpsertCondition = str;
        matchedUpsert = true;
        return this;
    }

    /**
     * Enables the not-matched-by-source-upsert action.
     *
     * @param str optional extra condition for unmatched target rows; may be {@code null}
     * @param str2 the set clause describing the changes to apply
     * @return this action, for chaining
     */
    public MergeIntoAction withNotMatchedBySourceUpsert(@Nullable String str, String str2) {
        notMatchedBySourceUpsertSet = str2;
        notMatchedBySourceUpsertCondition = str;
        notMatchedUpsert = true;
        return this;
    }

    /**
     * Enables the matched-delete action.
     *
     * @param str optional extra condition for matched rows; may be {@code null}
     * @return this action, for chaining
     */
    public MergeIntoAction withMatchedDelete(@Nullable String str) {
        matchedDeleteCondition = str;
        matchedDelete = true;
        return this;
    }

    /**
     * Enables the not-matched-by-source-delete action.
     *
     * @param str optional extra condition for unmatched target rows; may be {@code null}
     * @return this action, for chaining
     */
    public MergeIntoAction withNotMatchedBySourceDelete(@Nullable String str) {
        notMatchedBySourceDeleteCondition = str;
        notMatchedDelete = true;
        return this;
    }

    /**
     * Enables the not-matched-insert action.
     *
     * @param str optional extra condition for unmatched source rows; may be {@code null}
     * @param str2 the select list producing the values to insert
     * @return this action, for chaining
     */
    public MergeIntoAction withNotMatchedInsert(@Nullable String str, String str2) {
        notMatchedInsertValues = str2;
        notMatchedInsertCondition = str;
        insert = true;
        return this;
    }

    /**
     * Builds a {@link MergeIntoAction} from command-line style arguments.
     *
     * <p>Returns an empty result (after printing a hint to stderr or the help text to stdout)
     * when help was requested, the table path cannot be resolved, a required argument is missing,
     * or {@code validate} rejects the combination of merge actions.
     *
     * <p>Required arguments: {@code source-table}, {@code on}, {@code merge-actions}, plus the
     * {@code *-set} / {@code *-values} argument of every requested upsert/insert action.
     *
     * @param strArr the raw arguments of the merge-into job
     * @return the configured action, or empty if it could not be created
     */
    public static Optional<Action> create(String[] strArr) {
        LOG.info("merge-into job args: {}", String.join(" ", strArr));
        MultipleParameterTool params = MultipleParameterTool.fromArgs(strArr);
        if (params.has("help")) {
            printHelp();
            return Optional.empty();
        }
        Tuple3<String, String, String> tablePath = Action.getTablePath(params);
        if (tablePath == null) {
            return Optional.empty();
        }
        MergeIntoAction action = new MergeIntoAction((String) tablePath.f0, (String) tablePath.f1, (String) tablePath.f2);
        if (params.has("target-as")) {
            action.withTargetAlias(params.get("target-as"));
        }
        if (params.has("source-sql")) {
            action.withSourceSqls((String[]) params.getMultiParameter("source-sql").toArray(new String[0]));
        }
        if (argumentAbsent(params, "source-table")) {
            return Optional.empty();
        }
        action.withSourceTable(params.get("source-table"));
        if (argumentAbsent(params, "on")) {
            return Optional.empty();
        }
        action.withMergeCondition(params.get("on"));

        // Without this check a missing --merge-actions made params.get(...) return null and the
        // split below failed with a NullPointerException instead of a usage hint.
        if (argumentAbsent(params, "merge-actions")) {
            return Optional.empty();
        }
        // NOTE(review): FieldListaggAgg.DELIMITER is presumably "," (the help text documents a
        // comma-separated list); it looks like a decompiler-inlined constant — confirm.
        List<String> actions = Arrays.stream(params.get("merge-actions").split(FieldListaggAgg.DELIMITER))
                .map(String::trim)
                .collect(Collectors.toList());

        if (actions.contains("matched-upsert")) {
            if (argumentAbsent(params, "matched-upsert-set")) {
                return Optional.empty();
            }
            action.withMatchedUpsert(params.get("matched-upsert-condition"), params.get("matched-upsert-set"));
        }
        if (actions.contains("not-matched-by-source-upsert")) {
            if (argumentAbsent(params, "not-matched-by-source-upsert-set")) {
                return Optional.empty();
            }
            action.withNotMatchedBySourceUpsert(params.get("not-matched-by-source-upsert-condition"), params.get("not-matched-by-source-upsert-set"));
        }
        if (actions.contains("matched-delete")) {
            action.withMatchedDelete(params.get("matched-delete-condition"));
        }
        if (actions.contains("not-matched-by-source-delete")) {
            action.withNotMatchedBySourceDelete(params.get("not-matched-by-source-delete-condition"));
        }
        if (actions.contains("not-matched-insert")) {
            if (argumentAbsent(params, "not-matched-insert-values")) {
                return Optional.empty();
            }
            action.withNotMatchedInsert(params.get("not-matched-insert-condition"), params.get("not-matched-insert-values"));
        }

        if (!validate(action)) {
            return Optional.empty();
        }
        return Optional.of(action);
    }

    /**
     * Reports whether a required argument is missing, printing a hint to stderr if so.
     *
     * @param multipleParameterTool the parsed arguments
     * @param str the argument key to look for
     * @return {@code true} if the key is absent, {@code false} if it is present
     */
    private static boolean argumentAbsent(MultipleParameterTool multipleParameterTool, String str) {
        boolean absent = !multipleParameterTool.has(str);
        if (absent) {
            System.err.println(str + " is absent.\nRun <action> --help for help.");
        }
        return absent;
    }

    /**
     * Checks that the configured merge actions form a valid combination, printing the reason to
     * stderr when they do not.
     *
     * <p>Rules enforced:
     * <ul>
     *   <li>at least one merge action is enabled;
     *   <li>if matched-upsert and matched-delete are both enabled, both conditions are set;
     *   <li>if not-matched-by-source-upsert and not-matched-by-source-delete are both enabled,
     *       both conditions are set;
     *   <li>the not-matched-by-source-upsert set clause is not the wildcard.
     * </ul>
     *
     * @param mergeIntoAction the action to validate
     * @return {@code true} if the action is valid, {@code false} otherwise
     */
    private static boolean validate(MergeIntoAction mergeIntoAction) {
        boolean anyAction = mergeIntoAction.matchedUpsert
                || mergeIntoAction.notMatchedUpsert
                || mergeIntoAction.matchedDelete
                || mergeIntoAction.notMatchedDelete
                || mergeIntoAction.insert;
        if (!anyAction) {
            System.err.println("Must specify at least one merge action.\nRun <action> --help for help.");
            return false;
        }

        // Two actions on the same row set are only distinguishable through their conditions.
        if (mergeIntoAction.matchedUpsert && mergeIntoAction.matchedDelete
                && (mergeIntoAction.matchedUpsertCondition == null || mergeIntoAction.matchedDeleteCondition == null)) {
            System.err.println("If both matched-upsert and matched-delete actions are present, their conditions must both be present too.\nRun <action> --help for help.");
            return false;
        }
        if (mergeIntoAction.notMatchedUpsert && mergeIntoAction.notMatchedDelete
                && (mergeIntoAction.notMatchedBySourceUpsertCondition == null || mergeIntoAction.notMatchedBySourceDeleteCondition == null)) {
            // Fixed option name: the message previously read "not-matched-by--source-delete".
            System.err.println("If both not-matched-by-source-upsert and not-matched-by-source-delete actions are present, their conditions must both be present too.\nRun <action> --help for help.");
            return false;
        }

        // XPath.WILDCARD is the '*' named in the message below; equals() on the constant is null-safe.
        if (XPath.WILDCARD.equals(mergeIntoAction.notMatchedBySourceUpsertSet)) {
            System.err.println("The '*' cannot be used in not-matched-by-source-upsert-set");
            return false;
        }
        return true;
    }

    /** Prints the usage text of the "merge-into" action to standard output. */
    private static void printHelp() {
        System.out.println("Action \"merge-into\" simulates the \"MERGE INTO\" syntax.");
        System.out.println();

        // Command-line syntax; multi-line entries are emitted with a single println each.
        System.out.println("Syntax:");
        System.out.println(
                "  merge-into --warehouse <warehouse-path>\n"
                        + "             --database <database-name>\n"
                        + "             --table <target-table-name>\n"
                        + "             [--target-as <target-table-alias>]\n"
                        + "             [--source-sql <sql> ...]\n"
                        + "             --source-table <source-table-name>\n"
                        + "             --on <merge-condition>\n"
                        + "             --merge-actions <matched-upsert,matched-delete,not-matched-insert,not-matched-by-source-upsert,not-matched-by-source-delete>\n"
                        + "             --matched-upsert-condition <matched-condition>\n"
                        + "             --matched-upsert-set <upsert-changes>\n"
                        + "             --matched-delete-condition <matched-condition>\n"
                        + "             --not-matched-insert-condition <not-matched-condition>\n"
                        + "             --not-matched-insert-values <insert-values>\n"
                        + "             --not-matched-by-source-upsert-condition <not-matched-by-source-condition>\n"
                        + "             --not-matched-by-source-upsert-set <not-matched-upsert-changes>\n"
                        + "             --not-matched-by-source-delete-condition <not-matched-by-source-condition>");

        // Formats of the individual argument values.
        System.out.println("  matched-upsert-changes format:");
        System.out.println("    col=<source-table>.col | expression [, ...] (do not add '<target-table>.' before 'col')");
        System.out.println("    * (upsert with all source cols; require target table's schema is equal to source's)");
        System.out.println("  not-matched-upsert-changes format:");
        System.out.println("    col=expression (cannot use source table's col)");
        System.out.println("  insert-values format:");
        System.out.println("    col1,col2,...,col_end (must specify values of all columns; can use <source-table>.col or expression)");
        System.out.println("    * (insert with all source cols; require target table's schema is equal to source's)");
        System.out.println("  not-matched-condition: cannot use target table's columns to construct condition expression.");
        System.out.println("  not-matched-by-source-condition: cannot use source table's columns to construct condition expression.");
        System.out.println("  alternative arguments:");
        System.out.println("    --path <table-path> to represent the table path.");
        System.out.println();

        // Usage notes.
        System.out.println("Note: ");
        System.out.println("  1. Target table must has primary keys.");
        System.out.println(
                "  2. All conditions, set changes and values should use Flink SQL syntax. "
                        + "Please quote them with \" to escape special characters.");
        System.out.println("  3. You can pass sqls by --source-sql to config environment and create source table at runtime");
        System.out.println("  4. Target alias cannot be duplicated with existed table name.");
        System.out.println(
                "  5. If the source table is not in the current catalog and current database, "
                        + "the source-table-name must be qualified "
                        + "(database.table or catalog.database.table if in different catalog).");
        System.out.println("  6. At least one merge action must be specified.");
        System.out.println("  7. How to determine the changed rows with different \"matched\":");
        System.out.println(
                "    matched: changed rows are from target table and each can match a source table row "
                        + "based on merge-condition and optional matched-condition.");
        System.out.println(
                "    not-matched: changed rows are from source table and all rows cannot match any target table row "
                        + "based on merge-condition and optional not-matched-condition.");
        System.out.println(
                "    not-matched-by-source: changed rows are from target table and all row cannot match any source table row "
                        + "based on merge-condition and optional not-matched-by-source-condition.");
        System.out.println(
                "  8. If both matched-upsert and matched-delete actions are present, their conditions must both be present too "
                        + "(same to not-matched-by-source-upsert and not-matched-by-source-delete). "
                        + "Otherwise, all conditions are optional.");
        System.out.println();

        // Worked example.
        System.out.println("Examples:");
        System.out.println(
                "  merge-into --path hdfs:///path/to/T\n"
                        + "             --source-table S\n"
                        + "             --on \"T.k = S.k\"\n"
                        + "             --merge-actions matched-upsert\n"
                        + "             --matched-upsert-condition \"T.v <> S.v\"\n"
                        + "             --matched-upsert-set \"v = S.v\"");
        System.out.println(
                "  It will find matched rows of target table that meet condition (T.k = S.k), "
                        + "then update T.v with S.v where (T.v <> S.v).");
    }

    /**
     * Executes the merge: registers the optional target alias, runs the optional source SQL
     * statements, builds one changelog stream per enabled merge action and sinks their union.
     *
     * @throws IllegalStateException if no merge action has been enabled on this instance
     * @throws Exception if preparing or sinking the streams fails
     */
    @Override // org.apache.paimon.flink.action.Action
    public void run() throws Exception {
        handleTargetAlias();
        handleSqls();

        List<DataStream<RowData>> streams = Stream.of(
                        getMatchedUpsertDataStream(),
                        getNotMatchedUpsertDataStream(),
                        getMatchedDeleteDataStream(),
                        getNotMatchedDeleteDataStream(),
                        getInsertDataStream())
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());

        // validate() only runs in create(); an action assembled through the with* methods could
        // reach this point with nothing enabled, which used to surface as a message-less
        // IndexOutOfBoundsException from get(0).
        if (streams.isEmpty()) {
            throw new IllegalStateException("Must specify at least one merge action.");
        }

        DataStream<RowData> first = streams.get(0);
        @SuppressWarnings("unchecked") // generic arrays cannot be created directly
        DataStream<RowData>[] rest = streams.stream().skip(1L).toArray(DataStream[]::new);
        batchSink(first.union(rest));
    }

    /**
     * Registers the target table as a temporary view under its alias, if an alias was configured,
     * so that the generated queries can refer to the target by that alias.
     */
    private void handleTargetAlias() {
        if (this.targetAlias == null) {
            return;
        }
        String viewPath = escapedTargetName();
        this.tEnv.createTemporaryView(viewPath, this.tEnv.from(this.identifier.getFullName()));
    }

    /**
     * Executes the configured source SQL statements one after another, waiting for each to finish.
     *
     * @throws RuntimeException wrapping the original failure (with the offending statement in the
     *     message) if any statement fails
     */
    private void handleSqls() {
        if (this.sourceSqls == null) {
            return;
        }
        for (String sql : this.sourceSqls) {
            try {
                this.tEnv.executeSql(sql).await();
            } catch (Throwable th) {
                // Catching Throwable also catches InterruptedException; restore the interrupt
                // status so callers further up the stack can still observe the interruption.
                if (th instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                String message = String.format("Error occurs when executing sql:\n%s", sql);
                LOG.error(message, th);
                throw new RuntimeException(message, th);
            }
        }
    }

    /**
     * Builds the stream of UPDATE_AFTER rows for the matched-upsert action.
     *
     * <p>The query inner-joins target and source on the merge condition. With the wildcard set
     * clause every column of the source table is selected; otherwise each target column is
     * selected either as its configured expression or as the unchanged target column.
     *
     * @return the stream, or empty if matched-upsert is not enabled
     * @throws IllegalArgumentException if the set clause cannot be parsed
     * @throws RuntimeException if the set clause names a column the target table does not have
     */
    private Optional<DataStream<RowData>> getMatchedUpsertDataStream() {
        if (!this.matchedUpsert) {
            return Optional.empty();
        }

        final List<String> projection;
        if (this.matchedUpsertSet.equals(XPath.WILDCARD)) {
            // Only the last part of a qualified source name is used to qualify the wildcard.
            String[] nameParts = this.sourceTable.split("\\.");
            String simpleSourceName = nameParts[nameParts.length - 1];
            projection = Collections.singletonList(simpleSourceName + ".*");
        } else {
            Map<String, String> changes = Action.parseKeyValues(this.matchedUpsertSet);
            if (changes == null) {
                throw new IllegalArgumentException("matched-upsert-set is invalid.\nRun <action> --help for help.");
            }
            Optional<String> unknownColumn = changes.keySet().stream()
                    .filter(column -> !this.targetFieldNames.contains(column))
                    .findFirst();
            if (unknownColumn.isPresent()) {
                throw new RuntimeException(String.format("Invalid column reference '%s' of table '%s' at matched-upsert action.", unknownColumn.get(), this.identifier.getFullName()));
            }
            String qualifier = targetTableName() + Path.CUR_DIR;
            projection = this.targetFieldNames.stream()
                    .map(column -> changes.getOrDefault(column, qualifier + column))
                    .collect(Collectors.toList());
        }

        String filter;
        if (this.matchedUpsertCondition == null) {
            filter = "";
        } else {
            filter = "WHERE " + this.matchedUpsertCondition;
        }

        String query = String.format(
                "SELECT %s FROM %s INNER JOIN %s ON %s %s",
                String.join(FieldListaggAgg.DELIMITER, projection),
                escapedTargetName(),
                escapedSourceName(),
                this.mergeCondition,
                filter);
        LOG.info("Query used for matched-update:\n{}", query);

        Table result = this.tEnv.sqlQuery(query);
        checkSchema("matched-upsert", result);
        return Optional.of(toDataStream(result, RowKind.UPDATE_AFTER, this.converters));
    }

    /**
     * Builds the stream of UPDATE_AFTER rows for the not-matched-by-source-upsert action.
     *
     * <p>The query selects target rows for which no source row satisfies the merge condition,
     * optionally narrowed by the action's own condition. Each target column is selected either as
     * its configured expression or unchanged.
     *
     * @return the stream, or empty if the action is not enabled
     * @throws IllegalArgumentException if the set clause cannot be parsed
     * @throws RuntimeException if the set clause names an unknown column or a primary key column
     */
    private Optional<DataStream<RowData>> getNotMatchedUpsertDataStream() {
        if (!this.notMatchedUpsert) {
            return Optional.empty();
        }

        Map<String, String> changes = Action.parseKeyValues(this.notMatchedBySourceUpsertSet);
        if (changes == null) {
            // The message used to name "matched-upsert-set", pointing users at the wrong argument.
            throw new IllegalArgumentException("not-matched-by-source-upsert-set is invalid.\nRun <action> --help for help.");
        }
        for (String column : changes.keySet()) {
            if (!this.targetFieldNames.contains(column)) {
                throw new RuntimeException(String.format("Invalid column reference '%s' of table '%s' at not-matched-by-source-upsert action.\nRun <action> --help for help.", column, this.identifier.getFullName()));
            }
            if (this.primaryKeys.contains(column)) {
                throw new RuntimeException("Not allowed to change primary key in not-matched-by-source-upsert-set.\nRun <action> --help for help.");
            }
        }

        String projection = this.targetFieldNames.stream()
                .map(column -> changes.getOrDefault(column, column))
                .collect(Collectors.joining(FieldListaggAgg.DELIMITER));
        String extraCondition = this.notMatchedBySourceUpsertCondition == null
                ? ""
                : String.format("AND (%s)", this.notMatchedBySourceUpsertCondition);

        String query = String.format(
                "SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s WHERE %s) %s",
                projection,
                escapedTargetName(),
                escapedSourceName(),
                this.mergeCondition,
                extraCondition);
        LOG.info("Query used for not-matched-by-source-upsert:\n{}", query);

        Table result = this.tEnv.sqlQuery(query);
        checkSchema("not-matched-by-source-upsert", result);
        return Optional.of(toDataStream(result, RowKind.UPDATE_AFTER, this.converters));
    }

    /**
     * Builds the stream of DELETE rows for the matched-delete action.
     *
     * <p>The query inner-joins target and source on the merge condition and selects every target
     * column, qualified with the target table name, optionally filtered by the delete condition.
     *
     * @return the stream, or empty if matched-delete is not enabled
     */
    private Optional<DataStream<RowData>> getMatchedDeleteDataStream() {
        if (!this.matchedDelete) {
            return Optional.empty();
        }

        String qualifier = targetTableName() + Path.CUR_DIR;
        String projection = this.targetFieldNames.stream()
                .map(column -> qualifier + column)
                .collect(Collectors.joining(FieldListaggAgg.DELIMITER));

        String filter = "";
        if (this.matchedDeleteCondition != null) {
            filter = "WHERE " + this.matchedDeleteCondition;
        }

        String query = String.format(
                "SELECT %s FROM %s INNER JOIN %s ON %s %s",
                projection,
                escapedTargetName(),
                escapedSourceName(),
                this.mergeCondition,
                filter);
        LOG.info("Query used by matched-delete:\n{}", query);

        Table result = this.tEnv.sqlQuery(query);
        checkSchema("matched-delete", result);
        return Optional.of(toDataStream(result, RowKind.DELETE, this.converters));
    }

    /**
     * Builds the stream of DELETE rows for the not-matched-by-source-delete action.
     *
     * <p>The query selects every column of the target rows for which no source row satisfies the
     * merge condition, optionally narrowed by the action's own condition.
     *
     * @return the stream, or empty if the action is not enabled
     */
    private Optional<DataStream<RowData>> getNotMatchedDeleteDataStream() {
        if (!this.notMatchedDelete) {
            return Optional.empty();
        }

        String extraCondition = "";
        if (this.notMatchedBySourceDeleteCondition != null) {
            extraCondition = String.format("AND (%s)", this.notMatchedBySourceDeleteCondition);
        }

        String query = String.format(
                "SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s WHERE %s) %s",
                String.join(FieldListaggAgg.DELIMITER, this.targetFieldNames),
                escapedTargetName(),
                escapedSourceName(),
                this.mergeCondition,
                extraCondition);
        LOG.info("Query used by not-matched-by-source-delete:\n{}", query);

        Table result = this.tEnv.sqlQuery(query);
        checkSchema("not-matched-by-source-delete", result);
        return Optional.of(toDataStream(result, RowKind.DELETE, this.converters));
    }

    /**
     * Builds the stream of INSERT rows for the not-matched-insert action.
     *
     * <p>The query selects the configured insert values from the source rows for which no target
     * row satisfies the merge condition, optionally narrowed by the action's own condition. Note
     * that source and target swap places compared with the not-matched-by-source queries.
     *
     * @return the stream, or empty if not-matched-insert is not enabled
     */
    private Optional<DataStream<RowData>> getInsertDataStream() {
        if (!this.insert) {
            return Optional.empty();
        }

        String extraCondition = "";
        if (this.notMatchedInsertCondition != null) {
            extraCondition = String.format("AND (%s)", this.notMatchedInsertCondition);
        }

        String query = String.format(
                "SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s WHERE %s) %s",
                this.notMatchedInsertValues,
                escapedSourceName(),
                escapedTargetName(),
                this.mergeCondition,
                extraCondition);
        LOG.info("Query used by not-matched-insert:\n{}", query);

        Table result = this.tEnv.sqlQuery(query);
        checkSchema("not-matched-insert", result);
        return Optional.of(toDataStream(result, RowKind.INSERT, this.converters));
    }

    /**
     * Verifies that the column types of a query result are compatible with the target table's
     * field types.
     *
     * @param str name of the merge action, used in the error message
     * @param table the query result to check
     * @throws IllegalStateException if the result schema is not compatible with the target schema
     */
    private void checkSchema(String str, Table table) {
        List<DataType> resultTypes = toPaimonTypes(table.getResolvedSchema().getColumnDataTypes());
        List<DataType> expectedTypes = this.table.rowType().getFieldTypes();
        if (compatibleCheck(resultTypes, expectedTypes)) {
            return;
        }

        String resultSchema = resultTypes.stream()
                .map(type -> type.asSQLString())
                .collect(Collectors.joining(", "));
        String expectedSchema = expectedTypes.stream()
                .map(type -> type.asSQLString())
                .collect(Collectors.joining(", "));
        throw new IllegalStateException(String.format("The schema of result in action '%s' is invalid.\nResult schema:   [%s]\nExpected schema: [%s]", str, resultSchema, expectedSchema));
    }

    /**
     * Converts a query result into a stream of internal rows that all carry the given row kind.
     *
     * <p>The mapping lambda captures only {@code rowKind} and {@code list} (not {@code this}),
     * matching the captured arguments expected by {@code $deserializeLambda$}.
     *
     * @param table the query result
     * @param rowKind the row kind assigned to every produced row
     * @param list converters, indexed by field position, from external to internal values
     * @return the converted stream
     */
    private DataStream<RowData> toDataStream(Table table, RowKind rowKind, List<DataStructureConverter<Object, Object>> list) {
        return this.tEnv.toChangelogStream(table).map(row -> {
            final int fieldCount = row.getArity();
            final GenericRowData converted = new GenericRowData(rowKind, fieldCount);
            int pos = 0;
            while (pos < fieldCount) {
                Object external = row.getField(pos);
                converted.setField(pos, ((DataStructureConverter) list.get(pos)).toInternalOrNull(external));
                pos++;
            }
            return converted;
        });
    }

    /**
     * Returns the name by which the target table is referenced in queries.
     *
     * @return the target alias if one is set, otherwise the table's object name
     */
    private String targetTableName() {
        if (this.targetAlias != null) {
            return this.targetAlias;
        }
        return this.identifier.getObjectName();
    }

    /**
     * Returns the fully qualified, backtick-quoted target name: catalog, database, then the
     * target alias or table name.
     *
     * @return the quoted three-part target name
     */
    private String escapedTargetName() {
        String database = this.identifier.getDatabaseName();
        String tableOrAlias = targetTableName();
        return String.format("`%s`.`%s`.`%s`", this.catalogName, database, tableOrAlias);
    }

    /**
     * Returns the source table name with each '.'-separated part wrapped in backticks.
     *
     * @return the quoted source table name
     */
    private String escapedSourceName() {
        StringBuilder quoted = new StringBuilder();
        String[] parts = this.sourceTable.split("\\.");
        for (int idx = 0; idx < parts.length; idx++) {
            if (idx > 0) {
                quoted.append(Path.CUR_DIR);
            }
            quoted.append(String.format("`%s`", parts[idx]));
        }
        return quoted.toString();
    }

    /**
     * Synthetic hook for deserializing the serializable lambda created in {@code toDataStream}.
     *
     * <p>It looks the lambda up by its implementation method name, verifies that the serialized
     * form describes the expected {@code MapFunction.map} implementation in this class, and then
     * rebuilds the lambda from its two captured arguments (row kind and converter list).
     *
     * <p>NOTE(review): this is decompiler output — {@code boolean z = -1} and
     * {@code switch (z)} over a boolean are not valid Java source; presumably the method is
     * normally generated by the compiler rather than written by hand. Confirm before compiling.
     *
     * @param serializedLambda the serialized form of the lambda
     * @return the reconstructed lambda
     * @throws IllegalArgumentException if the serialized form does not match the known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // First stage: map the implementation method name to a case index (hash, then equals).
        switch (implMethodName.hashCode()) {
            case -415241085:
                if (implMethodName.equals("lambda$toDataStream$eb2d18fd$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second stage: validate the full lambda description and rebuild it.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/action/MergeIntoAction") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/types/RowKind;Ljava/util/List;Lorg/apache/flink/types/Row;)Lorg/apache/flink/table/data/RowData;")) {
                    // Captured arguments, in capture order: the row kind and the converter list.
                    RowKind rowKind = (RowKind) serializedLambda.getCapturedArg(0);
                    List list = (List) serializedLambda.getCapturedArg(1);
                    // Same body as the lambda in toDataStream.
                    return row -> {
                        int arity = row.getArity();
                        GenericRowData genericRowData = new GenericRowData(rowKind, arity);
                        for (int i = 0; i < arity; i++) {
                            genericRowData.setField(i, ((DataStructureConverter) list.get(i)).toInternalOrNull(row.getField(i)));
                        }
                        return genericRowData;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
