/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks.mailbox;

import java.util.Optional;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.streaming.runtime.tasks.mailbox.Mailbox;
import org.apache.flink.util.Preconditions;

/**
 * Bounded {@link Mailbox} backed by a fixed-size ring buffer of {@link Runnable} letters.
 *
 * <p>Letters are handed out in the order in which they were inserted. Every mutation of the
 * buffer happens while holding a single {@link ReentrantLock}; two conditions derived from that
 * lock park consumers while the buffer is empty and producers while it is full.
 *
 * <p>The capacity is always a power of two so that index wrap-around can be done with a bit mask.
 */
@ThreadSafe
public class MailboxImpl
implements Mailbox {
    /** Exponent used by the no-argument constructor, i.e. {@code 1 << 6 = 64} slots. */
    private static final int DEFAULT_CAPACITY_POW2 = 6;

    /** Largest exponent whose power of two still fits into a positive {@code int}. */
    private static final int MAX_CAPACITY_POW2 = 30;

    /** Storage for the queued letters; unused slots hold {@code null}. */
    @GuardedBy("lock")
    private final Runnable[] ringBuffer;

    /** Lock protecting all mutable state of this mailbox. */
    private final ReentrantLock lock;

    /** Signalled when a letter is inserted; consumers wait on it while the buffer is empty. */
    @GuardedBy("lock")
    private final Condition notEmpty;

    /** Signalled when a slot is freed; producers wait on it while the buffer is full. */
    @GuardedBy("lock")
    private final Condition notFull;

    /** Index of the oldest letter, i.e. the next slot to read. */
    @GuardedBy("lock")
    private int headIndex;

    /** Index of the next slot to write. */
    @GuardedBy("lock")
    private int tailIndex;

    /**
     * Number of letters currently stored. Only written while holding {@link #lock}; declared
     * {@code volatile} so that {@link #hasMail()} can read it without taking the lock.
     */
    @GuardedBy("lock")
    private volatile int count;

    /** {@code capacity - 1}; AND-ing an index with it wraps the index around the buffer. */
    private final int moduloMask;

    /** Creates a mailbox with the default capacity of 64 letters. */
    public MailboxImpl() {
        this(DEFAULT_CAPACITY_POW2);
    }

    /**
     * Creates a mailbox with a capacity of {@code 2^capacityPow2} letters.
     *
     * @param capacityPow2 base-2 logarithm of the capacity, in the range {@code [0, 30]}
     * @throws IllegalStateException if {@code capacityPow2} is outside the supported range
     */
    public MailboxImpl(int capacityPow2) {
        // Java only uses the low five bits of an int shift distance, so 1 << 32 == 1 and
        // 1 << -2 == 1 << 30. Checking the shifted result alone would let such exponents
        // through with a surprising capacity, hence the exponent itself is validated.
        Preconditions.checkState(
            capacityPow2 >= 0 && capacityPow2 <= MAX_CAPACITY_POW2,
            "capacityPow2 must be in [0, " + MAX_CAPACITY_POW2 + "], but was " + capacityPow2);
        final int capacity = 1 << capacityPow2;
        this.moduloMask = capacity - 1;
        this.ringBuffer = new Runnable[capacity];
        this.lock = new ReentrantLock();
        this.notEmpty = this.lock.newCondition();
        this.notFull = this.lock.newCondition();
    }

    /**
     * Checks, without taking the lock, whether at least one letter is queued.
     *
     * @return {@code true} if the mailbox currently holds at least one letter
     */
    @Override
    public boolean hasMail() {
        return !isEmpty();
    }

    /**
     * Removes and returns the oldest letter if there is one; never blocks waiting for mail.
     *
     * @return the oldest letter, or an empty optional if the mailbox is empty
     */
    @Override
    public Optional<Runnable> tryTakeMail() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return isEmpty() ? Optional.empty() : Optional.of(takeInternal());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest letter, blocking while the mailbox is empty.
     *
     * @return the oldest letter
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    @Nonnull
    public Runnable takeMail() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (isEmpty()) {
                notEmpty.await();
            }
            return takeInternal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Blocks until the mailbox holds at least one letter; the letter is left in place.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public void waitUntilHasMail() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (isEmpty()) {
                notEmpty.await();
            }
            // This method does not consume the letter. Inserting a letter wakes only a single
            // waiter, so if that waiter was us, hand the wake-up on; otherwise a thread blocked
            // in takeMail() could sleep although mail is available.
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends the letter if a slot is free; never blocks waiting for capacity.
     *
     * @param letter the letter to enqueue, must not be {@code null}
     * @return {@code true} if the letter was enqueued, {@code false} if the mailbox was full
     * @throws NullPointerException if {@code letter} is {@code null}
     */
    @Override
    public boolean tryPutMail(@Nonnull Runnable letter) {
        Preconditions.checkNotNull(letter, "letter");
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (isFull()) {
                return false;
            }
            putInternal(letter);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends the letter, blocking while the mailbox is full.
     *
     * @param letter the letter to enqueue, must not be {@code null}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws NullPointerException if {@code letter} is {@code null}
     */
    @Override
    public void putMail(@Nonnull Runnable letter) throws InterruptedException {
        Preconditions.checkNotNull(letter, "letter");
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (isFull()) {
                notFull.await();
            }
            putInternal(letter);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Blocks until the mailbox has at least one free slot; the slot is not reserved.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public void waitUntilHasCapacity() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (isFull()) {
                notFull.await();
            }
            // This method does not occupy the free slot. Removing a letter wakes only a single
            // waiter, so if that waiter was us, hand the wake-up on; otherwise a thread blocked
            // in putMail() could sleep although capacity is available.
            notFull.signal();
        } finally {
            lock.unlock();
        }
    }

    /** Stores the letter at the tail and wakes one thread waiting for mail. Requires the lock. */
    private void putInternal(Runnable letter) {
        assert lock.isHeldByCurrentThread();
        ringBuffer[tailIndex] = letter;
        tailIndex = increaseIndexWithWrapAround(tailIndex);
        ++count;
        notEmpty.signal();
    }

    /** Removes the letter at the head and wakes one thread waiting for capacity. Requires the lock. */
    private Runnable takeInternal() {
        assert lock.isHeldByCurrentThread();
        final Runnable[] buffer = ringBuffer;
        final Runnable letter = buffer[headIndex];
        // Drop the reference so the buffer does not keep a delivered letter reachable.
        buffer[headIndex] = null;
        headIndex = increaseIndexWithWrapAround(headIndex);
        --count;
        notFull.signal();
        return letter;
    }

    /** Returns the index following {@code old}, wrapping to 0 at the end of the buffer. */
    private int increaseIndexWithWrapAround(int old) {
        return (old + 1) & moduloMask;
    }

    /** Returns whether every slot of the buffer is occupied. */
    private boolean isFull() {
        return count >= ringBuffer.length;
    }

    /** Returns whether the buffer holds no letter. */
    private boolean isEmpty() {
        return count == 0;
    }

    /**
     * Discards every queued letter and enqueues {@code shutdownAction} as the only letter.
     *
     * @param shutdownAction the letter to enqueue after clearing, must not be {@code null}
     * @throws NullPointerException if {@code shutdownAction} is {@code null}
     */
    @Override
    public void clearAndPut(@Nonnull Runnable shutdownAction) {
        // Validate before touching the buffer so a bad argument cannot leave it half-cleared.
        Preconditions.checkNotNull(shutdownAction, "shutdownAction");
        lock.lock();
        try {
            for (int remaining = count; remaining > 0; --remaining) {
                ringBuffer[headIndex] = null;
                headIndex = increaseIndexWithWrapAround(headIndex);
            }
            count = 0;
            putInternal(shutdownAction);
            // Clearing may have freed many slots at once; wake every producer that was parked
            // on a full buffer instead of leaving them blocked until letters are taken.
            if (!isFull()) {
                notFull.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }
}

