/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.reactive;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.redisson.RedissonKeys;
import org.redisson.client.RedisClient;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.reactive.CommandReactiveExecutor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/**
 * Reactive key-scanning facade built on top of a {@link RedissonKeys} instance.
 *
 * <p>Keys are streamed as a {@link Flux} by issuing asynchronous scan calls against every
 * entry reported by the connection manager and merging the per-entry streams.
 */
public class RedissonKeysReactive {
    /** Batch-size argument used when the caller does not supply one. */
    private static final int DEFAULT_SCAN_COUNT = 10;

    private final CommandReactiveExecutor commandExecutor;
    private final RedissonKeys instance;

    /**
     * Creates the reactive wrapper and the {@link RedissonKeys} delegate it scans through.
     *
     * @param commandExecutor executor used both by the delegate and to enumerate entries
     */
    public RedissonKeysReactive(CommandReactiveExecutor commandExecutor) {
        this.instance = new RedissonKeys(commandExecutor);
        this.commandExecutor = commandExecutor;
    }

    /**
     * Streams the keys matching {@code pattern}, scanning with the default batch size of 10.
     *
     * @param pattern key pattern forwarded to the scan call
     * @return merged stream of matching keys from all entries
     */
    public Flux<String> getKeysByPattern(String pattern) {
        return this.getKeysByPattern(pattern, DEFAULT_SCAN_COUNT);
    }

    /**
     * Streams the keys matching {@code pattern} from every entry known to the connection
     * manager. One scanning publisher is created per entry and the results are merged, so
     * no ordering between entries is implied.
     *
     * @param pattern key pattern forwarded to the scan call
     * @param count   batch-size argument forwarded to each scan call
     * @return merged stream of matching keys from all entries
     */
    public Flux<String> getKeysByPattern(String pattern, int count) {
        List<Flux<String>> publishers = new ArrayList<>();
        for (MasterSlaveEntry entry : this.commandExecutor.getConnectionManager().getEntrySet()) {
            publishers.add(this.createKeysIterator(entry, pattern, count));
        }
        return Flux.merge(publishers);
    }

    /**
     * Builds a demand-driven publisher that scans a single entry.
     *
     * @param entry   entry to scan
     * @param pattern key pattern forwarded to the scan call
     * @param count   batch-size argument forwarded to each scan call
     * @return stream of the keys found on {@code entry}
     */
    private Flux<String> createKeysIterator(final MasterSlaveEntry entry, final String pattern, final int count) {
        final RedissonKeys keys = this.instance;
        return Flux.<String>create(sink -> {
            sink.onRequest(new KeyScanner(keys, sink, entry, pattern, count));
        });
    }

    /**
     * Request handler that walks the scan cursor of one entry while the subscriber has
     * outstanding demand.
     *
     * <p>Invariant: at most one scan loop is in flight. A loop is started only when demand
     * moves from zero to positive, and it stops only after atomically observing that demand
     * has dropped back to zero (or after a terminal signal).
     */
    private static final class KeyScanner implements LongConsumer {
        private final RedissonKeys keys;
        private final FluxSink<String> sink;
        private final MasterSlaveEntry entry;
        private final String pattern;
        private final int count;

        /** Outstanding, not yet satisfied demand; {@code Long.MAX_VALUE} means unbounded. */
        private final AtomicLong demand = new AtomicLong();
        /** Set once the sink has been terminated or cancelled; no further scans are issued. */
        private volatile boolean finished;
        /** Client returned by the previous scan; {@code null} before the first scan. */
        private volatile RedisClient client;
        /** Cursor position to send with the next scan; 0 starts the iteration. */
        private volatile long nextIterPos;

        KeyScanner(RedissonKeys keys, FluxSink<String> sink, MasterSlaveEntry entry, String pattern, int count) {
            this.keys = keys;
            this.sink = sink;
            this.entry = entry;
            this.pattern = pattern;
            this.count = count;
        }

        /**
         * Adds {@code requested} to the outstanding demand and starts a scan loop if none is
         * currently running.
         *
         * @param requested number of additional elements the subscriber asked for
         */
        @Override
        public void accept(long requested) {
            if (requested <= 0L || this.finished) {
                return;
            }
            long previous = this.demand.getAndUpdate(current -> {
                long sum = current + requested;
                // Cap at Long.MAX_VALUE on overflow, the conventional "unbounded" marker.
                return sum < 0L ? Long.MAX_VALUE : sum;
            });
            // Only the request that moves demand away from zero may start a loop; any other
            // request is picked up by the loop that is already running.
            if (previous == 0L) {
                this.scanNextBatch();
            }
        }

        /**
         * Issues one scan call and, once it completes, emits the batch and decides whether to
         * terminate, pause until more demand arrives, or scan again.
         */
        private void scanNextBatch() {
            final long sentPos = this.nextIterPos;
            this.keys.scanIteratorAsync(this.client, this.entry, sentPos, this.pattern, this.count).onComplete((res, e) -> {
                if (e != null) {
                    this.finished = true;
                    this.sink.error(e);
                    return;
                }
                if (this.sink.isCancelled()) {
                    // Subscriber is gone: stop walking the keyspace.
                    this.finished = true;
                    return;
                }
                // Keep using the client that served this batch for the rest of the iteration.
                this.client = res.getRedisClient();
                long returnedPos = res.getPos();
                this.nextIterPos = returnedPos;

                // The whole batch is emitted even if it exceeds the outstanding demand, so
                // that no fetched key is dropped.
                // NOTE(review): relies on Flux.create's default overflow handling to hold
                // the surplus until it is requested - confirm.
                long emitted = 0L;
                for (Object val : res.getValues()) {
                    this.sink.next((String) val);
                    ++emitted;
                }

                // A returned position of 0 marks the end of a scan; batch contents are not a
                // reliable end marker because batches may legitimately be empty or repeat.
                // A position that did not advance is also treated as the end to rule out an
                // endless loop.
                if (returnedPos == 0L || returnedPos == sentPos) {
                    this.finished = true;
                    this.sink.complete();
                    return;
                }
                // Demand satisfied: pause without completing; the next request resumes from
                // nextIterPos.
                if (this.consumeDemand(emitted) > 0L) {
                    this.scanNextBatch();
                }
            });
        }

        /**
         * Atomically subtracts {@code emitted} from the outstanding demand, never going below
         * zero and leaving unbounded demand untouched.
         *
         * @param emitted number of elements just handed to the sink
         * @return the demand that remains after the subtraction
         */
        private long consumeDemand(long emitted) {
            return this.demand.updateAndGet(current ->
                    current == Long.MAX_VALUE ? current : Math.max(0L, current - emitted));
        }
    }
}

