/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.aws.sync;

import com.amazonaws.services.glue.AWSGlue;
import com.amazonaws.services.glue.AWSGlueClientBuilder;
import com.amazonaws.services.glue.model.AlreadyExistsException;
import com.amazonaws.services.glue.model.BatchCreatePartitionRequest;
import com.amazonaws.services.glue.model.BatchCreatePartitionResult;
import com.amazonaws.services.glue.model.BatchUpdatePartitionRequest;
import com.amazonaws.services.glue.model.BatchUpdatePartitionRequestEntry;
import com.amazonaws.services.glue.model.BatchUpdatePartitionResult;
import com.amazonaws.services.glue.model.Column;
import com.amazonaws.services.glue.model.CreateDatabaseRequest;
import com.amazonaws.services.glue.model.CreateDatabaseResult;
import com.amazonaws.services.glue.model.CreateTableRequest;
import com.amazonaws.services.glue.model.CreateTableResult;
import com.amazonaws.services.glue.model.DatabaseInput;
import com.amazonaws.services.glue.model.EntityNotFoundException;
import com.amazonaws.services.glue.model.GetDatabaseRequest;
import com.amazonaws.services.glue.model.GetPartitionsRequest;
import com.amazonaws.services.glue.model.GetPartitionsResult;
import com.amazonaws.services.glue.model.GetTableRequest;
import com.amazonaws.services.glue.model.PartitionInput;
import com.amazonaws.services.glue.model.SerDeInfo;
import com.amazonaws.services.glue.model.StorageDescriptor;
import com.amazonaws.services.glue.model.Table;
import com.amazonaws.services.glue.model.TableInput;
import com.amazonaws.services.glue.model.UpdateTableRequest;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hudi.aws.sync.HoodieGlueSyncException;
import org.apache.hudi.aws.utils.S3Utils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.MapUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hive.AbstractHiveSyncHoodieClient;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.sync.common.model.Partition;
import org.apache.hudi.sync.common.util.TableUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;

public class AWSGlueCatalogSyncClient
extends AbstractHiveSyncHoodieClient {
    private static final Logger LOG = LogManager.getLogger(AWSGlueCatalogSyncClient.class);
    private static final int MAX_PARTITIONS_PER_REQUEST = 100;
    private static final long BATCH_REQUEST_SLEEP_MILLIS = 1000L;
    private final AWSGlue awsGlue = (AWSGlue)AWSGlueClientBuilder.standard().build();
    private final String databaseName;

    public AWSGlueCatalogSyncClient(HiveSyncConfig syncConfig, Configuration hadoopConf, FileSystem fs) {
        super(syncConfig, hadoopConf, fs);
        this.databaseName = syncConfig.databaseName;
    }

    @Override
    public List<Partition> getAllPartitions(String tableName) {
        try {
            GetPartitionsResult result;
            ArrayList<Partition> partitions = new ArrayList<Partition>();
            String nextToken = null;
            do {
                result = this.awsGlue.getPartitions(new GetPartitionsRequest().withDatabaseName(this.databaseName).withTableName(tableName).withNextToken(nextToken));
                partitions.addAll(result.getPartitions().stream().map(p -> new Partition(p.getValues(), p.getStorageDescriptor().getLocation())).collect(Collectors.toList()));
            } while ((nextToken = result.getNextToken()) != null);
            return partitions;
        }
        catch (Exception e) {
            throw new HoodieGlueSyncException("Failed to get all partitions for table " + TableUtils.tableId(this.databaseName, tableName), e);
        }
    }

    @Override
    public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
        if (partitionsToAdd.isEmpty()) {
            LOG.info((Object)("No partitions to add for " + TableUtils.tableId(this.databaseName, tableName)));
            return;
        }
        LOG.info((Object)("Adding " + partitionsToAdd.size() + " partition(s) in table " + TableUtils.tableId(this.databaseName, tableName)));
        try {
            Table table = AWSGlueCatalogSyncClient.getTable(this.awsGlue, this.databaseName, tableName);
            StorageDescriptor sd = table.getStorageDescriptor();
            List partitionInputs = partitionsToAdd.stream().map(partition -> {
                StorageDescriptor partitionSd = sd.clone();
                String fullPartitionPath = FSUtils.getPartitionPath(this.syncConfig.basePath, partition).toString();
                List<String> partitionValues = this.partitionValueExtractor.extractPartitionValuesInPath((String)partition);
                partitionSd.setLocation(fullPartitionPath);
                return new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd);
            }).collect(Collectors.toList());
            for (List batch : CollectionUtils.batches(partitionInputs, 100)) {
                BatchCreatePartitionRequest request = new BatchCreatePartitionRequest();
                request.withDatabaseName(this.databaseName).withTableName(tableName).withPartitionInputList(batch);
                BatchCreatePartitionResult result = this.awsGlue.batchCreatePartition(request);
                if (CollectionUtils.nonEmpty(result.getErrors())) {
                    throw new HoodieGlueSyncException("Fail to add partitions to " + TableUtils.tableId(this.databaseName, tableName) + " with error(s): " + result.getErrors());
                }
                Thread.sleep(1000L);
            }
        }
        catch (Exception e) {
            throw new HoodieGlueSyncException("Fail to add partitions to " + TableUtils.tableId(this.databaseName, tableName), e);
        }
    }

    @Override
    public void updatePartitionsToTable(String tableName, List<String> changedPartitions) {
        if (changedPartitions.isEmpty()) {
            LOG.info((Object)("No partitions to change for " + tableName));
            return;
        }
        LOG.info((Object)("Updating " + changedPartitions.size() + "partition(s) in table " + TableUtils.tableId(this.databaseName, tableName)));
        try {
            Table table = AWSGlueCatalogSyncClient.getTable(this.awsGlue, this.databaseName, tableName);
            StorageDescriptor sd = table.getStorageDescriptor();
            List updatePartitionEntries = changedPartitions.stream().map(partition -> {
                StorageDescriptor partitionSd = sd.clone();
                String fullPartitionPath = FSUtils.getPartitionPath(this.syncConfig.basePath, partition).toString();
                List<String> partitionValues = this.partitionValueExtractor.extractPartitionValuesInPath((String)partition);
                sd.setLocation(fullPartitionPath);
                PartitionInput partitionInput = new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd);
                return new BatchUpdatePartitionRequestEntry().withPartitionInput(partitionInput).withPartitionValueList(partitionValues);
            }).collect(Collectors.toList());
            for (List batch : CollectionUtils.batches(updatePartitionEntries, 100)) {
                BatchUpdatePartitionRequest request = new BatchUpdatePartitionRequest();
                request.withDatabaseName(this.databaseName).withTableName(tableName).withEntries(batch);
                BatchUpdatePartitionResult result = this.awsGlue.batchUpdatePartition(request);
                if (CollectionUtils.nonEmpty(result.getErrors())) {
                    throw new HoodieGlueSyncException("Fail to update partitions to " + TableUtils.tableId(this.databaseName, tableName) + " with error(s): " + result.getErrors());
                }
                Thread.sleep(1000L);
            }
        }
        catch (Exception e) {
            throw new HoodieGlueSyncException("Fail to update partitions to " + TableUtils.tableId(this.databaseName, tableName), e);
        }
    }

    @Override
    public void dropPartitions(String tableName, List<String> partitionsToDrop) {
        throw new UnsupportedOperationException("Not support dropPartitionsToTable yet.");
    }

    @Override
    public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
        if (MapUtils.nonEmpty(tableProperties)) {
            return;
        }
        try {
            AWSGlueCatalogSyncClient.updateTableParameters(this.awsGlue, this.databaseName, tableName, tableProperties, true);
        }
        catch (Exception e) {
            throw new HoodieGlueSyncException("Fail to update properties for table " + TableUtils.tableId(this.databaseName, tableName), e);
        }
    }

    @Override
    public void updateTableDefinition(String tableName, MessageType newSchema) {
        boolean cascade = this.syncConfig.partitionFields.size() > 0;
        try {
            Table table = AWSGlueCatalogSyncClient.getTable(this.awsGlue, this.databaseName, tableName);
            LinkedHashMap<String, String> newSchemaMap = HiveSchemaUtil.parquetSchemaToMapSchema(newSchema, this.syncConfig.supportTimestamp, false);
            List newColumns = newSchemaMap.keySet().stream().map(key -> {
                String keyType = HiveSchemaUtil.getPartitionKeyType(newSchemaMap, key);
                return new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
            }).collect(Collectors.toList());
            StorageDescriptor sd = table.getStorageDescriptor();
            sd.setColumns(newColumns);
            Date now = new Date();
            TableInput updatedTableInput = new TableInput().withName(tableName).withTableType(table.getTableType()).withParameters(table.getParameters()).withPartitionKeys((Collection)table.getPartitionKeys()).withStorageDescriptor(sd).withLastAccessTime(now).withLastAnalyzedTime(now);
            UpdateTableRequest request = new UpdateTableRequest().withDatabaseName(this.databaseName).withTableInput(updatedTableInput);
            this.awsGlue.updateTable(request);
        }
        catch (Exception e) {
            throw new HoodieGlueSyncException("Fail to update definition for table " + TableUtils.tableId(this.databaseName, tableName), e);
        }
    }

    @Override
    public List<FieldSchema> getTableCommentUsingMetastoreClient(String tableName) {
        throw new UnsupportedOperationException("Not supported: `getTableCommentUsingMetastoreClient`");
    }

    @Override
    public void updateTableComments(String tableName, List<FieldSchema> oldSchema, List<Schema.Field> newSchema) {
        throw new UnsupportedOperationException("Not supported: `updateTableComments`");
    }

    @Override
    public void updateTableComments(String tableName, List<FieldSchema> oldSchema, Map<String, String> newComments) {
        throw new UnsupportedOperationException("Not supported: `updateTableComments`");
    }

    @Override
    public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass, Map<String, String> serdeProperties, Map<String, String> tableProperties) {
        if (this.tableExists(tableName)) {
            return;
        }
        CreateTableRequest request = new CreateTableRequest();
        HashMap<String, String> params = new HashMap<String, String>();
        if (!this.syncConfig.createManagedTable.booleanValue()) {
            params.put("EXTERNAL", "TRUE");
        }
        params.putAll(tableProperties);
        try {
            LinkedHashMap<String, String> mapSchema = HiveSchemaUtil.parquetSchemaToMapSchema(storageSchema, this.syncConfig.supportTimestamp, false);
            ArrayList<Column> schemaWithoutPartitionKeys = new ArrayList<Column>();
            for (String key : mapSchema.keySet()) {
                String keyType = HiveSchemaUtil.getPartitionKeyType(mapSchema, key);
                Column column = new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
                if (this.syncConfig.partitionFields.contains(key)) continue;
                schemaWithoutPartitionKeys.add(column);
            }
            List schemaPartitionKeys = this.syncConfig.partitionFields.stream().map(partitionKey -> {
                String keyType = HiveSchemaUtil.getPartitionKeyType(mapSchema, partitionKey);
                return new Column().withName(partitionKey).withType(keyType.toLowerCase()).withComment("");
            }).collect(Collectors.toList());
            StorageDescriptor storageDescriptor = new StorageDescriptor();
            serdeProperties.put("serialization.format", "1");
            storageDescriptor.withSerdeInfo(new SerDeInfo().withSerializationLibrary(serdeClass).withParameters(serdeProperties)).withLocation(S3Utils.s3aToS3(this.syncConfig.basePath)).withInputFormat(inputFormatClass).withOutputFormat(outputFormatClass).withColumns(schemaWithoutPartitionKeys);
            Date now = new Date();
            TableInput tableInput = new TableInput().withName(tableName).withTableType(TableType.EXTERNAL_TABLE.toString()).withParameters(params).withPartitionKeys(schemaPartitionKeys).withStorageDescriptor(storageDescriptor).withLastAccessTime(now).withLastAnalyzedTime(now);
            request.withDatabaseName(this.databaseName).withTableInput(tableInput);
            CreateTableResult result = this.awsGlue.createTable(request);
            LOG.info((Object)("Created table " + TableUtils.tableId(this.databaseName, tableName) + " : " + result));
        }
        catch (AlreadyExistsException e) {
            LOG.warn((Object)("Table " + TableUtils.tableId(this.databaseName, tableName) + " already exists."), (Throwable)e);
        }
        catch (Exception e) {
            throw new HoodieGlueSyncException("Fail to create " + TableUtils.tableId(this.databaseName, tableName), e);
        }
    }

    @Override
    public Map<String, String> getTableSchema(String tableName) {
        try {
            Table table = AWSGlueCatalogSyncClient.getTable(this.awsGlue, this.databaseName, tableName);
            Map<String, String> partitionKeysMap = table.getPartitionKeys().stream().collect(Collectors.toMap(Column::getName, f -> f.getType().toUpperCase()));
            Map<String, String> columnsMap = table.getStorageDescriptor().getColumns().stream().collect(Collectors.toMap(Column::getName, f -> f.getType().toUpperCase()));
            HashMap<String, String> schema = new HashMap<String, String>();
            schema.putAll(columnsMap);
            schema.putAll(partitionKeysMap);
            return schema;
        }
        catch (Exception e) {
            throw new HoodieGlueSyncException("Fail to get schema for table " + TableUtils.tableId(this.databaseName, tableName), e);
        }
    }

    @Override
    public boolean doesTableExist(String tableName) {
        return this.tableExists(tableName);
    }

    @Override
    public boolean tableExists(String tableName) {
        GetTableRequest request = new GetTableRequest().withDatabaseName(this.databaseName).withName(tableName);
        try {
            return Objects.nonNull(this.awsGlue.getTable(request).getTable());
        }
        catch (EntityNotFoundException e) {
            LOG.info((Object)("Table not found: " + TableUtils.tableId(this.databaseName, tableName)), (Throwable)e);
            return false;
        }
        catch (Exception e) {
            throw new HoodieGlueSyncException("Fail to get table: " + TableUtils.tableId(this.databaseName, tableName), e);
        }
    }

    @Override
    public boolean databaseExists(String databaseName) {
        GetDatabaseRequest request = new GetDatabaseRequest();
        request.setName(databaseName);
        try {
            return Objects.nonNull(this.awsGlue.getDatabase(request).getDatabase());
        }
        catch (EntityNotFoundException e) {
            LOG.info((Object)("Database not found: " + databaseName), (Throwable)e);
            return false;
        }
        catch (Exception e) {
            throw new HoodieGlueSyncException("Fail to check if database exists " + databaseName, e);
        }
    }

    @Override
    public void createDatabase(String databaseName) {
        if (this.databaseExists(databaseName)) {
            return;
        }
        CreateDatabaseRequest request = new CreateDatabaseRequest();
        request.setDatabaseInput(new DatabaseInput().withName(databaseName).withDescription("Automatically created by " + this.getClass().getName()).withParameters(null).withLocationUri(null));
        try {
            CreateDatabaseResult result = this.awsGlue.createDatabase(request);
            LOG.info((Object)("Successfully created database in AWS Glue: " + result.toString()));
        }
        catch (AlreadyExistsException e) {
            LOG.warn((Object)("AWS Glue Database " + databaseName + " already exists"), (Throwable)e);
        }
        catch (Exception e) {
            throw new HoodieGlueSyncException("Fail to create database " + databaseName, e);
        }
    }

    @Override
    public Option<String> getLastCommitTimeSynced(String tableName) {
        try {
            Table table = AWSGlueCatalogSyncClient.getTable(this.awsGlue, this.databaseName, tableName);
            return Option.ofNullable(table.getParameters().get("last_commit_time_sync"));
        }
        catch (Exception e) {
            throw new HoodieGlueSyncException("Fail to get last sync commit time for " + TableUtils.tableId(this.databaseName, tableName), e);
        }
    }

    @Override
    public void close() {
        this.awsGlue.shutdown();
    }

    @Override
    public void updateLastCommitTimeSynced(String tableName) {
        if (!this.activeTimeline.lastInstant().isPresent()) {
            LOG.warn((Object)"No commit in active timeline.");
            return;
        }
        String lastCommitTimestamp = this.activeTimeline.lastInstant().get().getTimestamp();
        try {
            AWSGlueCatalogSyncClient.updateTableParameters(this.awsGlue, this.databaseName, tableName, Collections.singletonMap("last_commit_time_sync", lastCommitTimestamp), false);
        }
        catch (Exception e) {
            throw new HoodieGlueSyncException("Fail to update last sync commit time for " + TableUtils.tableId(this.databaseName, tableName), e);
        }
    }

    @Override
    public Option<String> getLastReplicatedTime(String tableName) {
        throw new UnsupportedOperationException("Not supported: `getLastReplicatedTime`");
    }

    @Override
    public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
        throw new UnsupportedOperationException("Not supported: `updateLastReplicatedTimeStamp`");
    }

    @Override
    public void deleteLastReplicatedTimeStamp(String tableName) {
        throw new UnsupportedOperationException("Not supported: `deleteLastReplicatedTimeStamp`");
    }

    private static Table getTable(AWSGlue awsGlue, String databaseName, String tableName) throws HoodieGlueSyncException {
        GetTableRequest request = new GetTableRequest().withDatabaseName(databaseName).withName(tableName);
        try {
            return awsGlue.getTable(request).getTable();
        }
        catch (EntityNotFoundException e) {
            throw new HoodieGlueSyncException("Table not found: " + TableUtils.tableId(databaseName, tableName), e);
        }
        catch (Exception e) {
            throw new HoodieGlueSyncException("Fail to get table " + TableUtils.tableId(databaseName, tableName), e);
        }
    }

    private static void updateTableParameters(AWSGlue awsGlue, String databaseName, String tableName, Map<String, String> updatingParams, boolean shouldReplace) {
        HashMap<String, String> newParams = new HashMap<String, String>();
        try {
            Table table = AWSGlueCatalogSyncClient.getTable(awsGlue, databaseName, tableName);
            if (!shouldReplace) {
                newParams.putAll(table.getParameters());
            }
            newParams.putAll(updatingParams);
            Date now = new Date();
            TableInput updatedTableInput = new TableInput().withName(tableName).withTableType(table.getTableType()).withParameters(newParams).withPartitionKeys((Collection)table.getPartitionKeys()).withStorageDescriptor(table.getStorageDescriptor()).withLastAccessTime(now).withLastAnalyzedTime(now);
            UpdateTableRequest request = new UpdateTableRequest();
            request.withDatabaseName(databaseName).withTableInput(updatedTableInput);
            awsGlue.updateTable(request);
        }
        catch (Exception e) {
            throw new HoodieGlueSyncException("Fail to update params for table " + TableUtils.tableId(databaseName, tableName) + ": " + newParams, e);
        }
    }

    private static enum TableType {
        MANAGED_TABLE,
        EXTERNAL_TABLE,
        VIRTUAL_VIEW,
        INDEX_TABLE,
        MATERIALIZED_VIEW;

    }
}

