/*
 * Decompiled with CFR 0.152.
 */
package com.bxm.localnews.msg.listener;

import com.bxm.localnews.mq.common.model.dto.PushMessage;
import com.bxm.localnews.msg.config.PushMessageStatusEnum;
import com.bxm.localnews.msg.param.PushMessageBucket;
import com.bxm.localnews.msg.push.PushExecutor;
import com.bxm.localnews.msg.service.MessageGroupService;
import com.bxm.newidea.component.thread.NamedThreadFactory;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

@Component
public class PushMsgListener
implements InitializingBean {
    private static final Logger log = LoggerFactory.getLogger(PushMsgListener.class);
    @Resource
    private PushExecutor pushExecutor;
    @Resource
    private MessageGroupService messageGroupService;
    private ThreadPoolTaskExecutor executor;
    private AtomicLong totalDelayMills = new AtomicLong(0L);
    private static final long DELAY_MILLS = 100L;

    @StreamListener(value="singlePushInput")
    public void handlerSingle(PushMessage message) {
        if (log.isDebugEnabled()) {
            log.info("\u63a5\u6536\u5230\u63a8\u9001\u6d88\u606f\uff1a{}", (Object)message);
        }
        this.delay();
        this.executor.execute(() -> this.pushExecutor.push(message));
    }

    private void delay() {
        log.debug("excutor thread num:" + this.executor.getActiveCount());
        if (!this.hasIdleTime()) {
            try {
                TimeUnit.MILLISECONDS.sleep(100L);
                long currentDelayMills = this.totalDelayMills.addAndGet(100L);
                log.info("\u89e6\u53d1\u5ef6\u7f13\u6d88\u606f\u63a8\u9001\u901f\u5ea6,\u5f53\u524d\u5df2\u5ef6\u7f13\u603b\u65f6\u95f4\uff1a{}", (Object)currentDelayMills);
            }
            catch (InterruptedException e) {
                log.error(e.getMessage(), (Throwable)e);
                Thread.currentThread().interrupt();
            }
        } else {
            this.totalDelayMills.set(0L);
        }
    }

    public boolean hasIdleTime() {
        return this.executor.getActiveCount() < this.executor.getMaxPoolSize();
    }

    @StreamListener(value="batchPushInput")
    public void handlerBatch(PushMessageBucket bucket) {
        if (log.isDebugEnabled()) {
            log.debug("\u63a5\u6536\u5230\u6279\u91cf\u63a8\u9001\uff1a{}", (Object)bucket);
        }
        if (null == bucket.getPushMsgId() || CollectionUtils.isEmpty((Collection)bucket.getTargetUserIds())) {
            log.error("\u63a5\u53d7\u6570\u636e\u4e0d\u5b8c\u6574\uff0c\u53c2\u6570\u4e3a\uff1a{}", (Object)bucket);
            return;
        }
        PushMessage message = this.messageGroupService.loadCache(bucket.getPushMsgId(), false);
        if (null == message) {
            log.error("\u63a5\u53d7\u5230\u7684\u63a8\u9001\u6d88\u606f\u4e0d\u5b58\u5728\uff0c\u63a5\u53d7\u4fe1\u606f\u4e3a\uff1a{}", (Object)bucket);
            return;
        }
        if (Objects.equals(bucket.getIndex(), bucket.getTotal())) {
            if (log.isDebugEnabled()) {
                log.debug("\u6d88\u606f[{}]\u5df2\u7ecf\u5904\u7406\u6240\u6709\u5206\u7247\uff0c\u53d8\u66f4\u4e3a\u66f4\u65b0\u5b8c\u6210\u72b6\u6001", (Object)bucket.getPushMsgId());
            }
            this.messageGroupService.changeStatus(message.getPayloadInfo().getMsgId(), PushMessageStatusEnum.HAS_BEEN_SEND);
        }
        log.info("\u5904\u7406\u6279\u91cf\u63a8\u9001\u6d88\u606f\uff0c\u6d88\u606f\u5185\u5bb9\uff1a{} - {},\u63a8\u9001\u4eba\u6570\uff1a{}", new Object[]{bucket.getPushMsgId(), message.getTitle(), bucket.getTargetUserIds().size()});
        bucket.getTargetUserIds().forEach(userId -> {
            this.delay();
            this.executor.execute(() -> {
                PushMessage cloneMessage = PushMessage.build();
                BeanUtils.copyProperties((Object)message, (Object)cloneMessage);
                this.pushExecutor.push(cloneMessage.assign(userId));
            });
        });
    }

    private void initThreadPool() {
        this.executor = new ThreadPoolTaskExecutor();
        this.executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() + 2);
        this.executor.setQueueCapacity(10000);
        this.executor.setMaxPoolSize(30);
        this.executor.setRejectedExecutionHandler((RejectedExecutionHandler)new ThreadPoolExecutor.AbortPolicy());
        this.executor.setThreadFactory((ThreadFactory)new NamedThreadFactory("batch-push"));
        this.executor.initialize();
    }

    public void afterPropertiesSet() {
        this.initThreadPool();
    }
}

