/*
 * Decompiled with CFR 0.152.
 */
package com.bxm.warcar.integration.distributed.delayed;

import com.bxm.warcar.integration.distributed.delayed.DelayedTask;
import com.bxm.warcar.integration.distributed.delayed.DelayedTaskExecutor;
import com.bxm.warcar.utils.LifeCycle;
import com.bxm.warcar.utils.NamedThreadFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.Serializable;
import java.time.Duration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.codec.digest.DigestUtils;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RBucket;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Delayed task queue built on a Redisson {@link RDelayedQueue} / {@link RBlockingQueue} pair
 * that only executes a task when it is still the "latest" one for its value.
 *
 * <p>Tasks are serialized to JSON and offered to the delayed queue with a delay computed from
 * {@link DelayedTask#getExecuteTimeInMillis()}. A single consumer thread takes entries from the
 * blocking queue and, for each entry, asks {@code latestExecuteTimeProvider} for the current
 * execute time of the task value:
 * <ul>
 *   <li>if the provider returns {@code null} or a value equal to the task's execute time, the
 *       task is passed to {@link DelayedTaskExecutor#accept} provided an execution flag for
 *       (task name, value digest, execute time) could be set in Redis for the first time;</li>
 *   <li>otherwise the task is passed to {@link DelayedTaskExecutor#reject}.</li>
 * </ul>
 *
 * @param <V> type of the value carried by each delayed task
 */
public class LatestOnlyDelayedTaskQueueTool<V extends Serializable> extends LifeCycle {
    private static final Logger log = LoggerFactory.getLogger(LatestOnlyDelayedTaskQueueTool.class);

    /** Pause between two consume attempts after an unexpected failure. */
    private static final long CONSUME_ERROR_BACKOFF_MILLIS = 1000L;

    private final RedissonClient redissonClient;
    private final RBlockingQueue<String> blockingQueue;
    private final RDelayedQueue<String> delayedQueue;
    private final ThreadPoolExecutor consumerThread;
    private final Class<V> clazz;
    private final DelayedTaskExecutor<V> executor;
    private final String taskName;
    private final Function<V, Long> latestExecuteTimeProvider;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Duration timeToLive;

    /**
     * Creates a tool whose execution flags live for one day.
     *
     * @param redissonClient            client used for the queues and the execution flags
     * @param taskName                  name of the blocking queue; also used in log messages,
     *                                  the consumer thread name and the execution flag key
     * @param clazz                     class of the task value, used for JSON deserialization
     * @param executor                  callback for accepted / rejected tasks; when {@code null}
     *                                  no consumer is started by {@link #doInit()}
     * @param latestExecuteTimeProvider returns the currently valid execute time for a task value
     */
    public LatestOnlyDelayedTaskQueueTool(RedissonClient redissonClient, String taskName, Class<V> clazz, DelayedTaskExecutor<V> executor, Function<V, Long> latestExecuteTimeProvider) {
        this(redissonClient, taskName, clazz, executor, latestExecuteTimeProvider, Duration.ofDays(1L));
    }

    /**
     * Creates a tool with an explicit lifetime for the execution flags.
     *
     * @param redissonClient            client used for the queues and the execution flags
     * @param taskName                  name of the blocking queue; also used in log messages,
     *                                  the consumer thread name and the execution flag key
     * @param clazz                     class of the task value, used for JSON deserialization
     * @param executor                  callback for accepted / rejected tasks; when {@code null}
     *                                  no consumer is started by {@link #doInit()}
     * @param latestExecuteTimeProvider returns the currently valid execute time for a task value
     * @param timeToLive                lifetime of the execution flag (applied with second precision)
     */
    public LatestOnlyDelayedTaskQueueTool(RedissonClient redissonClient, String taskName, Class<V> clazz, DelayedTaskExecutor<V> executor, Function<V, Long> latestExecuteTimeProvider, Duration timeToLive) {
        this.redissonClient = redissonClient;
        this.clazz = clazz;
        this.taskName = taskName;
        this.blockingQueue = redissonClient.getBlockingQueue(taskName);
        this.delayedQueue = redissonClient.getDelayedQueue(this.blockingQueue);
        // Exactly one worker: entries are consumed sequentially.
        this.consumerThread = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory(String.format("delayed-%s-queue-tool", taskName)));
        this.executor = executor;
        this.latestExecuteTimeProvider = latestExecuteTimeProvider;
        this.timeToLive = timeToLive;
    }

    /**
     * Schedules a task for delivery at its execute time.
     *
     * <p>The task is skipped with a warning when it (or its execute time) is {@code null} or when
     * the execute time already lies in the past. Serialization failures are logged and the task
     * is not enqueued.
     *
     * @param delayedTask task to schedule
     */
    public void offer(DelayedTask<V> delayedTask) {
        Long executeTimeInMillis = null == delayedTask ? null : delayedTask.getExecuteTimeInMillis();
        if (null == executeTimeInMillis) {
            // Guard against the NullPointerException that unboxing a missing execute time would cause.
            log.warn("delayed task or its execute time is null, skip, task: {}", delayedTask);
            return;
        }
        long delay = executeTimeInMillis - System.currentTimeMillis();
        if (delay < 0L) {
            log.warn("delayed task execute time is in the past, task: {}", delayedTask);
            return;
        }
        try {
            this.delayedQueue.offer(this.serialize(delayedTask), delay, TimeUnit.MILLISECONDS);
        }
        catch (JsonProcessingException e) {
            log.error("serialize delayed task error, task: {}", delayedTask, e);
        }
    }

    /** Removes every entry from both the delayed queue and the blocking queue. */
    public void clear() {
        this.delayedQueue.clear();
        this.blockingQueue.clear();
        log.warn("{} delayed queue tool cleared", this.taskName);
    }

    /**
     * Returns the size reported by the delayed queue; the blocking queue is not included.
     *
     * @return number of entries in the delayed queue
     */
    public int size() {
        return this.delayedQueue.size();
    }

    /** Converts a task to its JSON representation. */
    private String serialize(DelayedTask<V> delayedTask) throws JsonProcessingException {
        return this.objectMapper.writeValueAsString(delayedTask);
    }

    /** Parses a JSON string back into a {@code DelayedTask<V>} using the configured value class. */
    private DelayedTask<V> deserialize(String value) throws JsonProcessingException {
        JavaType javaType = this.objectMapper.getTypeFactory().constructParametricType(DelayedTask.class, this.clazz);
        return this.objectMapper.readValue(value, javaType);
    }

    /**
     * Starts the consumer loop on the dedicated worker thread. Nothing is started when no
     * executor was supplied.
     */
    protected void doInit() {
        if (null == this.executor) {
            log.info("delayed {} queue tool executor is null", this.taskName);
            return;
        }
        this.consumerThread.execute(this::consumeUntilInterrupted);
        log.info("delayed {} queue tool init success", this.taskName);
    }

    /**
     * Stops the consumer thread and releases the client-side resources held by the delayed queue.
     */
    protected void doDestroy() {
        this.consumerThread.shutdownNow();
        // Redisson asks for delayed queues to be destroyed once they are no longer used;
        // otherwise the transfer task registered by getDelayedQueue stays alive.
        this.delayedQueue.destroy();
        log.info("delayed {} queue tool shutdown success", this.taskName);
    }

    /**
     * Takes entries from the blocking queue and dispatches them until the thread is interrupted.
     * Any other failure is logged and followed by a short pause before the next attempt.
     */
    private void consumeUntilInterrupted() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                this.dispatch(this.blockingQueue.take());
            }
            catch (InterruptedException ignored) {
                // Interrupted while waiting or executing: restore the flag and stop consuming.
                Thread.currentThread().interrupt();
                return;
            }
            catch (Exception e) {
                log.error("delayed {} queue tool consume error", this.taskName, e);
                if (!this.pauseAfterError()) {
                    return;
                }
            }
        }
    }

    /**
     * Sleeps for the error back-off period.
     *
     * @return {@code false} when the sleep was interrupted (interrupt flag restored), {@code true} otherwise
     */
    private boolean pauseAfterError() {
        try {
            Thread.sleep(CONSUME_ERROR_BACKOFF_MILLIS);
            return true;
        }
        catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Decides what to do with one entry taken from the blocking queue: execute, skip as already
     * executed, or reject as expired.
     *
     * @param value JSON representation of the task
     * @throws Exception anything raised by deserialization, the provider or the executor callbacks;
     *                   handled by the consumer loop
     */
    private void dispatch(String value) throws Exception {
        DelayedTask<V> delayedTask = this.deserialize(value);
        Long latestExecuteTime = this.latestExecuteTimeProvider.apply(delayedTask.getValue());
        Long taskExecuteTime = delayedTask.getExecuteTimeInMillis();

        boolean stillLatest = null == latestExecuteTime || latestExecuteTime.equals(taskExecuteTime);
        if (!stillLatest) {
            log.debug("{} DelayedTask [{}] was expired, taskExecuteTime: {}, but currentExecuteTime is: {}", this.taskName, delayedTask.getValue(), taskExecuteTime, latestExecuteTime);
            this.executor.reject(delayedTask);
            return;
        }
        if (!this.isFirstExecute(delayedTask)) {
            log.debug("{} DelayedTask [{}] already executed, skip, taskExecuteTime: {}, but currentExecuteTime is: {}", this.taskName, delayedTask.getValue(), taskExecuteTime, latestExecuteTime);
            return;
        }
        log.info("{} DelayedTask [{}] will be executed, taskExecuteTime: {}, currentExecuteTime: {}", this.taskName, delayedTask.getValue(), taskExecuteTime, latestExecuteTime);
        this.executor.accept(delayedTask);
    }

    /**
     * Tries to set the execution flag for this task in Redis.
     *
     * <p>The flag key combines the task name, the MD5 hex digest of the JSON-serialized value and
     * the task's execute time; it expires after {@code timeToLive}. If the flag cannot be
     * generated or set because of an error, the method falls back to allowing execution.
     *
     * @param delayedTask task about to be executed
     * @return {@code true} when the flag was newly set or an error occurred, {@code false} when
     *         the flag already existed
     */
    private boolean isFirstExecute(DelayedTask<V> delayedTask) {
        try {
            String taskKey = DigestUtils.md5Hex(this.objectMapper.writeValueAsString(delayedTask.getValue()));
            String key = String.format("warcar:delayed_queue:execute:%s:%s:%d", this.taskName, taskKey, delayedTask.getExecuteTimeInMillis());
            RBucket<Integer> bucket = this.redissonClient.getBucket(key);
            long seconds = this.timeToLive.getSeconds();
            return bucket.trySet(1, seconds, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            log.error("generate execute flag error, fallback execute task: {}", delayedTask, e);
            return true;
        }
    }
}

