package org.apache.flink.streaming.runtime.tasks.mailbox;

import java.util.Optional;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.util.Preconditions;

@ThreadSafe
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.class */
public class MailboxImpl implements Mailbox {

    @GuardedBy("lock")
    private final Runnable[] ringBuffer;
    private final ReentrantLock lock;

    @GuardedBy("lock")
    private final Condition notEmpty;

    @GuardedBy("lock")
    private final Condition notFull;

    @GuardedBy("lock")
    private int headIndex;

    @GuardedBy("lock")
    private int tailIndex;

    @GuardedBy("lock")
    private volatile int count;
    private final int moduloMask;
    static final /* synthetic */ boolean $assertionsDisabled;

    public MailboxImpl() {
        this(6);
    }

    public MailboxImpl(int i) {
        int i2 = 1 << i;
        Preconditions.checkState(i2 > 0);
        this.moduloMask = i2 - 1;
        this.ringBuffer = new Runnable[i2];
        this.lock = new ReentrantLock();
        this.notEmpty = this.lock.newCondition();
        this.notFull = this.lock.newCondition();
    }

    @Override // org.apache.flink.streaming.runtime.tasks.mailbox.MailboxReceiver
    public boolean hasMail() {
        return !isEmpty();
    }

    @Override // org.apache.flink.streaming.runtime.tasks.mailbox.MailboxReceiver
    public Optional<Runnable> tryTakeMail() {
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lock();
        try {
            return isEmpty() ? Optional.empty() : Optional.of(takeInternal());
        } finally {
            reentrantLock.unlock();
        }
    }

    @Override // org.apache.flink.streaming.runtime.tasks.mailbox.MailboxReceiver
    @Nonnull
    public Runnable takeMail() throws InterruptedException {
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lockInterruptibly();
        while (isEmpty()) {
            try {
                this.notEmpty.await();
            } finally {
                reentrantLock.unlock();
            }
        }
        return takeInternal();
    }

    @Override // org.apache.flink.streaming.runtime.tasks.mailbox.MailboxReceiver
    public void waitUntilHasMail() throws InterruptedException {
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lockInterruptibly();
        while (isEmpty()) {
            try {
                this.notEmpty.await();
            } finally {
                reentrantLock.unlock();
            }
        }
    }

    @Override // org.apache.flink.streaming.runtime.tasks.mailbox.MailboxSender
    public boolean tryPutMail(@Nonnull Runnable runnable) {
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lock();
        try {
            if (isFull()) {
                return false;
            }
            putInternal(runnable);
            reentrantLock.unlock();
            return true;
        } finally {
            reentrantLock.unlock();
        }
    }

    @Override // org.apache.flink.streaming.runtime.tasks.mailbox.MailboxSender
    public void putMail(@Nonnull Runnable runnable) throws InterruptedException {
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lockInterruptibly();
        while (isFull()) {
            try {
                this.notFull.await();
            } finally {
                reentrantLock.unlock();
            }
        }
        putInternal(runnable);
    }

    @Override // org.apache.flink.streaming.runtime.tasks.mailbox.MailboxSender
    public void waitUntilHasCapacity() throws InterruptedException {
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lockInterruptibly();
        while (isFull()) {
            try {
                this.notFull.await();
            } finally {
                reentrantLock.unlock();
            }
        }
    }

    private void putInternal(Runnable runnable) {
        if (!$assertionsDisabled && !this.lock.isHeldByCurrentThread()) {
            throw new AssertionError();
        }
        this.ringBuffer[this.tailIndex] = runnable;
        this.tailIndex = increaseIndexWithWrapAround(this.tailIndex);
        this.count++;
        this.notEmpty.signal();
    }

    private Runnable takeInternal() {
        if (!$assertionsDisabled && !this.lock.isHeldByCurrentThread()) {
            throw new AssertionError();
        }
        Runnable[] runnableArr = this.ringBuffer;
        Runnable runnable = runnableArr[this.headIndex];
        runnableArr[this.headIndex] = null;
        this.headIndex = increaseIndexWithWrapAround(this.headIndex);
        this.count--;
        this.notFull.signal();
        return runnable;
    }

    private int increaseIndexWithWrapAround(int i) {
        return (i + 1) & this.moduloMask;
    }

    private boolean isFull() {
        return this.count >= this.ringBuffer.length;
    }

    private boolean isEmpty() {
        return this.count == 0;
    }

    @Override // org.apache.flink.streaming.runtime.tasks.mailbox.Mailbox
    public void clearAndPut(@Nonnull Runnable runnable) {
        this.lock.lock();
        try {
            for (int i = this.count; i > 0; i--) {
                this.ringBuffer[this.headIndex] = null;
                this.headIndex = increaseIndexWithWrapAround(this.headIndex);
            }
            this.count = 0;
            putInternal(runnable);
        } finally {
            this.lock.unlock();
        }
    }

    static {
        $assertionsDisabled = !MailboxImpl.class.desiredAssertionStatus();
    }
}
