/*
 * Decompiled with CFR 0.152.
 */
package de.codecentric.boot.admin.server.domain.entities;

import de.codecentric.boot.admin.server.domain.entities.EventsourcingInstanceRepository;
import de.codecentric.boot.admin.server.domain.entities.Instance;
import de.codecentric.boot.admin.server.domain.events.InstanceEvent;
import de.codecentric.boot.admin.server.domain.values.InstanceId;
import de.codecentric.boot.admin.server.eventstore.InstanceEventStore;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class SnapshottingInstanceRepository
extends EventsourcingInstanceRepository {
    private static final Logger log = LoggerFactory.getLogger(SnapshottingInstanceRepository.class);
    private final ConcurrentMap<InstanceId, Instance> snapshots = new ConcurrentHashMap<InstanceId, Instance>();
    private Disposable subscription;

    public SnapshottingInstanceRepository(InstanceEventStore eventStore) {
        super(eventStore);
    }

    @Override
    public Flux<Instance> findAll() {
        return Mono.fromSupplier(this.snapshots::values).flatMapIterable(Function.identity());
    }

    @Override
    public Mono<Instance> find(InstanceId id) {
        return Mono.defer(() -> Mono.justOrEmpty(this.snapshots.get(id)));
    }

    @Override
    public Flux<Instance> findByName(String name) {
        return this.findAll().filter(a -> a.isRegistered() && name.equals(a.getRegistration().getName()));
    }

    public void start() {
        this.subscription = this.getEventStore().findAll().concatWith((Publisher)this.getEventStore()).concatMap(this::updateSnapshot).subscribe();
    }

    public void stop() {
        if (this.subscription != null) {
            this.subscription.dispose();
        }
    }

    protected Mono<Void> updateSnapshot(InstanceEvent event) {
        return Mono.fromRunnable(() -> this.snapshots.compute(event.getInstance(), (? super K key, ? super V old) -> {
            Instance instance = old != null ? old : Instance.create(key);
            return instance.apply(event);
        })).onErrorResume(ex -> {
            log.warn("Error while updating the snapshot with event {}. Recomputing instance snapshot from event history.", (Object)event, ex);
            return this.recomputeSnapshot(event.getInstance());
        });
    }

    protected Mono<Void> recomputeSnapshot(InstanceId instanceId) {
        return this.getEventStore().find(instanceId).collectList().map(events -> Instance.create(instanceId).apply((Collection<InstanceEvent>)events)).doOnNext(instance -> this.snapshots.put(instance.getId(), (Instance)instance)).then().onErrorResume(ex2 -> {
            log.error("Error while recomputing snapshot. Event history for instance {} may be wrong,", (Object)instanceId, ex2);
            return Mono.empty();
        });
    }
}

