package com.bxm.localnews.im.chat.impl;

import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.NumberUtil;
import com.bxm.component.mybatis.utils.MybatisBatchBuilder;
import com.bxm.egg.common.param.BaseUserParam;
import com.bxm.localnews.im.chat.ConsumeService;
import com.bxm.localnews.im.constant.ImRedisKey;
import com.bxm.localnews.im.domain.ChatMessageMapper;
import com.bxm.localnews.im.enums.ChannelTypeEnum;
import com.bxm.localnews.im.param.RongCloudParam;
import com.bxm.localnews.im.thirdpart.MsgContentProcesser;
import com.bxm.localnews.im.thirdpart.RCProcessorFactory;
import com.bxm.localnews.im.thirdpart.enums.MsgTypeEnum;
import com.bxm.localnews.im.vo.IMMessageBean;
import com.bxm.newidea.component.redis.KeyGenerator;
import com.bxm.newidea.component.redis.RedisSetAdapter;
import com.bxm.newidea.component.thread.NamedThreadFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:com/bxm/localnews/im/chat/impl/ConsumeServiceImpl.class */
public class ConsumeServiceImpl implements ConsumeService, ApplicationRunner {
    private static final Logger log = LoggerFactory.getLogger(ConsumeServiceImpl.class);
    private final RedisSetAdapter redisSetAdapter;
    private static final long FETCH_NUM = 500;
    private ReentrantLock lock;
    private Condition consumeCondition;

    @Autowired
    public ConsumeServiceImpl(RedisSetAdapter redisSetAdapter) {
        this.redisSetAdapter = redisSetAdapter;
    }

    @Override // com.bxm.localnews.im.chat.ConsumeService
    public void write(RongCloudParam rongCloudParam) {
        if (log.isDebugEnabled()) {
            log.debug("新增一条通讯消息：[{}]", rongCloudParam);
        }
        this.redisSetAdapter.add(ImRedisKey.MSG_SET, new Object[]{rongCloudParam});
        if (this.lock.tryLock()) {
            this.consumeCondition.signal();
            this.lock.unlock();
        }
    }

    private KeyGenerator buildHistoryKey(Long l, Date date) {
        return ImRedisKey.HAS_HISTORY_KEY.copy().appendKey(Long.valueOf(l.longValue() % 10)).appendKey(DatePattern.PURE_DATE_FORMAT.format(date));
    }

    @Override // com.bxm.localnews.im.chat.ConsumeService
    public boolean hasHistoryMsg(BaseUserParam baseUserParam) {
        Long userId = baseUserParam.getUserId();
        Preconditions.checkNotNull(userId);
        if (this.redisSetAdapter.exists(buildHistoryKey(userId, DateUtil.date()), userId).booleanValue()) {
            return true;
        }
        return this.redisSetAdapter.exists(buildHistoryKey(userId, DateUtil.yesterday()), userId).booleanValue();
    }

    @Override // com.bxm.localnews.im.chat.ConsumeService
    public void removeHistoryCache(Date date) {
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 10) {
                return;
            }
            this.redisSetAdapter.remove(buildHistoryKey(Long.valueOf(j2), date));
            j = j2 + 1;
        }
    }

    private void addUserHistory(Long l) {
        if (l.longValue() > 1000000000) {
            return;
        }
        this.redisSetAdapter.add(buildHistoryKey(l, DateUtil.date()), new Object[]{l});
    }

    private void consume() {
        while (true) {
            this.lock.lock();
            try {
                try {
                    List<RongCloudParam> pop = this.redisSetAdapter.pop(ImRedisKey.MSG_SET, Long.valueOf(FETCH_NUM), RongCloudParam.class);
                    if (pop.size() > 0) {
                        List<IMMessageBean> convert = convert(pop);
                        batchInsertPersonMessage(convert);
                        batchInsertGroupMessage(convert);
                        if (log.isDebugEnabled()) {
                            log.debug("消息处理完成，消息数量：[{}]", Integer.valueOf(pop.size()));
                        }
                    } else {
                        this.consumeCondition.await();
                    }
                    this.lock.unlock();
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                    this.lock.unlock();
                }
            } catch (Throwable th) {
                this.lock.unlock();
                throw th;
            }
        }
    }

    private void batchInsertGroupMessage(List<IMMessageBean> list) {
        MybatisBatchBuilder.create(ChatMessageMapper.class, (List) list.stream().filter(iMMessageBean -> {
            return StringUtils.equals(ChannelTypeEnum.GROUP.getTypeName(), iMMessageBean.getChannelType());
        }).collect(Collectors.toList())).run((v0, v1) -> {
            return v0.insertGroupMessage(v1);
        });
    }

    private void batchInsertPersonMessage(List<IMMessageBean> list) {
        List<IMMessageBean> list2 = (List) list.stream().filter(iMMessageBean -> {
            return StringUtils.equals(ChannelTypeEnum.PERSON.getTypeName(), iMMessageBean.getChannelType());
        }).collect(Collectors.toList());
        for (IMMessageBean iMMessageBean2 : list2) {
            if (null != iMMessageBean2.getToUserId()) {
                addUserHistory(iMMessageBean2.getToUserId());
            }
        }
        MybatisBatchBuilder.create(ChatMessageMapper.class, list2).run((v0, v1) -> {
            return v0.insert(v1);
        });
    }

    private List<IMMessageBean> convert(List<RongCloudParam> list) {
        ArrayList newArrayList = Lists.newArrayList();
        for (RongCloudParam rongCloudParam : list) {
            MsgContentProcesser msgContentProcesser = RCProcessorFactory.get(rongCloudParam.getObjectName());
            if (MsgTypeEnum.CONTENT.equals(msgContentProcesser.msgType())) {
                IMMessageBean build = build(rongCloudParam, msgContentProcesser);
                msgContentProcesser.callback(build);
                newArrayList.add(build);
            } else if (log.isDebugEnabled()) {
                log.debug("不支持的消息类型：{}", msgContentProcesser.msgType());
            }
        }
        return newArrayList;
    }

    private IMMessageBean build(RongCloudParam rongCloudParam, MsgContentProcesser msgContentProcesser) {
        String parseBrief = msgContentProcesser.parseBrief(rongCloudParam.getContent());
        IMMessageBean build = IMMessageBean.builder().msgId(rongCloudParam.getMsgUID()).content(rongCloudParam.getContent()).briefContent(parseBrief).parseContent(msgContentProcesser.parse(rongCloudParam.getContent())).channelType(rongCloudParam.getChannelType()).groupUserIds(StringUtils.join(rongCloudParam.getGroupUserIds(), ",")).sensitiveType(rongCloudParam.getSensitiveType()).source(rongCloudParam.getSource()).msgType(rongCloudParam.getObjectName()).build();
        if (NumberUtil.isNumber(rongCloudParam.getMsgTimestamp())) {
            build.setMsgTimestamp(new Date(Long.valueOf(rongCloudParam.getMsgTimestamp()).longValue()));
        }
        if (NumberUtil.isNumber(rongCloudParam.getFromUserId())) {
            build.setFromUserId(Long.valueOf(rongCloudParam.getFromUserId()));
        }
        if (NumberUtil.isNumber(rongCloudParam.getToUserId())) {
            build.setToUserId(Long.valueOf(rongCloudParam.getToUserId()));
        }
        return build;
    }

    public void run(ApplicationArguments applicationArguments) {
        this.lock = new ReentrantLock();
        this.consumeCondition = this.lock.newCondition();
        new NamedThreadFactory("consume-im-msg", true).newThread(this::consume).start();
    }
}
