/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Set;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.SequenceNumberingViewReader;
import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.shaded.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PartitionRequestQueue
extends ChannelInboundHandlerAdapter {
    private final Logger LOG = LoggerFactory.getLogger(PartitionRequestQueue.class);
    private final ChannelFutureListener writeListener = new WriteAndFlushNextMessageIfPossibleListener();
    private final Queue<SequenceNumberingViewReader> nonEmptyReader = new ArrayDeque<SequenceNumberingViewReader>();
    private final Set<InputChannelID> released = Sets.newHashSet();
    private boolean fatalError;
    private ChannelHandlerContext ctx;

    PartitionRequestQueue() {
    }

    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        if (this.ctx == null) {
            this.ctx = ctx;
        }
        super.channelRegistered(ctx);
    }

    void notifyReaderNonEmpty(final SequenceNumberingViewReader reader) {
        this.ctx.executor().execute(new Runnable(){

            @Override
            public void run() {
                PartitionRequestQueue.this.ctx.pipeline().fireUserEventTriggered((Object)reader);
            }
        });
    }

    public void cancel(InputChannelID receiverId) {
        this.ctx.pipeline().fireUserEventTriggered((Object)receiverId);
    }

    public void close() {
        if (this.ctx != null) {
            this.ctx.channel().close();
        }
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg.getClass() == SequenceNumberingViewReader.class) {
            boolean triggerWrite = this.nonEmptyReader.isEmpty();
            this.nonEmptyReader.add((SequenceNumberingViewReader)msg);
            if (triggerWrite) {
                this.writeAndFlushNextMessageIfPossible(ctx.channel());
            }
        } else if (msg.getClass() == InputChannelID.class) {
            InputChannelID toCancel = (InputChannelID)((Object)msg);
            if (this.released.contains((Object)toCancel)) {
                return;
            }
            int size = this.nonEmptyReader.size();
            for (int i = 0; i < size; ++i) {
                SequenceNumberingViewReader reader = this.nonEmptyReader.poll();
                if (reader.getReceiverId().equals((Object)toCancel)) {
                    reader.releaseAllResources();
                    this.markAsReleased(reader.getReceiverId());
                    continue;
                }
                this.nonEmptyReader.add(reader);
            }
        } else {
            ctx.fireUserEventTriggered(msg);
        }
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        this.writeAndFlushNextMessageIfPossible(ctx.channel());
    }

    private void writeAndFlushNextMessageIfPossible(Channel channel) throws IOException {
        block8: {
            if (this.fatalError) {
                return;
            }
            InputChannel.BufferAndAvailability next = null;
            try {
                SequenceNumberingViewReader reader;
                block9: {
                    if (!channel.isWritable()) break block8;
                    while (true) {
                        if ((reader = this.nonEmptyReader.poll()) == null) {
                            return;
                        }
                        next = reader.getNextBuffer();
                        if (next != null) break block9;
                        if (!reader.isReleased()) break;
                        this.markAsReleased(reader.getReceiverId());
                        Throwable cause = reader.getFailureCause();
                        if (cause == null) continue;
                        NettyMessage.ErrorResponse msg = new NettyMessage.ErrorResponse(new ProducerFailedException(cause), reader.getReceiverId());
                        this.ctx.writeAndFlush((Object)msg);
                    }
                    IllegalStateException err = new IllegalStateException("Bug in Netty consumer logic: reader queue got notified by partition about available data, but none was available.");
                    this.handleException(this.ctx.channel(), err);
                    return;
                }
                if (next.moreAvailable()) {
                    this.nonEmptyReader.add(reader);
                }
                NettyMessage.BufferResponse msg = new NettyMessage.BufferResponse(next.buffer(), reader.getSequenceNumber(), reader.getReceiverId());
                if (this.isEndOfPartitionEvent(next.buffer())) {
                    reader.notifySubpartitionConsumed();
                    reader.releaseAllResources();
                    this.markAsReleased(reader.getReceiverId());
                }
                channel.writeAndFlush((Object)msg).addListener((GenericFutureListener)this.writeListener);
                return;
            }
            catch (Throwable t) {
                if (next != null) {
                    next.buffer().recycle();
                }
                throw new IOException(t.getMessage(), t);
            }
        }
    }

    private boolean isEndOfPartitionEvent(Buffer buffer) throws IOException {
        return !buffer.isBuffer() && EventSerializer.fromBuffer(buffer, ((Object)((Object)this)).getClass().getClassLoader()).getClass() == EndOfPartitionEvent.class;
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        this.releaseAllResources();
        ctx.fireChannelInactive();
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        this.handleException(ctx.channel(), cause);
    }

    private void handleException(Channel channel, Throwable cause) throws IOException {
        this.LOG.debug("Encountered error while consuming partitions", cause);
        this.fatalError = true;
        this.releaseAllResources();
        if (channel.isActive()) {
            channel.writeAndFlush((Object)new NettyMessage.ErrorResponse(cause)).addListener((GenericFutureListener)ChannelFutureListener.CLOSE);
        }
    }

    private void releaseAllResources() throws IOException {
        SequenceNumberingViewReader reader;
        while ((reader = this.nonEmptyReader.poll()) != null) {
            reader.releaseAllResources();
            this.markAsReleased(reader.getReceiverId());
        }
    }

    private void markAsReleased(InputChannelID receiverId) {
        this.released.add(receiverId);
    }

    private class WriteAndFlushNextMessageIfPossibleListener
    implements ChannelFutureListener {
        private WriteAndFlushNextMessageIfPossibleListener() {
        }

        public void operationComplete(ChannelFuture future) throws Exception {
            try {
                if (future.isSuccess()) {
                    PartitionRequestQueue.this.writeAndFlushNextMessageIfPossible(future.channel());
                } else if (future.cause() != null) {
                    PartitionRequestQueue.this.handleException(future.channel(), future.cause());
                } else {
                    PartitionRequestQueue.this.handleException(future.channel(), new IllegalStateException("Sending cancelled by user."));
                }
            }
            catch (Throwable t) {
                PartitionRequestQueue.this.handleException(future.channel(), t);
            }
        }
    }
}

