package com.azure.cosmos.implementation.changefeed.epkversion;

import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.changefeed.Bootstrapper;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseStore;
import com.azure.cosmos.implementation.changefeed.LeaseStoreManager;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
import com.azure.cosmos.implementation.changefeed.common.LeaseVersion;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/cosmos/implementation/changefeed/epkversion/BootstrapperImpl.class */
/**
 * {@link Bootstrapper} implementation that prepares the lease store before processing starts.
 *
 * <p>{@link #initialize()} checks whether the lease store is already initialized, validates that
 * existing leases are compatible with the change feed mode being started and, when required,
 * acquires the initialization lock, asks the {@link PartitionSynchronizer} to create missing
 * leases and marks the lease store as initialized.
 */
class BootstrapperImpl implements Bootstrapper {
    private static final Logger logger = LoggerFactory.getLogger(BootstrapperImpl.class);

    private final PartitionSynchronizer synchronizer;
    private final LeaseStore leaseStore;
    private final LeaseStoreManager epkRangeVersionLeaseStoreManager;
    private final LeaseStoreManager pkRangeVersionLeaseStoreManager;
    private final ChangeFeedProcessorOptions changeFeedProcessorOptions;
    private final ChangeFeedMode changeFeedModeToStart;
    private final Duration lockTime;
    private final Duration sleepTime;

    // Last value reported by leaseStore.isInitialized(); drives the repeat loop in initialize().
    private volatile boolean isInitialized;
    // Last value reported by leaseStore.acquireInitializationLock(); decides whether to release.
    private volatile boolean isLockAcquired;

    /**
     * Creates a bootstrapper.
     *
     * @param partitionSynchronizer synchronizer used to create missing leases; must not be null
     * @param leaseStore lease store whose initialization state and lock are managed; must not be null
     * @param duration lock time passed to {@code acquireInitializationLock}; must be non-null and positive
     * @param duration2 sleep time to wait when the lock was not acquired; must be non-null and positive
     * @param leaseStoreManager lease store manager for EPK-range based leases; must not be null
     * @param leaseStoreManager2 lease store manager for partition-key-range based leases; must not be null
     * @param changeFeedProcessorOptions processor options (lease prefix, verification flag); must not be null
     * @param changeFeedMode change feed mode this processor is being started with; must not be null
     * @throws NullPointerException if a required reference argument is null (Guava {@code checkNotNull})
     * @throws IllegalArgumentException if a duration is null, zero or negative (Guava {@code checkArgument})
     */
    public BootstrapperImpl(PartitionSynchronizer partitionSynchronizer, LeaseStore leaseStore, Duration duration, Duration duration2, LeaseStoreManager leaseStoreManager, LeaseStoreManager leaseStoreManager2, ChangeFeedProcessorOptions changeFeedProcessorOptions, ChangeFeedMode changeFeedMode) {
        Preconditions.checkNotNull(partitionSynchronizer, "Argument 'synchronizer' can not be null");
        Preconditions.checkNotNull(leaseStore, "Argument 'leaseStore' can not be null");
        Preconditions.checkNotNull(leaseStoreManager, "Argument 'epkRangeVersionLeaseStoreManager' can not be null");
        Preconditions.checkNotNull(leaseStoreManager2, "Argument 'pkRangeVersionLeaseStoreManager' can not be null");
        Preconditions.checkNotNull(changeFeedProcessorOptions, "Argument 'changeFeedProcessorOptions' can not be null");
        Preconditions.checkNotNull(changeFeedMode, "Argument 'changeFeedModeToStart' can not be null");
        Preconditions.checkArgument(duration != null && isPositive(duration), "lockTime should be non-null and positive");
        Preconditions.checkArgument(duration2 != null && isPositive(duration2), "sleepTime should be non-null and positive");

        this.synchronizer = partitionSynchronizer;
        this.leaseStore = leaseStore;
        this.epkRangeVersionLeaseStoreManager = leaseStoreManager;
        this.pkRangeVersionLeaseStoreManager = leaseStoreManager2;
        this.changeFeedProcessorOptions = changeFeedProcessorOptions;
        this.changeFeedModeToStart = changeFeedMode;
        this.lockTime = duration;
        this.sleepTime = duration2;
        this.isInitialized = false;
    }

    /** Returns {@code true} when the duration is strictly greater than zero. */
    private static boolean isPositive(Duration duration) {
        return !duration.isNegative() && !duration.isZero();
    }

    /**
     * Initializes the lease store, repeating until the store reports that it is initialized.
     *
     * <p>Each pass:
     * <ol>
     *   <li>reads {@code leaseStore.isInitialized()} and caches the result;</li>
     *   <li>if the store is initialized, validates lease / change feed mode interoperability; when
     *       lease verification on restart is disabled the pass ends here;</li>
     *   <li>otherwise tries to acquire the initialization lock, creates missing leases, marks the
     *       store initialized if it was not, and releases the lock.</li>
     * </ol>
     *
     * <p>Validation failures propagate to the subscriber. Failures in the lock stage are logged
     * and suppressed.
     *
     * @return a {@link Mono} that completes once initialization has finished
     */
    @Override // com.azure.cosmos.implementation.changefeed.Bootstrapper
    public Mono<Void> initialize() {
        this.isInitialized = false;

        // Starting from Mono.just keeps the isInitialized() call inside the pipeline, so every
        // resubscription triggered by repeat() issues a fresh call.
        return Mono.just(this)
            .flatMap(self -> this.leaseStore.isInitialized())
            .flatMap(initialized -> {
                this.isInitialized = initialized;

                Mono<Void> leaseValidation = Mono.empty();
                if (initialized) {
                    if (!this.changeFeedProcessorOptions.isLeaseVerificationEnabledOnRestart()) {
                        return validateLeaseCFModeInteroperabilityForEpkRangeBasedLease();
                    }
                    leaseValidation = validateLeaseCFModeInteroperabilityForEpkRangeBasedLease();
                }

                logger.info("Acquire initialization lock");
                return leaseValidation.then(acquireLockAndCreateMissingLeases());
            })
            // The flag is only refreshed from leaseStore.isInitialized(), so a pass that marks the
            // store initialized is followed by one more pass that observes the new state.
            .repeat(() -> !this.isInitialized)
            .then();
    }

    /**
     * Runs the lock-protected stage of one initialization pass: acquire the initialization lock,
     * create missing leases, mark the store initialized when needed and release the lock.
     */
    private Mono<Void> acquireLockAndCreateMissingLeases() {
        return this.leaseStore.acquireInitializationLock(this.lockTime)
            .flatMap(lockAcquired -> {
                this.isLockAcquired = lockAcquired;

                if (!this.isLockAcquired) {
                    logger.info("Another instance is initializing the store");
                    // Back off before the next pass re-checks the store.
                    return Mono.just(this.isLockAcquired)
                        .delayElement(this.sleepTime, CosmosSchedulers.COSMOS_PARALLEL);
                }

                return this.synchronizer.createMissingLeases()
                    .then(this.isInitialized ? Mono.just(lockAcquired) : this.leaseStore.markInitialized());
            })
            .onErrorResume(throwable -> {
                logger.warn("Unexpected exception caught while initializing the lock", throwable);
                // Continue to the release step below so a held lock is not left behind.
                return Mono.just(this.isLockAcquired);
            })
            .flatMap(lockState -> this.isLockAcquired
                ? this.leaseStore.releaseInitializationLock()
                : Mono.just(lockState))
            .then();
    }

    /**
     * Verifies that existing leases do not conflict with the change feed mode being started.
     *
     * <p>Looks at the first lease from the partition-key-range lease store manager; a
     * {@link LeaseVersion#PARTITION_KEY_BASED_LEASE} lease is rejected. If that manager returns no
     * lease, the first lease from the EPK-range lease store manager is inspected instead: an
     * EPK-range lease with a non-empty id and continuation token is rejected when the mode parsed
     * from its continuation token differs from {@code changeFeedModeToStart}.
     *
     * @return a {@link Mono} that completes empty when no conflict is found, or fails with
     *         {@link IllegalStateException} otherwise
     */
    private Mono<Void> validateLeaseCFModeInteroperabilityForEpkRangeBasedLease() {
        return this.pkRangeVersionLeaseStoreManager
            .getTopLeases(1)
            .next()
            .flatMap(pkRangeLease -> {
                if (pkRangeLease.getVersion() == LeaseVersion.PARTITION_KEY_BASED_LEASE) {
                    return Mono.error(new IllegalStateException(String.format("ChangeFeedProcessor#handleAllVersionsAndDeletes cannot be invoked when ChangeFeedProcessor#handleChanges was also started for lease prefix : %s", this.changeFeedProcessorOptions.getLeasePrefix())));
                }
                return Mono.empty();
            })
            // Reached only when no partition-key-range lease was returned.
            .switchIfEmpty(this.epkRangeVersionLeaseStoreManager.getTopLeases(1).next())
            .cast(Lease.class)
            .flatMap(epkRangeLease -> {
                boolean hasComparableState = epkRangeLease.getVersion() == LeaseVersion.EPK_RANGE_BASED_LEASE
                    && !Strings.isNullOrEmpty(epkRangeLease.getId())
                    && !Strings.isNullOrEmpty(epkRangeLease.getContinuationToken());

                // NOTE(review): the check is symmetric (any mode mismatch) while the message names
                // one specific direction - confirm the wording is intended.
                if (hasComparableState
                    && ChangeFeedState.fromString(epkRangeLease.getContinuationToken()).getMode() != this.changeFeedModeToStart) {
                    return Mono.error(new IllegalStateException(String.format("ChangeFeedProcessor#handleAllVersionsAndDeletes cannot be invoked when ChangeFeedProcessor#handleLatestVersionChanges were also started for lease prefix : %s", this.changeFeedProcessorOptions.getLeasePrefix())));
                }
                return Mono.empty();
            });
    }
}
