package org.apache.rocketmq.spark.streaming;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spark.RocketMQConfig;
import org.apache.spark.storage.StorageLevel;

/* loaded from: input_file:org/apache/rocketmq/spark/streaming/ReliableRocketMQReceiver.class */
/**
 * RocketMQ receiver that decouples message consumption from storage through a bounded
 * in-memory queue and reports the outcome of every hand-off to a {@link MessageRetryManager}.
 *
 * <p>{@link #process(List)} wraps each non-empty batch in a {@link MessageSet} and enqueues it.
 * A dedicated {@link MessageSender} thread drains the queue, marks each set with the retry
 * manager, passes it to the inherited {@code store(...)}, and then either acknowledges it or,
 * if storing or acknowledging threw, reports it as failed.
 */
public class ReliableRocketMQReceiver extends RocketMQReceiver {
    /** Hand-off buffer between {@link #process(List)} and the sender thread; created in {@link #onStart()}. */
    private BlockingQueue<MessageSet> queue;

    /** Tracks in-flight message sets and receives ack/fail notifications; created in {@link #onStart()}. */
    private MessageRetryManager messageRetryManager;

    /** Background thread that drains {@link #queue}; created in {@link #onStart()}, interrupted in {@link #onStop()}. */
    private MessageSender sender;

    /**
     * Worker thread that takes message sets off the queue and stores them for as long as the
     * enclosing receiver reports itself as started.
     */
    class MessageSender extends Thread {
        MessageSender() {
        }

        /**
         * Drains the queue until the receiver is no longer started or this thread is interrupted.
         *
         * <p>Each message set is marked with the retry manager before it is stored, so a failure
         * while storing or acknowledging can be reported against the set's id.
         */
        @Override
        public void run() {
            while (ReliableRocketMQReceiver.this.isStarted()) {
                final MessageSet messageSet;
                try {
                    // take() blocks until an element is available and never returns null.
                    messageSet = ReliableRocketMQReceiver.this.queue.take();
                } catch (InterruptedException e) {
                    // onStop() interrupts this thread to request shutdown. Keep the interrupt
                    // status set and leave the loop instead of silently going back to take().
                    Thread.currentThread().interrupt();
                    return;
                }

                ReliableRocketMQReceiver.this.messageRetryManager.mark(messageSet);
                try {
                    ReliableRocketMQReceiver.this.store(messageSet);
                    ReliableRocketMQReceiver.this.ack(messageSet.getId());
                } catch (Exception e) {
                    // Not dropped: the failure is handed to the retry manager, which decides
                    // what happens to the message set next.
                    ReliableRocketMQReceiver.this.fail(messageSet.getId());
                }
            }
        }
    }

    /**
     * Creates the receiver; both arguments are forwarded unchanged to {@link RocketMQReceiver}.
     *
     * @param properties   receiver configuration, later read in {@link #onStart()}
     * @param storageLevel storage level passed to the superclass
     */
    public ReliableRocketMQReceiver(Properties properties, StorageLevel storageLevel) {
        super(properties, storageLevel);
    }

    /**
     * Creates the bounded queue, the retry manager and the sender thread, starts the sender,
     * and then delegates to {@code super.onStart()}.
     */
    @Override
    public void onStart() {
        int queueSize = RocketMQConfig.getInteger(
                this.properties, RocketMQConfig.QUEUE_SIZE, RocketMQConfig.DEFAULT_QUEUE_SIZE);
        this.queue = new LinkedBlockingQueue<>(queueSize);

        // NOTE(review): the third argument of getInteger is presumably the fallback used when
        // the property is unset; 3 is the literal found in the original code for max retries.
        int maxRetry = RocketMQConfig.getInteger(this.properties, RocketMQConfig.MESSAGES_MAX_RETRY, 3);
        int ttl = RocketMQConfig.getInteger(
                this.properties, RocketMQConfig.MESSAGES_TTL, RocketMQConfig.DEFAULT_MESSAGES_TTL);
        this.messageRetryManager = new DefaultMessageRetryManager(this.queue, maxRetry, ttl);

        this.sender = new MessageSender();
        this.sender.setName("MessageSender");
        this.sender.setDaemon(true);
        this.sender.start();

        super.onStart();
    }

    /**
     * Enqueues a batch of consumed messages for the sender thread, blocking while the queue
     * is full.
     *
     * @param list messages to enqueue; a {@code null} or empty list is treated as nothing to do
     * @return {@code true} if there was nothing to enqueue or the batch was enqueued;
     *         {@code false} if the calling thread was interrupted while waiting for queue space
     *         (its interrupt status is restored before returning)
     */
    @Override
    public boolean process(List<MessageExt> list) {
        if (list == null || list.isEmpty()) {
            return true;
        }
        try {
            this.queue.put(new MessageSet(list));
            return true;
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller's thread instead of discarding it.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Acknowledges a message set with the retry manager.
     *
     * @param obj identifier of the message set; its {@code toString()} value is used as the id
     */
    public void ack(Object obj) {
        this.messageRetryManager.ack(obj.toString());
    }

    /**
     * Reports a message set as failed to the retry manager.
     *
     * @param obj identifier of the message set; its {@code toString()} value is used as the id
     */
    public void fail(Object obj) {
        this.messageRetryManager.fail(obj.toString());
    }

    /**
     * Shuts down the consumer and interrupts the sender thread so that it does not stay
     * blocked on the queue after the receiver has stopped.
     */
    @Override
    public void onStop() {
        try {
            this.consumer.shutdown();
        } finally {
            // The sender may be parked in queue.take(); without an interrupt it would never
            // re-check isStarted() and would outlive this receiver run.
            MessageSender current = this.sender;
            if (current != null) {
                current.interrupt();
            }
        }
    }
}
