package com.bxm.localnews.msg.strategy.impl;

import com.bxm.localnews.mq.common.constant.PushReceiverRuleEnum;
import com.bxm.localnews.mq.common.model.dto.PushMessage;
import com.bxm.localnews.mq.common.model.dto.PushReceiveScope;
import com.bxm.localnews.mq.common.param.UserSearchPageParam;
import com.bxm.localnews.msg.config.PushMessageStatusEnum;
import com.bxm.localnews.msg.integration.UserIntegrationService;
import com.bxm.newidea.component.thread.NamedThreadFactory;
import com.bxm.newidea.component.vo.PageWarper;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

/**
 * Push distribution strategy for {@link PushReceiverRuleEnum#CONDITION}: the receivers are
 * looked up page by page through {@link UserIntegrationService#queryUserByPage} using the rule
 * and rule parameter carried in the message's {@link PushReceiveScope}.
 *
 * <p>The first page is processed on the calling thread; if more pages exist, each remaining
 * page is submitted to a lazily created thread pool.
 */
@Component
public class ConditionPushDistributeStrategy extends AbstractPushDistributeStrategy {
    private static final Logger log = LoggerFactory.getLogger(ConditionPushDistributeStrategy.class);

    /** Guards the one-time creation of {@link #executor}. */
    private final Object executorLock = new Object();

    @Resource
    private UserIntegrationService userIntegrationService;

    /**
     * Pool used for pages 2..n. Created on first use; volatile so that a fully initialized
     * instance is safely visible to concurrent callers of {@link #send(PushMessage)}.
     * NOTE(review): nothing in this class shuts the pool down — consider a destroy hook.
     */
    private volatile ThreadPoolTaskExecutor executor;

    /**
     * @return the receiver rule this strategy handles, {@link PushReceiverRuleEnum#CONDITION}
     */
    @Override // com.bxm.localnews.msg.strategy.IPushDistributeStrategy
    public PushReceiverRuleEnum support() {
        return PushReceiverRuleEnum.CONDITION;
    }

    /**
     * Runs the preprocess step, handles the first page of receivers synchronously and, when
     * further pages exist, schedules one asynchronous task per remaining page.
     *
     * @param pushMessage the message to distribute
     */
    @Override // com.bxm.localnews.msg.strategy.IPushDistributeStrategy
    public void send(PushMessage pushMessage) {
        execPreprocess(pushMessage);
        PageWarper<Long> firstPage = queryAndDeal(pushMessage, 1);
        if (!firstPage.isHasNextPage()) {
            return;
        }

        ThreadPoolTaskExecutor pool = obtainExecutor();
        int totalPages = firstPage.getPages();
        // Page 1 is already done; submit pages 2..totalPages.
        for (int page = 2; page <= totalPages; page++) {
            final int pageNum = page;
            pool.execute(() -> {
                try {
                    queryAndDeal(pushMessage, pageNum);
                } catch (RuntimeException e) {
                    // Without this, a failure in a background page would never be logged here.
                    log.error("Failed to process push target page {}", pageNum, e);
                }
            });
        }
    }

    /**
     * Queries one page of target user ids and hands a non-empty result to {@code splitTask}.
     * When the page is the last one, the message group status is changed to
     * {@link PushMessageStatusEnum#HAS_BEEN_SEND}.
     *
     * @param pushMessage the message being distributed
     * @param i           1-based page number to query
     * @return the page returned by the user service
     */
    private PageWarper<Long> queryAndDeal(PushMessage pushMessage, int i) {
        PushReceiveScope scope = pushMessage.getPushReceiveScope();
        UserSearchPageParam param = UserSearchPageParam.builder()
                .pushReceiverRule(scope.getPushReceiverRule())
                .ruleParam(scope.getRuleParam())
                .build();
        int bucketNum = this.appPushProperties.getBucketNum() * this.appPushProperties.getPullMultiple();
        param.setPageSize(bucketNum);
        param.setPageNum(i);

        PageWarper<Long> page = this.userIntegrationService.queryUserByPage(param);
        List<Long> userIds = page.getList();
        int size = CollectionUtils.isEmpty(userIds) ? 0 : userIds.size();
        if (log.isDebugEnabled()) {
            log.debug("分页获取推送目标数据，每页数量:{}，当前页:{}，获取用户数量：{}", bucketNum, i, size);
        }
        if (size > 0) {
            splitTask(userIds, pushMessage.getMsgId(), i, page.getPages());
        }
        // A short page marks the end of the data, but when the total is an exact multiple of
        // the page size the last page is full, so also treat "no next page" as the last page.
        if (size < bucketNum || !page.isHasNextPage()) {
            // NOTE(review): msgId is read from the payload here but from the message itself
            // in the splitTask call above — confirm both always carry the same id.
            getMessageGroupService().changeStatus(pushMessage.getPayloadInfo().getMsgId(), PushMessageStatusEnum.HAS_BEEN_SEND);
        }
        return page;
    }

    /**
     * Returns the shared pool, creating it on first use. Uses double-checked locking on the
     * volatile {@link #executor} field so only one pool is ever created.
     */
    private ThreadPoolTaskExecutor obtainExecutor() {
        ThreadPoolTaskExecutor pool = this.executor;
        if (pool == null) {
            synchronized (this.executorLock) {
                if (this.executor == null) {
                    initThreadPool();
                }
                pool = this.executor;
            }
        }
        return pool;
    }

    /**
     * Builds and initializes the pool, publishing it to {@link #executor} only after
     * {@code initialize()} has completed so no caller can observe a half-configured instance.
     */
    private void initThreadPool() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(2);
        pool.setQueueCapacity(10000);
        pool.setMaxPoolSize(10);
        pool.setKeepAliveSeconds(10);
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        pool.setThreadFactory(new NamedThreadFactory("batch-page-push"));
        pool.initialize();
        this.executor = pool;
    }
}
