/*
 * Decompiled with CFR 0.152.
 */
package com.bxm.openlog.sdk.scheduler;

import com.bxm.openlog.sdk.listener.eventbus.EventBusSubscriberMessageListener;
import com.bxm.openlog.sdk.listener.eventbus.OpenLogEventBusFactory;
import com.bxm.openlog.sdk.scheduler.MqFlowControlProperties;
import com.bxm.warcar.integration.eventbus.AsyncEventPark;
import com.bxm.warcar.integration.eventbus.EventPark;
import com.bxm.warcar.mq.SingleMessageListener;
import com.bxm.warcar.mq.rocketmq.RocketmqConsumer;
import com.bxm.warcar.utils.NamedThreadFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationListener;

@ConditionalOnClass(value={RocketmqConsumer.class, OpenLogEventBusFactory.class})
@ConditionalOnProperty(value={"mq.flow.control.enable"}, havingValue="true")
@EnableConfigurationProperties(value={MqFlowControlProperties.class})
public class MqFlowControlScheduler
implements ApplicationListener<ApplicationReadyEvent> {
    private static final Logger log = LoggerFactory.getLogger(MqFlowControlScheduler.class);
    private final Map<DefaultMQPushConsumer, AsyncEventPark> map = new HashMap<DefaultMQPushConsumer, AsyncEventPark>();
    private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, (ThreadFactory)new NamedThreadFactory("MqFlowControlScheduler"));
    private final MqFlowControlProperties properties;

    public MqFlowControlScheduler(MqFlowControlProperties properties) {
        this.properties = properties;
    }

    public void onApplicationEvent(ApplicationReadyEvent event) {
        ConfigurableListableBeanFactory beanFactory = event.getApplicationContext().getBeanFactory();
        List rocketmqConsumers = beanFactory.getBeansOfType(OpenLogEventBusFactory.class).values().stream().filter(x -> x.getConsumerMap() != null).flatMap(x -> x.getConsumerMap().values().stream()).collect(Collectors.toList()).stream().filter(x -> x instanceof RocketmqConsumer).map(x -> (RocketmqConsumer)x).collect(Collectors.toList());
        for (RocketmqConsumer rocketmqConsumer : rocketmqConsumers) {
            EventBusSubscriberMessageListener listener;
            EventPark eventPark;
            SingleMessageListener messageListener = rocketmqConsumer.getMessageListener();
            if (!(messageListener instanceof EventBusSubscriberMessageListener) || !((eventPark = (listener = (EventBusSubscriberMessageListener)messageListener).getEventPark()) instanceof AsyncEventPark)) continue;
            DefaultMQPushConsumer consumer = rocketmqConsumer.getConsumer();
            this.map.put(consumer, (AsyncEventPark)eventPark);
        }
        if (this.map.size() > 0) {
            this.startScheduler();
        }
    }

    private void startScheduler() {
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            for (Map.Entry<DefaultMQPushConsumer, AsyncEventPark> entry : this.map.entrySet()) {
                int threshold;
                DefaultMQPushConsumer consumer = entry.getKey();
                boolean pause = consumer.getDefaultMQPushConsumerImpl().isPause();
                int queueSize = entry.getValue().getQueueSize();
                if (queueSize >= (threshold = this.properties.getThreshold()) && !pause) {
                    consumer.suspend();
                    log.info("Consumer suspend, consumerGroup: {}, queueSize: {}", (Object)consumer.getConsumerGroup(), (Object)queueSize);
                }
                if (queueSize >= threshold || !pause) continue;
                consumer.resume();
                log.info("Consumer resume, consumerGroup: {}, queueSize: {}", (Object)consumer.getConsumerGroup(), (Object)queueSize);
            }
        }, 0L, this.properties.getIntervalTime(), TimeUnit.SECONDS);
    }
}

