/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.BufferManager;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;

/**
 * An {@link InputChannel} that obtains its data through a {@link PartitionRequestClient} created
 * from a {@link ConnectionManager}.
 *
 * <p>Incoming buffers are handed to {@link #onBuffer(Buffer, int, int)}, queued in
 * {@code receivedBuffers} and later drained by {@link #getNextBuffer()}. Buffers for receiving
 * data are managed by a {@link BufferManager}; newly available buffers are accumulated as
 * "unannounced credit" and the client is told about them through
 * {@link PartitionRequestClient#notifyCreditAvailable}.
 *
 * <p>Synchronization: {@code receivedBuffers} is guarded by its own monitor everywhere except in
 * the methods whose name starts with {@code unsynchronized}, which deliberately read without the
 * lock.
 */
public class RemoteInputChannel extends InputChannel {
    /** Identifier of this channel, exposed through {@link #getInputChannelId()}. */
    private final InputChannelID id = new InputChannelID();

    /** Connection identifier handed to the connection manager when creating or closing connections. */
    private final ConnectionID connectionId;

    /** Creates the partition request client and closes open connections on release. */
    private final ConnectionManager connectionManager;

    /** Buffers received via {@link #onBuffer} that have not been polled yet; also used as the lock. */
    private final ArrayDeque<Buffer> receivedBuffers = new ArrayDeque<>();

    /** Flipped to {@code true} exactly once by {@link #releaseAllResources()}. */
    private final AtomicBoolean isReleased = new AtomicBoolean();

    /** Client used to talk to the producer; {@code null} until {@link #requestSubpartition(int)} ran. */
    private volatile PartitionRequestClient partitionRequestClient;

    /** Sequence number the next call to {@link #onBuffer} or {@link #onEmptyBuffer} must carry. */
    private int expectedSequenceNumber = 0;

    /** Value returned by {@code bufferManager.requestExclusiveBuffers()}; 0 until assigned. */
    private int initialCredit;

    /** Credit gathered since the last {@link #getAndResetUnannouncedCredit()} call. */
    private final AtomicInteger unannouncedCredit = new AtomicInteger(0);

    /** Checkpoint id passed to the most recent {@link #spillInflightBuffers} call, or -1. */
    @GuardedBy("receivedBuffers")
    private long lastRequestedCheckpointId = -1L;

    /** Id of the last checkpoint barrier observed in {@link #onBuffer}, or -1. */
    private long receivedCheckpointId = -1L;

    /** Manages the buffers this channel receives data into. */
    private final BufferManager bufferManager;

    /**
     * Creates the channel. All arguments except {@code connectionId} and
     * {@code connectionManager} are forwarded unchanged to the {@link InputChannel} constructor.
     *
     * @throws NullPointerException if {@code connectionId} or {@code connectionManager} is null
     */
    public RemoteInputChannel(SingleInputGate inputGate, int channelIndex, ResultPartitionID partitionId, ConnectionID connectionId, ConnectionManager connectionManager, int initialBackOff, int maxBackoff, Counter numBytesIn, Counter numBuffersIn) {
        super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, numBytesIn, numBuffersIn);
        this.connectionId = Preconditions.checkNotNull(connectionId);
        this.connectionManager = Preconditions.checkNotNull(connectionManager);
        // NOTE(review): the meaning of the trailing 0 is defined by BufferManager, not visible here.
        this.bufferManager = new BufferManager(inputGate.getMemorySegmentProvider(), this, 0);
    }

    /**
     * Requests the exclusive buffers from the buffer manager and records their number as the
     * initial credit.
     *
     * @throws IllegalStateException if an initial credit other than 0 has already been recorded
     */
    void assignExclusiveSegments() throws IOException {
        Preconditions.checkState(initialCredit == 0, "Bug in input channel setup logic: exclusive buffers have already been set for this input channel.");
        initialCredit = bufferManager.requestExclusiveBuffers();
    }

    /**
     * Creates the partition request client on first use and requests the given subpartition with
     * a backoff of 0. Does nothing if a client already exists.
     *
     * @throws PartitionConnectionException if creating the client fails with an {@link IOException}
     */
    @Override
    @VisibleForTesting
    public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
        if (partitionRequestClient != null) {
            return;
        }
        try {
            partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId);
        } catch (IOException e) {
            // Keep the cause so the connection failure stays diagnosable.
            throw new PartitionConnectionException(partitionId, e);
        }
        partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0);
    }

    /**
     * Re-sends the subpartition request using the increased backoff, or fails the channel with a
     * {@link PartitionNotFoundException} once the backoff cannot be increased any further.
     */
    void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException {
        checkPartitionRequestQueueInitialized();
        if (increaseBackoff()) {
            partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, getCurrentBackoff());
        } else {
            failPartitionRequest();
        }
    }

    /**
     * Polls the next received buffer and updates the byte/buffer counters.
     *
     * @return the polled buffer together with a flag telling whether more buffers are queued
     * @throws CancelTaskException if the queue is empty and the channel has been released
     * @throws IllegalStateException if the queue is empty although the channel is not released
     */
    @Override
    Optional<InputChannel.BufferAndAvailability> getNextBuffer() throws IOException {
        checkPartitionRequestQueueInitialized();

        final Buffer next;
        final boolean moreAvailable;
        synchronized (receivedBuffers) {
            next = receivedBuffers.poll();
            moreAvailable = !receivedBuffers.isEmpty();
        }

        if (next == null) {
            if (isReleased.get()) {
                throw new CancelTaskException("Queried for a buffer after channel has been released.");
            }
            throw new IllegalStateException("There should always have queued buffers for unreleased channel.");
        }

        numBytesIn.inc(next.getSize());
        numBuffersIn.inc();
        return Optional.of(new InputChannel.BufferAndAvailability(next, moreAvailable, 0));
    }

    /**
     * Hands the currently queued data buffers to the channel state writer.
     *
     * <p>Walks the queue from the head and stops at the first checkpoint barrier whose id is not
     * smaller than {@code checkpointId}. Every data buffer seen before that point is retained and
     * passed on; the queue itself is left untouched.
     *
     * @throws IllegalStateException if {@code checkpointId} is not greater than the previously
     *     requested checkpoint id
     */
    @Override
    public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelStateWriter) throws IOException {
        synchronized (receivedBuffers) {
            Preconditions.checkState(checkpointId > lastRequestedCheckpointId, "Need to request the next checkpointId");

            final ArrayList<Buffer> inflightBuffers = new ArrayList<>(receivedBuffers.size());
            for (Buffer buffer : receivedBuffers) {
                final CheckpointBarrier barrier = parseCheckpointBarrierOrNull(buffer);
                if (barrier != null && barrier.getId() >= checkpointId) {
                    break;
                }
                if (buffer.isBuffer()) {
                    // Retain: the buffer stays in the queue and is additionally owned by the writer.
                    inflightBuffers.add(buffer.retainBuffer());
                }
            }

            lastRequestedCheckpointId = checkpointId;
            // NOTE(review): -2 is presumably ChannelStateWriter's "sequence number unknown" marker - confirm.
            channelStateWriter.addInputData(checkpointId, channelInfo, -2, CloseableIterator.fromList(inflightBuffers, Buffer::recycleBuffer));
        }
    }

    /**
     * Forwards a task event to the producer through the partition request client.
     *
     * @throws IllegalStateException if the channel has been released or no client exists yet
     */
    @Override
    void sendTaskEvent(TaskEvent event) throws IOException {
        Preconditions.checkState(!isReleased.get(), "Tried to send task event to producer after channel has been released.");
        checkPartitionRequestQueueInitialized();
        partitionRequestClient.sendTaskEvent(partitionId, event, this);
    }

    @Override
    public boolean isReleased() {
        return isReleased.get();
    }

    /**
     * Releases the queued buffers and closes the connection. Only the first call has an effect.
     */
    @Override
    void releaseAllResources() throws IOException {
        if (!isReleased.compareAndSet(false, true)) {
            return;
        }

        // Snapshot and clear under the lock; onBuffer() checks the released flag under the same
        // lock, so nothing is added to the queue after this point.
        final ArrayDeque<Buffer> releasedBuffers;
        synchronized (receivedBuffers) {
            releasedBuffers = new ArrayDeque<>(receivedBuffers);
            receivedBuffers.clear();
        }
        bufferManager.releaseAllBuffers(releasedBuffers);

        if (partitionRequestClient != null) {
            partitionRequestClient.close(this);
        } else {
            connectionManager.closeOpenChannelConnections(connectionId);
        }
    }

    /** Marks the channel as failed with a {@link PartitionNotFoundException}. */
    private void failPartitionRequest() {
        setError(new PartitionNotFoundException(partitionId));
    }

    @Override
    public String toString() {
        return "RemoteInputChannel [" + partitionId + " at " + connectionId + "]";
    }

    /** Tells the partition request client that this channel has credit to announce. */
    private void notifyCreditAvailable() throws IOException {
        checkPartitionRequestQueueInitialized();
        partitionRequestClient.notifyCreditAvailable(this);
    }

    @VisibleForTesting
    public int getNumberOfAvailableBuffers() {
        return bufferManager.getNumberOfAvailableBuffers();
    }

    @VisibleForTesting
    public int getNumberOfRequiredBuffers() {
        return bufferManager.unsynchronizedGetNumberOfRequiredBuffers();
    }

    /** Returns the number of required buffers minus the initial credit. */
    @VisibleForTesting
    public int getSenderBacklog() {
        return getNumberOfRequiredBuffers() - initialCredit;
    }

    @VisibleForTesting
    boolean isWaitingForFloatingBuffers() {
        return bufferManager.unsynchronizedIsWaitingForFloatingBuffers();
    }

    /**
     * Polls the head of the received-buffer queue without touching counters or state checks.
     *
     * @return the head buffer, or {@code null} if the queue is empty
     */
    @VisibleForTesting
    public Buffer getNextReceivedBuffer() {
        // ArrayDeque is not thread-safe; take the same monitor as every other mutating access.
        synchronized (receivedBuffers) {
            return receivedBuffers.poll();
        }
    }

    @VisibleForTesting
    BufferManager getBufferManager() {
        return bufferManager;
    }

    @VisibleForTesting
    PartitionRequestClient getPartitionRequestClient() {
        return partitionRequestClient;
    }

    /**
     * Adds the given number of buffers to the unannounced credit. The client is notified only
     * when the credit was 0 before, i.e. when no announcement is already pending.
     */
    @Override
    public void notifyBufferAvailable(int numAvailableBuffers) throws IOException {
        if (numAvailableBuffers > 0 && unannouncedCredit.getAndAdd(numAvailableBuffers) == 0) {
            notifyCreditAvailable();
        }
    }

    /**
     * Asks the partition request client to resume consumption for this channel.
     *
     * @throws IllegalStateException if the channel has been released or no client exists yet
     */
    @Override
    public void resumeConsumption() throws IOException {
        Preconditions.checkState(!isReleased.get(), "Channel released.");
        checkPartitionRequestQueueInitialized();
        partitionRequestClient.resumeConsumption(this);
    }

    /** Returns the credit that has been gathered but not fetched yet. */
    public int getUnannouncedCredit() {
        return unannouncedCredit.get();
    }

    /** Returns the gathered credit and atomically resets it to 0. */
    public int getAndResetUnannouncedCredit() {
        return unannouncedCredit.getAndSet(0);
    }

    /** Returns the number of received buffers that have not been polled yet. */
    public int getNumberOfQueuedBuffers() {
        synchronized (receivedBuffers) {
            return receivedBuffers.size();
        }
    }

    /** Lock-free variant of {@link #getNumberOfQueuedBuffers()}; the value may be stale. */
    @Override
    public int unsynchronizedGetNumberOfQueuedBuffers() {
        return Math.max(0, receivedBuffers.size());
    }

    /** Returns the initial credit minus the buffer manager's reported count, floored at 0. */
    public int unsynchronizedGetExclusiveBuffersUsed() {
        return Math.max(0, initialCredit - bufferManager.unsynchronizedGetExclusiveBuffersUsed());
    }

    public int unsynchronizedGetFloatingBuffersAvailable() {
        return Math.max(0, bufferManager.unsynchronizedGetFloatingBuffersAvailable());
    }

    public InputChannelID getInputChannelId() {
        return id;
    }

    public int getInitialCredit() {
        return initialCredit;
    }

    /**
     * Returns the buffer provider of the owning input gate.
     *
     * @return the provider, or {@code null} if this channel has been released
     */
    @Nullable
    public BufferProvider getBufferProvider() throws IOException {
        if (isReleased.get()) {
            return null;
        }
        return inputGate.getBufferProvider();
    }

    /** Requests a buffer from the buffer manager; may return {@code null}. */
    @Nullable
    public Buffer requestBuffer() {
        return bufferManager.requestBuffer();
    }

    /**
     * Requests floating buffers for {@code backlog + initialCredit} required buffers and announces
     * newly obtained ones as credit (only if no announcement is already pending).
     */
    void onSenderBacklog(int backlog) throws IOException {
        final int numRequestedBuffers = bufferManager.requestFloatingBuffers(backlog + initialCredit);
        if (numRequestedBuffers > 0 && unannouncedCredit.getAndAdd(numRequestedBuffers) == 0) {
            notifyCreditAvailable();
        }
    }

    /**
     * Accepts a received buffer.
     *
     * <p>If the sequence number is unexpected the channel is failed; if the channel has been
     * released the buffer is dropped. In both cases the buffer is recycled here. Otherwise the
     * buffer is queued, the consumer is notified if the queue was empty, the sender backlog is
     * processed (when {@code backlog >= 0}) and the gate's {@link BufferReceivedListener}, if
     * any, is informed about a barrier or an in-flight data buffer.
     */
    public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
        // Stays true until the buffer has been handed over to the queue.
        boolean recycleBuffer = true;
        try {
            if (expectedSequenceNumber != sequenceNumber) {
                onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
                return;
            }

            final boolean wasEmpty;
            final Buffer notifyReceivedBuffer;
            final CheckpointBarrier notifyReceivedBarrier;
            final BufferReceivedListener listener = inputGate.getBufferReceivedListener();
            synchronized (receivedBuffers) {
                // Never enqueue after releaseAllResources() has cleared the queue.
                if (isReleased.get()) {
                    return;
                }

                wasEmpty = receivedBuffers.isEmpty();
                receivedBuffers.add(buffer);
                recycleBuffer = false;

                // A data buffer is reported only while a requested checkpoint's barrier is still
                // outstanding on this channel.
                if (listener != null && buffer.isBuffer() && receivedCheckpointId < lastRequestedCheckpointId) {
                    notifyReceivedBuffer = buffer.retainBuffer();
                } else {
                    notifyReceivedBuffer = null;
                }
                notifyReceivedBarrier = listener != null ? parseCheckpointBarrierOrNull(buffer) : null;
            }

            ++expectedSequenceNumber;

            if (wasEmpty) {
                notifyChannelNonEmpty();
            }
            if (backlog >= 0) {
                onSenderBacklog(backlog);
            }

            // NOTE(review): if one of the calls above throws, the extra reference taken by
            // retainBuffer() is never handed to the listener - confirm who releases it.
            if (notifyReceivedBarrier != null) {
                receivedCheckpointId = notifyReceivedBarrier.getId();
                if (notifyReceivedBarrier.isCheckpoint()) {
                    listener.notifyBarrierReceived(notifyReceivedBarrier, channelInfo);
                }
            } else if (notifyReceivedBuffer != null) {
                listener.notifyBufferReceived(notifyReceivedBuffer, channelInfo);
            }
        } finally {
            if (recycleBuffer) {
                buffer.recycleBuffer();
            }
        }
    }

    /**
     * Handles a message that carries no data: validates and advances the sequence number and,
     * on success, processes the sender backlog (when {@code backlog >= 0}). Ignored entirely if
     * the channel has been released.
     */
    public void onEmptyBuffer(int sequenceNumber, int backlog) throws IOException {
        boolean success = false;
        synchronized (receivedBuffers) {
            if (!isReleased.get()) {
                if (expectedSequenceNumber == sequenceNumber) {
                    ++expectedSequenceNumber;
                    success = true;
                } else {
                    onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
                }
            }
        }

        if (success && backlog >= 0) {
            onSenderBacklog(backlog);
        }
    }

    /** Asks the input gate to check the state of the partition this channel consumes. */
    public void onFailedPartitionRequest() {
        inputGate.triggerPartitionStateCheck(partitionId);
    }

    /** Records the given cause as this channel's error. */
    public void onError(Throwable cause) {
        setError(cause);
    }

    /**
     * Rethrows a recorded channel error, if any, and then verifies that the partition request
     * client exists.
     *
     * @throws IllegalStateException if no client has been created and no error was recorded
     */
    private void checkPartitionRequestQueueInitialized() throws IOException {
        checkError();
        Preconditions.checkState(partitionRequestClient != null, "Bug: partitionRequestClient is not initialized before processing data and no error is detected.");
    }

    /** Signals that a buffer arrived with a sequence number other than the expected one. */
    private static class BufferReorderingException extends IOException {
        private static final long serialVersionUID = -888282210356266816L;

        private final int expectedSequenceNumber;
        private final int actualSequenceNumber;

        BufferReorderingException(int expectedSequenceNumber, int actualSequenceNumber) {
            this.expectedSequenceNumber = expectedSequenceNumber;
            this.actualSequenceNumber = actualSequenceNumber;
        }

        @Override
        public String getMessage() {
            return String.format("Buffer re-ordering: expected buffer with sequence number %d, but received %d.", expectedSequenceNumber, actualSequenceNumber);
        }
    }
}

