package com.bxm.openlog.sdk.scheduler;

import com.bxm.openlog.sdk.listener.eventbus.EventBusSubscriberMessageListener;
import com.bxm.openlog.sdk.listener.eventbus.OpenLogEventBusFactory;
import com.bxm.warcar.integration.eventbus.AsyncEventPark;
import com.bxm.warcar.mq.SingleMessageListener;
import com.bxm.warcar.mq.rocketmq.RocketmqConsumer;
import com.bxm.warcar.utils.NamedThreadFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

@EnableConfigurationProperties({MqFlowControlProperties.class})
@ConditionalOnClass({RocketmqConsumer.class, OpenLogEventBusFactory.class})
@ConditionalOnProperty(value = {MqFlowControlProperties.PROPERTY_ENABLE}, havingValue = "true")
@Component
/* loaded from: input_file:com/bxm/openlog/sdk/scheduler/MqFlowControlScheduler.class */
public class MqFlowControlScheduler implements ApplicationListener<ApplicationReadyEvent> {
    private static final Logger log = LoggerFactory.getLogger(MqFlowControlScheduler.class);
    private final Map<DefaultMQPushConsumer, AsyncEventPark> map = new HashMap();
    private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, (ThreadFactory) new NamedThreadFactory("MqFlowControlScheduler"));
    private final MqFlowControlProperties properties;

    public MqFlowControlScheduler(MqFlowControlProperties mqFlowControlProperties) {
        this.properties = mqFlowControlProperties;
    }

    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
        for (RocketmqConsumer rocketmqConsumer : (List) ((List) applicationReadyEvent.getApplicationContext().getBeanFactory().getBeansOfType(OpenLogEventBusFactory.class).values().stream().filter(openLogEventBusFactory -> {
            return openLogEventBusFactory.getConsumerMap() != null;
        }).flatMap(openLogEventBusFactory2 -> {
            return openLogEventBusFactory2.getConsumerMap().values().stream();
        }).collect(Collectors.toList())).stream().filter(consumer -> {
            return consumer instanceof RocketmqConsumer;
        }).map(consumer2 -> {
            return (RocketmqConsumer) consumer2;
        }).collect(Collectors.toList())) {
            SingleMessageListener messageListener = rocketmqConsumer.getMessageListener();
            if (messageListener instanceof EventBusSubscriberMessageListener) {
                AsyncEventPark eventPark = ((EventBusSubscriberMessageListener) messageListener).getEventPark();
                if (eventPark instanceof AsyncEventPark) {
                    this.map.put(rocketmqConsumer.getConsumer(), eventPark);
                }
            }
        }
        if (this.map.size() > 0) {
            startScheduler();
        }
    }

    private void startScheduler() {
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            for (Map.Entry<DefaultMQPushConsumer, AsyncEventPark> entry : this.map.entrySet()) {
                DefaultMQPushConsumer key = entry.getKey();
                boolean isPause = key.getDefaultMQPushConsumerImpl().isPause();
                int queueSize = entry.getValue().getQueueSize();
                int threshold = this.properties.getThreshold();
                if (queueSize >= threshold && !isPause) {
                    key.suspend();
                    log.info("Consumer suspend, consumerGroup: {}, queueSize: {}", key.getConsumerGroup(), Integer.valueOf(queueSize));
                }
                if (queueSize < threshold && isPause) {
                    key.resume();
                    log.info("Consumer resume, consumerGroup: {}, queueSize: {}", key.getConsumerGroup(), Integer.valueOf(queueSize));
                }
            }
        }, 0L, this.properties.getIntervalTime(), TimeUnit.SECONDS);
    }
}
