package com.aliyun.openservices.ons.api.impl.rocketmq;

import com.aliyun.openservices.ons.api.Constants;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.PropertyValueConst;
import com.aliyun.openservices.ons.api.PullConsumer;
import com.aliyun.openservices.ons.api.TopicPartition;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.ons.api.impl.util.ClientLoggerUtil;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.DefaultLitePullConsumer;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.TopicMessageQueueChangeListener;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.exception.MQClientException;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.UtilAll;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.logging.InternalLogger;
import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/* loaded from: input_file:com/aliyun/openservices/ons/api/impl/rocketmq/PullConsumerImpl.class */
public class PullConsumerImpl extends ONSClientAbstract implements PullConsumer {
    private static final InternalLogger LOGGER = ClientLoggerUtil.getClientLogger();
    private static final int MAX_CACHED_MESSAGE_SIZE_IN_MIB = 1024;
    private static final int MIN_CACHED_MESSAGE_SIZE_IN_MIB = 16;
    private static final int MAX_CACHED_MESSAGE_AMOUNT = 50000;
    private static final int MIN_CACHED_MESSAGE_AMOUNT = 100;
    private DefaultLitePullConsumer litePullConsumer;
    private int maxCachedMessageSizeInMiB;
    private int maxCachedMessageAmount;
    private long minAutoCommitIntervalMillis;

    public PullConsumerImpl(Properties properties) {
        super(properties);
        this.maxCachedMessageSizeInMiB = 512;
        this.maxCachedMessageAmount = 5000;
        this.minAutoCommitIntervalMillis = 1000L;
        String property = properties.getProperty(PropertyKeyConst.GROUP_ID, properties.getProperty(PropertyKeyConst.GROUP_ID));
        if (StringUtils.isEmpty(property)) {
            throw new ONSClientException("Unable to get GROUP_ID property");
        }
        this.litePullConsumer = new DefaultLitePullConsumer(getNamespace(), property, new OnsClientRPCHook(this.sessionCredentials));
        this.litePullConsumer.setMessageModel(MessageModel.valueOf(properties.getProperty(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING)));
        String property2 = properties.getProperty(PropertyKeyConst.MAX_BATCH_MESSAGE_COUNT);
        if (!UtilAll.isBlank(property2)) {
            this.litePullConsumer.setPullBatchSize(Integer.valueOf(StringUtils.trim(property2)).intValue());
        }
        this.litePullConsumer.setVipChannelEnabled(Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.isVipChannelEnabled, "false")));
        this.litePullConsumer.setInstanceName(properties.getProperty("InstanceName", buildIntanceName()));
        this.litePullConsumer.setNamesrvAddr(getNameServerAddr());
        String property3 = properties.getProperty(PropertyKeyConst.ConsumeThreadNums);
        if (!UtilAll.isBlank(property3)) {
            this.litePullConsumer.setPullThreadNums(Integer.valueOf(StringUtils.trim(property3)).intValue());
        }
        String property4 = properties.getProperty(PropertyKeyConst.MaxCachedMessageAmount);
        if (!UtilAll.isBlank(property4)) {
            this.maxCachedMessageAmount = Math.min(MAX_CACHED_MESSAGE_AMOUNT, Integer.valueOf(StringUtils.trim(property4)).intValue());
            this.maxCachedMessageAmount = Math.max(100, this.maxCachedMessageAmount);
            this.litePullConsumer.setPullThresholdForAll(this.maxCachedMessageAmount);
        }
        String property5 = properties.getProperty(PropertyKeyConst.MaxCachedMessageSizeInMiB);
        if (!UtilAll.isBlank(property5)) {
            this.maxCachedMessageSizeInMiB = Math.min(1024, Integer.valueOf(StringUtils.trim(property5)).intValue());
            this.maxCachedMessageSizeInMiB = Math.max(16, this.maxCachedMessageSizeInMiB);
            this.litePullConsumer.setPullThresholdSizeForQueue(this.maxCachedMessageSizeInMiB);
        }
        String property6 = properties.getProperty(PropertyKeyConst.AUTO_COMMIT);
        if (!UtilAll.isBlank(property6)) {
            this.litePullConsumer.setAutoCommit(Boolean.valueOf(property6).booleanValue());
        }
        String property7 = properties.getProperty(PropertyKeyConst.AUTO_COMMIT_INTERVAL_MILLIS);
        if (!StringUtils.isBlank(property7)) {
            this.litePullConsumer.setAutoCommitIntervalMillis(Math.max(this.minAutoCommitIntervalMillis, Long.valueOf(StringUtils.trim(property7)).longValue()));
        }
        String property8 = properties.getProperty(PropertyKeyConst.POLL_TIMEOUT_MILLIS);
        if (StringUtils.isBlank(property8)) {
            return;
        }
        this.litePullConsumer.setPollTimeoutMillis(Long.valueOf(StringUtils.trim(property8)).longValue());
    }

    @Override // com.aliyun.openservices.ons.api.impl.rocketmq.ONSClientAbstract
    protected void updateNameServerAddr(String str) {
        this.litePullConsumer.updateNameServerAddress(str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Set<TopicPartition> convertToTopicPartitions(Collection<MessageQueue> collection) {
        HashSet hashSet = new HashSet();
        Iterator<MessageQueue> it = collection.iterator();
        while (it.hasNext()) {
            hashSet.add(convertToTopicPartition(it.next()));
        }
        return hashSet;
    }

    private Set<MessageQueue> convertToMessageQueues(Collection<TopicPartition> collection) {
        HashSet hashSet = new HashSet();
        Iterator<TopicPartition> it = collection.iterator();
        while (it.hasNext()) {
            hashSet.add(convertToMessageQueue(it.next()));
        }
        return hashSet;
    }

    private TopicPartition convertToTopicPartition(MessageQueue messageQueue) {
        return new TopicPartition(messageQueue.getTopic(), messageQueue.getBrokerName() + "#" + messageQueue.getQueueId());
    }

    private MessageQueue convertToMessageQueue(TopicPartition topicPartition) {
        String topic = topicPartition.getTopic();
        String[] split = topicPartition.getPartition().split("#");
        if (split.length == 2) {
            return new MessageQueue(topic, split[0], Integer.valueOf(split[1]).intValue());
        }
        LOGGER.warn("Failed to get message queue from TopicPartition: {}", topicPartition);
        throw new ONSClientException("Failed to get message queue");
    }

    @Override // com.aliyun.openservices.ons.api.PullConsumer
    public Set<TopicPartition> topicPartitions(String str) {
        try {
            Collection<MessageQueue> fetchMessageQueues = this.litePullConsumer.fetchMessageQueues(str);
            HashSet hashSet = new HashSet();
            Iterator<MessageQueue> it = fetchMessageQueues.iterator();
            while (it.hasNext()) {
                hashSet.add(convertToTopicPartition(it.next()));
            }
            return hashSet;
        } catch (MQClientException e) {
            LOGGER.warn("Failed to fetch topic partitions", (Throwable) e);
            throw new ONSClientException("Failed to fetch topic partitions", e);
        }
    }

    @Override // com.aliyun.openservices.ons.api.PullConsumer
    public void assign(Collection<TopicPartition> collection) {
        HashSet hashSet = new HashSet();
        Iterator<TopicPartition> it = collection.iterator();
        while (it.hasNext()) {
            hashSet.add(convertToMessageQueue(it.next()));
        }
        this.litePullConsumer.assign(hashSet);
    }

    @Override // com.aliyun.openservices.ons.api.PullConsumer
    public void registerTopicPartitionChangedListener(String str, final PullConsumer.TopicPartitionChangeListener topicPartitionChangeListener) {
        try {
            this.litePullConsumer.registerTopicMessageQueueChangeListener(str, new TopicMessageQueueChangeListener() { // from class: com.aliyun.openservices.ons.api.impl.rocketmq.PullConsumerImpl.1
                @Override // com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.TopicMessageQueueChangeListener
                public void onChanged(String str2, Set<MessageQueue> set) {
                    topicPartitionChangeListener.onChanged(PullConsumerImpl.this.convertToTopicPartitions(set));
                }
            });
        } catch (MQClientException e) {
            LOGGER.warn("Register listener error", (Throwable) e);
            throw new ONSClientException("Failed to register topic partition listener");
        }
    }

    @Override // com.aliyun.openservices.ons.api.PullConsumer
    public List<Message> poll(long j) {
        List<MessageExt> poll = this.litePullConsumer.poll(j);
        ArrayList arrayList = new ArrayList();
        for (MessageExt messageExt : poll) {
            Message msgConvert = ONSUtil.msgConvert(messageExt);
            Map<String, String> properties = messageExt.getProperties();
            msgConvert.setMsgID(messageExt.getMsgId());
            if (properties != null && properties.get(Constants.TRANSACTION_ID) != null) {
                msgConvert.setMsgID(properties.get(Constants.TRANSACTION_ID));
            }
            arrayList.add(msgConvert);
        }
        return arrayList;
    }

    @Override // com.aliyun.openservices.ons.api.PullConsumer
    public void seek(TopicPartition topicPartition, long j) {
        try {
            this.litePullConsumer.seek(convertToMessageQueue(topicPartition), j);
        } catch (MQClientException e) {
            LOGGER.warn("Topic partition: {} seek to: {} error", topicPartition, Long.valueOf(j), e);
            throw new ONSClientException("Seek offset failed");
        }
    }

    @Override // com.aliyun.openservices.ons.api.PullConsumer
    public void seekToBeginning(TopicPartition topicPartition) {
        try {
            this.litePullConsumer.seekToBegin(convertToMessageQueue(topicPartition));
        } catch (MQClientException e) {
            LOGGER.warn("Topic partition: {} seek to beginning error", topicPartition, e);
            throw new ONSClientException("Seek offset to beginning failed");
        }
    }

    @Override // com.aliyun.openservices.ons.api.PullConsumer
    public void seekToEnd(TopicPartition topicPartition) {
        try {
            this.litePullConsumer.seekToEnd(convertToMessageQueue(topicPartition));
        } catch (MQClientException e) {
            LOGGER.warn("Topic partition: {} seek to end error", topicPartition, e);
            throw new ONSClientException("Seek offset to end failed");
        }
    }

    @Override // com.aliyun.openservices.ons.api.PullConsumer
    public void pause(Collection<TopicPartition> collection) {
        this.litePullConsumer.pause(convertToMessageQueues(collection));
    }

    @Override // com.aliyun.openservices.ons.api.PullConsumer
    public void resume(Collection<TopicPartition> collection) {
        this.litePullConsumer.resume(convertToMessageQueues(collection));
    }

    @Override // com.aliyun.openservices.ons.api.PullConsumer
    public Long offsetForTimestamp(TopicPartition topicPartition, Long l) {
        try {
            return this.litePullConsumer.offsetForTimestamp(convertToMessageQueue(topicPartition), l);
        } catch (MQClientException e) {
            LOGGER.warn("Get offset for topic partition:{} with timestamp:{} error", topicPartition, l, e);
            throw new ONSClientException("Failed to get offset");
        }
    }

    @Override // com.aliyun.openservices.ons.api.PullConsumer
    public Long committed(TopicPartition topicPartition) {
        try {
            return this.litePullConsumer.committed(convertToMessageQueue(topicPartition));
        } catch (MQClientException e) {
            LOGGER.warn("Get committed offset for topic partition: {} error", topicPartition, e);
            throw new ONSClientException("Failed to get committed offset");
        }
    }

    @Override // com.aliyun.openservices.ons.api.PullConsumer
    public void commitSync() {
        this.litePullConsumer.commitSync();
    }

    @Override // com.aliyun.openservices.ons.api.impl.rocketmq.ONSClientAbstract, com.aliyun.openservices.ons.api.Admin
    public void start() {
        try {
            if (this.started.compareAndSet(false, true)) {
                this.litePullConsumer.start();
                super.start();
            }
        } catch (Exception e) {
            LOGGER.warn("Failed to start pull consumer", (Throwable) e);
            throw new ONSClientException(e.getMessage());
        }
    }

    @Override // com.aliyun.openservices.ons.api.impl.rocketmq.ONSClientAbstract, com.aliyun.openservices.ons.api.Admin
    public void shutdown() {
        if (this.started.compareAndSet(true, false)) {
            this.litePullConsumer.shutdown();
        }
        super.shutdown();
    }
}
