package com.bxm.localnews.msg.listener;

import com.bxm.localnews.mq.common.model.dto.PushMessage;
import com.bxm.localnews.msg.config.PushMessageStatusEnum;
import com.bxm.localnews.msg.param.PushMessageBucket;
import com.bxm.localnews.msg.push.PushExecutor;
import com.bxm.localnews.msg.service.MessageGroupService;
import com.bxm.newidea.component.thread.NamedThreadFactory;
import java.util.Objects;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:com/bxm/localnews/msg/listener/PushMsgListener.class */
public class PushMsgListener implements InitializingBean {
    private static final Logger log = LoggerFactory.getLogger(PushMsgListener.class);

    @Resource
    private PushExecutor pushExecutor;

    @Resource
    private MessageGroupService messageGroupService;
    private ThreadPoolTaskExecutor executor;
    private AtomicLong totalDelayMills = new AtomicLong(0);
    private static final long DELAY_MILLS = 100;

    @StreamListener("singlePushInput")
    public void handlerSingle(PushMessage pushMessage) {
        if (log.isDebugEnabled()) {
            log.info("接收到推送消息：{}", pushMessage);
        }
        delay();
        this.executor.execute(() -> {
            this.pushExecutor.push(pushMessage);
        });
    }

    private void delay() {
        log.debug("excutor thread num:" + this.executor.getActiveCount());
        if (hasIdleTime()) {
            this.totalDelayMills.set(0L);
            return;
        }
        try {
            TimeUnit.MILLISECONDS.sleep(DELAY_MILLS);
            log.info("触发延缓消息推送速度,当前已延缓总时间：{}", Long.valueOf(this.totalDelayMills.addAndGet(DELAY_MILLS)));
        } catch (InterruptedException e) {
            log.error(e.getMessage(), e);
            Thread.currentThread().interrupt();
        }
    }

    public boolean hasIdleTime() {
        return this.executor.getActiveCount() < this.executor.getMaxPoolSize();
    }

    @StreamListener("batchPushInput")
    public void handlerBatch(PushMessageBucket pushMessageBucket) {
        if (log.isDebugEnabled()) {
            log.debug("接收到批量推送：{}", pushMessageBucket);
        }
        if (null == pushMessageBucket.getPushMsgId() || CollectionUtils.isEmpty(pushMessageBucket.getTargetUserIds())) {
            log.error("接受数据不完整，参数为：{}", pushMessageBucket);
            return;
        }
        PushMessage loadCache = this.messageGroupService.loadCache(pushMessageBucket.getPushMsgId());
        if (null == loadCache) {
            log.error("接受到的推送消息不存在，接受信息为：{}", pushMessageBucket);
            return;
        }
        if (Objects.equals(pushMessageBucket.getIndex(), pushMessageBucket.getTotal())) {
            if (log.isDebugEnabled()) {
                log.debug("消息[{}]已经处理所有分片，变更为更新完成状态", pushMessageBucket.getPushMsgId());
            }
            this.messageGroupService.changeStatus(loadCache.getPayloadInfo().getMsgId(), PushMessageStatusEnum.HAS_BEEN_SEND);
        }
        pushMessageBucket.getTargetUserIds().forEach(l -> {
            delay();
            this.executor.execute(() -> {
                PushMessage build = PushMessage.build();
                BeanUtils.copyProperties(loadCache, build);
                this.pushExecutor.push(build.assign(l));
            });
        });
    }

    private void initThreadPool() {
        this.executor = new ThreadPoolTaskExecutor();
        this.executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() + 2);
        this.executor.setQueueCapacity(10000);
        this.executor.setMaxPoolSize(30);
        this.executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        this.executor.setThreadFactory(new NamedThreadFactory("batch-push"));
        this.executor.initialize();
    }

    public void afterPropertiesSet() {
        initThreadPool();
    }
}
