package com.azure.cosmos.implementation.throughputControl.controller.container;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConnectionMode;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosBridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.CosmosPagedFluxOptions;
import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.Offer;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.QueryFeedOperationState;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.caches.AsyncCache;
import com.azure.cosmos.implementation.caches.RxCollectionCache;
import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.throughputControl.LinkedCancellationToken;
import com.azure.cosmos.implementation.throughputControl.LinkedCancellationTokenSource;
import com.azure.cosmos.implementation.throughputControl.config.ThroughputControlGroupInternal;
import com.azure.cosmos.implementation.throughputControl.controller.group.ThroughputGroupControllerBase;
import com.azure.cosmos.implementation.throughputControl.controller.group.ThroughputGroupControllerFactory;
import com.azure.cosmos.implementation.throughputControl.exceptions.ThroughputControlInitializationException;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.models.ThroughputResponse;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.RetrySpec;

/* loaded from: input_file:com/azure/cosmos/implementation/throughputControl/controller/container/ThroughputContainerController.class */
public class ThroughputContainerController implements IThroughputContainerController {
    private static final Logger logger = LoggerFactory.getLogger(ThroughputContainerController.class);
    private static final Duration DEFAULT_THROUGHPUT_REFRESH_INTERVAL = Duration.ofMinutes(15);
    private static final int NO_OFFER_EXCEPTION_STATUS_CODE = 400;
    private static final int NO_OFFER_EXCEPTION_SUB_STATUS_CODE = 10004;
    private final AsyncDocumentClient client;
    private final RxCollectionCache collectionCache;
    private final ConnectionMode connectionMode;
    private final AsyncCache<String, ThroughputGroupControllerBase> groupControllerCache;
    private final Set<ThroughputControlGroupInternal> groups;
    private final AtomicReference<Integer> maxContainerThroughput;
    private final RxPartitionKeyRangeCache partitionKeyRangeCache;
    private final CosmosAsyncContainer targetContainer;
    private final LinkedCancellationTokenSource cancellationTokenSource;
    private final ConcurrentHashMap<String, LinkedCancellationToken> cancellationTokenMap;
    private final Mono<Integer> throughputQueryMono;
    private ThroughputGroupControllerBase defaultGroupController;
    private String targetContainerRid;
    private String targetDatabaseRid;
    private ThroughputProvisioningScope throughputProvisioningScope;

    public ThroughputContainerController(RxCollectionCache rxCollectionCache, ConnectionMode connectionMode, Set<ThroughputControlGroupInternal> set, RxPartitionKeyRangeCache rxPartitionKeyRangeCache, LinkedCancellationToken linkedCancellationToken, Mono<Integer> mono) {
        Preconditions.checkNotNull(rxCollectionCache, "Collection cache can not be null");
        Preconditions.checkArgument(set != null && set.size() > 0, "Throughput budget groups can not be null or empty");
        Preconditions.checkNotNull(rxPartitionKeyRangeCache, "RxPartitionKeyRangeCache can not be null");
        this.collectionCache = rxCollectionCache;
        this.connectionMode = connectionMode;
        this.groupControllerCache = new AsyncCache<>();
        this.groups = set;
        this.maxContainerThroughput = new AtomicReference<>();
        this.partitionKeyRangeCache = rxPartitionKeyRangeCache;
        this.targetContainer = set.iterator().next().getTargetContainer();
        this.client = CosmosBridgeInternal.getContextClient(this.targetContainer);
        this.throughputProvisioningScope = getThroughputResolveLevel(set);
        this.cancellationTokenSource = new LinkedCancellationTokenSource(linkedCancellationToken);
        this.cancellationTokenMap = new ConcurrentHashMap<>();
        this.throughputQueryMono = mono == null ? resolveContainerMaxThroughputCore() : mono;
    }

    private ThroughputProvisioningScope getThroughputResolveLevel(Set<ThroughputControlGroupInternal> set) {
        return set.stream().anyMatch(throughputControlGroupInternal -> {
            return throughputControlGroupInternal.getTargetThroughputThreshold() != null;
        }) ? ThroughputProvisioningScope.CONTAINER : ThroughputProvisioningScope.NONE;
    }

    @Override // com.azure.cosmos.implementation.throughputControl.controller.IThroughputController
    public <T> Mono<T> init() {
        return resolveContainerResourceId().flatMap(str -> {
            return resolveContainerMaxThroughput();
        }).flatMap(throughputContainerController -> {
            return createAndInitializeGroupControllers();
        }).doOnSuccess(throughputContainerController2 -> {
            CosmosSchedulers.COSMOS_PARALLEL.schedule(() -> {
                refreshContainerMaxThroughputTask(this.cancellationTokenSource.getToken()).subscribe();
            });
        }).thenReturn(this);
    }

    private Mono<String> resolveDatabaseResourceId() {
        return this.targetContainer.getDatabase().read().flatMap(cosmosDatabaseResponse -> {
            this.targetDatabaseRid = cosmosDatabaseResponse.getProperties().getResourceId();
            return Mono.just(this.targetDatabaseRid);
        });
    }

    private Mono<String> resolveContainerResourceId() {
        return this.targetContainer.read().flatMap(cosmosContainerResponse -> {
            this.targetContainerRid = cosmosContainerResponse.getProperties().getResourceId();
            return Mono.just(this.targetContainerRid);
        });
    }

    private Mono<ThroughputResponse> resolveDatabaseThroughput() {
        return Mono.justOrEmpty(this.targetDatabaseRid).switchIfEmpty(resolveDatabaseResourceId()).flatMap(this::resolveThroughputByResourceId);
    }

    private Mono<ThroughputResponse> resolveContainerThroughput() {
        return StringUtils.isEmpty(this.targetContainerRid) ? resolveContainerResourceId().flatMap(this::resolveThroughputByResourceId).onErrorResume(th -> {
            if (isOwnerResourceNotExistsException(th)) {
                this.collectionCache.refresh(null, BridgeInternal.getLink(this.targetContainer), null);
            }
            return Mono.error(th);
        }).retryWhen(RetrySpec.max(1L).filter(this::isOwnerResourceNotExistsException)) : Mono.just(this.targetContainerRid).flatMap(this::resolveThroughputByResourceId);
    }

    private Mono<ThroughputContainerController> resolveContainerMaxThroughput() {
        return this.throughputQueryMono.flatMap(num -> {
            this.maxContainerThroughput.set(num);
            return Mono.just(this);
        }).switchIfEmpty(Mono.just(this));
    }

    private Mono<Integer> resolveContainerMaxThroughputCore() {
        return Mono.defer(() -> {
            return Mono.just(this.throughputProvisioningScope);
        }).flatMap(throughputProvisioningScope -> {
            return throughputProvisioningScope == ThroughputProvisioningScope.CONTAINER ? resolveContainerThroughput().onErrorResume(th -> {
                if (isOfferNotConfiguredException(th)) {
                    this.throughputProvisioningScope = ThroughputProvisioningScope.DATABASE;
                }
                return Mono.error(th);
            }) : throughputProvisioningScope == ThroughputProvisioningScope.DATABASE ? resolveDatabaseThroughput().onErrorResume(th2 -> {
                if (isOfferNotConfiguredException(th2)) {
                    this.throughputProvisioningScope = ThroughputProvisioningScope.CONTAINER;
                }
                return Mono.error(th2);
            }) : Mono.empty();
        }).map(this::getMaxContainerThroughput).onErrorResume(th -> {
            if (isOwnerResourceNotExistsException(th)) {
                this.cancellationTokenSource.close();
            }
            return Mono.error(th);
        }).retryWhen(RetrySpec.max(1L).filter(this::isOfferNotConfiguredException));
    }

    private Mono<ThroughputResponse> resolveThroughputByResourceId(String str) {
        Preconditions.checkArgument(StringUtils.isNotEmpty(str), "ResourceId can not be null or empty");
        return this.client.queryOffers(BridgeInternal.getOfferQuerySpecFromResourceId(this.targetContainer, str), new QueryFeedOperationState(ImplementationBridgeHelpers.CosmosAsyncDatabaseHelper.getCosmosAsyncDatabaseAccessor().getCosmosAsyncClient(this.targetContainer.getDatabase()), "resolveThroughputByResourceId", this.targetContainer.getDatabase().getId(), this.targetContainer.getId(), ResourceType.Offer, OperationType.Query, null, new CosmosQueryRequestOptions(), new CosmosPagedFluxOptions())).single().flatMap(feedResponse -> {
            if (!feedResponse.getResults().isEmpty()) {
                return this.client.readOffer(((Offer) feedResponse.getResults().get(0)).getSelfLink()).single();
            }
            ?? createCosmosException = BridgeInternal.createCosmosException(400, "No offers found for the resource " + str);
            BridgeInternal.setSubStatusCode(createCosmosException, 10004);
            return Mono.error((Throwable) createCosmosException);
        }).map(ModelBridgeInternal::createThroughputRespose);
    }

    private Integer getMaxContainerThroughput(ThroughputResponse throughputResponse) {
        Preconditions.checkNotNull(throughputResponse, "Throughput response can not be null");
        ThroughputProperties properties = throughputResponse.getProperties();
        return Integer.valueOf(Math.max(properties.getAutoscaleMaxThroughput(), properties.getManualThroughput().intValue()));
    }

    private boolean isOfferNotConfiguredException(Throwable th) {
        Preconditions.checkNotNull(th, "Throwable should not be null");
        CosmosException cosmosException = (CosmosException) Utils.as(Exceptions.unwrap(th), CosmosException.class);
        return cosmosException != null && cosmosException.getStatusCode() == 400 && cosmosException.getSubStatusCode() == 10004;
    }

    private boolean isOwnerResourceNotExistsException(Throwable th) {
        Preconditions.checkNotNull(th, "Throwable should not be null");
        CosmosException cosmosException = (CosmosException) Utils.as(Exceptions.unwrap(th), CosmosException.class);
        return cosmosException != null && cosmosException.getStatusCode() == 404 && cosmosException.getSubStatusCode() == 1003;
    }

    @Override // com.azure.cosmos.implementation.throughputControl.controller.IThroughputController
    public <T> Mono<T> processRequest(RxDocumentServiceRequest rxDocumentServiceRequest, Mono<T> mono) {
        Preconditions.checkNotNull(rxDocumentServiceRequest, "Request can not be null");
        Preconditions.checkNotNull(mono, "Original request mono can not be null");
        return getOrCreateThroughputGroupController(rxDocumentServiceRequest.getThroughputControlGroupName()).flatMap(valueHolder -> {
            return valueHolder.v != 0 ? ((ThroughputGroupControllerBase) valueHolder.v).processRequest(rxDocumentServiceRequest, mono) : mono;
        });
    }

    private Mono<Utils.ValueHolder<ThroughputGroupControllerBase>> getOrCreateThroughputGroupController(String str) {
        if (StringUtils.isEmpty(str)) {
            return Mono.just(new Utils.ValueHolder(this.defaultGroupController));
        }
        for (ThroughputControlGroupInternal throughputControlGroupInternal : this.groups) {
            if (StringUtils.equals(str, throughputControlGroupInternal.getGroupName())) {
                return resolveThroughputGroupController(throughputControlGroupInternal).map((v1) -> {
                    return new Utils.ValueHolder(v1);
                });
            }
        }
        return Mono.just(new Utils.ValueHolder(this.defaultGroupController));
    }

    public String getTargetContainerRid() {
        return this.targetContainerRid;
    }

    @Override // com.azure.cosmos.implementation.throughputControl.controller.IThroughputController
    public boolean canHandleRequest(RxDocumentServiceRequest rxDocumentServiceRequest) {
        Preconditions.checkNotNull(rxDocumentServiceRequest, "Request can not be null");
        return StringUtils.equals(this.targetContainerRid, rxDocumentServiceRequest.requestContext.resolvedCollectionRid);
    }

    private Mono<ThroughputContainerController> createAndInitializeGroupControllers() {
        return Flux.fromIterable(this.groups).flatMap(this::resolveThroughputGroupController).then(Mono.just(this));
    }

    private Mono<ThroughputGroupControllerBase> resolveThroughputGroupController(ThroughputControlGroupInternal throughputControlGroupInternal) {
        return this.groupControllerCache.getAsync(throughputControlGroupInternal.getGroupName(), null, () -> {
            return createAndInitializeGroupController(throughputControlGroupInternal);
        }).onErrorResume(th -> {
            return Mono.error(new ThroughputControlInitializationException(th));
        });
    }

    private Mono<ThroughputGroupControllerBase> createAndInitializeGroupController(ThroughputControlGroupInternal throughputControlGroupInternal) {
        return ThroughputGroupControllerFactory.createController(this.connectionMode, throughputControlGroupInternal, this.maxContainerThroughput.get(), this.partitionKeyRangeCache, this.targetContainerRid, this.cancellationTokenMap.compute(throughputControlGroupInternal.getGroupName(), (str, linkedCancellationToken) -> {
            return this.cancellationTokenSource.getToken();
        })).init().cast(ThroughputGroupControllerBase.class).doOnSuccess(throughputGroupControllerBase -> {
            if (throughputGroupControllerBase.isDefault()) {
                this.defaultGroupController = throughputGroupControllerBase;
            }
        });
    }

    private Flux<Void> refreshContainerMaxThroughputTask(LinkedCancellationToken linkedCancellationToken) {
        Preconditions.checkNotNull(linkedCancellationToken, "Cancellation token can not be null");
        return this.throughputProvisioningScope == ThroughputProvisioningScope.NONE ? Flux.empty() : Mono.delay(DEFAULT_THROUGHPUT_REFRESH_INTERVAL, CosmosSchedulers.COSMOS_PARALLEL).flatMap(l -> {
            return linkedCancellationToken.isCancellationRequested() ? Mono.empty() : resolveContainerMaxThroughput();
        }).flatMapIterable(throughputContainerController -> {
            return this.groups;
        }).flatMap(this::resolveThroughputGroupController).doOnNext(throughputGroupControllerBase -> {
            throughputGroupControllerBase.onContainerMaxThroughputRefresh(this.maxContainerThroughput.get().intValue());
        }).onErrorResume(th -> {
            logger.warn("Refresh throughput failed with reason {}", th.getMessage());
            return Mono.empty();
        }).then().repeat(() -> {
            return !linkedCancellationToken.isCancellationRequested();
        });
    }
}
