/*
 * Decompiled with CFR 0.152.
 */
package com.bxm.warcar.utils.batch;

import com.bxm.warcar.utils.NamedThreadFactory;
import com.bxm.warcar.utils.batch.BatchQueue;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/**
 * A queue that buffers elements and hands them to a {@link Consumer} in batches.
 *
 * <p>Elements are appended with {@link #add(Object)}. A single background task, run on a
 * one-thread executor, delivers a batch of at most {@code batchSize} elements to the consumer
 * whenever either
 * <ul>
 *   <li>the queue holds at least {@code batchSize} elements, or</li>
 *   <li>the queue is non-empty and more than {@code intervalTimeInMillis} milliseconds have
 *       passed since the drain task started or since the last delivered batch.</li>
 * </ul>
 * The background task ends once the queue is empty and is started again by the next
 * {@code add}.
 *
 * <p>{@link #drainAll()} invokes the consumer on the calling thread, so the consumer may be
 * called concurrently by the caller of {@code drainAll} and by the background thread.
 *
 * @param <T> the element type
 */
public class BatchBlockingQueue<T>
implements BatchQueue<T> {
    /** Flush interval, in milliseconds, used when the caller does not supply one. */
    private static final int DEFAULT_INTERVAL_TIME = 1000;
    /**
     * Upper bound, in milliseconds, on a single idle sleep of the drain task. Keeps the task
     * from spinning while still noticing a freshly filled batch quickly.
     */
    private static final long MAX_IDLE_WAIT_MILLIS = 10L;

    private final int batchSize;
    private final Consumer<List<T>> consumer;
    private final int intervalTimeInMillis;
    /** {@code true} while a drain task is submitted or running. */
    private final AtomicBoolean process = new AtomicBoolean(false);
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    /** Wall-clock time (ms) at which the drain task started or last delivered a batch. */
    private final AtomicLong startTime = new AtomicLong(System.currentTimeMillis());
    // Single worker thread; NamedThreadFactory is declared elsewhere (presumably names the thread "purge").
    private final ExecutorService purgeThread = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("purge"));

    /**
     * Creates a queue that flushes partial batches after {@value #DEFAULT_INTERVAL_TIME} ms.
     *
     * @param batchSize maximum number of elements per delivered batch; must be positive
     * @param consumer  receiver of each batch; must not be {@code null}
     */
    public BatchBlockingQueue(int batchSize, Consumer<List<T>> consumer) {
        this(batchSize, consumer, DEFAULT_INTERVAL_TIME);
    }

    /**
     * Creates a queue with an explicit flush interval.
     *
     * @param batchSize            maximum number of elements per delivered batch; must be positive
     * @param consumer             receiver of each batch; must not be {@code null}
     * @param intervalTimeInMillis time in milliseconds after which a partial batch is delivered
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     * @throws NullPointerException     if {@code consumer} is {@code null}
     */
    public BatchBlockingQueue(int batchSize, Consumer<List<T>> consumer, int intervalTimeInMillis) {
        // A non-positive batch size makes drainTo() remove nothing, so the drain loop would never end.
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        if (consumer == null) {
            throw new NullPointerException("consumer");
        }
        this.batchSize = batchSize;
        this.consumer = consumer;
        this.intervalTimeInMillis = intervalTimeInMillis;
    }

    private boolean isProcessing() {
        return this.process.get();
    }

    /**
     * Appends an element and makes sure a background drain task is active.
     *
     * @param t the element to enqueue
     * @return the result of the underlying queue's {@code add}
     */
    @Override
    public boolean add(T t) {
        boolean added = this.queue.add(t);
        if (added) {
            this.ensureDraining();
        }
        return added;
    }

    /**
     * Starts a drain task unless one is already active. The compare-and-set guarantees that
     * concurrent callers submit at most one task.
     */
    private void ensureDraining() {
        if (this.process.compareAndSet(false, true)) {
            try {
                this.startDrain();
            } catch (RuntimeException e) {
                // Submission failed: release the flag so a later add() can retry.
                this.process.set(false);
                throw e;
            }
        }
    }

    /** Submits the drain loop to the single background thread. */
    private void startDrain() {
        this.purgeThread.submit(this::drainLoop);
    }

    /**
     * Delivers batches until the queue is empty, sleeping briefly between checks instead of
     * spinning. Always clears the processing flag on exit, including when the consumer throws.
     */
    private void drainLoop() {
        try {
            this.startTime.set(System.currentTimeMillis());
            while (!this.queue.isEmpty()) {
                long elapsed = System.currentTimeMillis() - this.startTime.get();
                if (this.queue.size() >= this.batchSize || elapsed > (long) this.intervalTimeInMillis) {
                    this.doDrainWithBatchSize();
                    continue;
                }
                // Not enough elements yet and the interval has not expired: wait, but only
                // briefly, so a batch that fills up in the meantime is picked up promptly.
                long untilFlush = (long) this.intervalTimeInMillis - elapsed + 1L;
                long waitMillis = Math.max(1L, Math.min(untilFlush, MAX_IDLE_WAIT_MILLIS));
                TimeUnit.MILLISECONDS.sleep(waitMillis);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the executor and stop draining.
            Thread.currentThread().interrupt();
        } finally {
            this.process.set(false);
            // An element may have been added after the last emptiness check but before the
            // flag was cleared; its add() saw the flag set and did not start a task.
            if (!Thread.currentThread().isInterrupted() && !this.queue.isEmpty()) {
                this.ensureDraining();
            }
        }
    }

    /**
     * Synchronously delivers everything currently queued to the consumer, in batches of at
     * most {@code batchSize}, on the calling thread.
     */
    @Override
    public void drainAll() {
        while (!this.queue.isEmpty()) {
            this.doDrainWithBatchSize();
        }
    }

    /**
     * @return the number of elements currently waiting in the queue
     */
    @Override
    public int size() {
        return this.queue.size();
    }

    /**
     * Removes up to {@code batchSize} elements from the queue and, if any were removed, passes
     * them to the consumer and restarts the interval timer.
     */
    private void doDrainWithBatchSize() {
        List<T> drained = new ArrayList<>();
        if (this.queue.drainTo(drained, this.batchSize) > 0) {
            this.consumer.accept(drained);
            this.startTime.set(System.currentTimeMillis());
        }
    }
}

