/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.AbstractDataOutput;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler;
import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

/**
 * {@link StreamInputProcessor} that reads from several network inputs and forwards what it
 * reads to the corresponding {@link Input} of a {@link MultipleInputStreamOperator}.
 *
 * <p>The choice of which input to read next is delegated to a
 * {@link MultipleInputSelectionHandler}; this class only asks it for the next index and
 * reports availability and per-input results back to it.
 */
@Internal
public final class StreamMultipleInputProcessor
implements StreamInputProcessor {
    /** Consulted for input selection, availability and finished state of every input. */
    private final MultipleInputSelectionHandler inputSelectionHandler;
    /** One processor per operator input, indexed by the zero-based input index. */
    private final InputProcessor<?>[] inputProcessors;
    /** Notified through {@code endHeadOperatorInput} when an input reports END_OF_INPUT. */
    private final OperatorChain<?, ?> operatorChain;
    /** Last stream status emitted by each input; every entry starts as ACTIVE. */
    private final StreamStatus[] streamStatuses;
    /** Incremented once for every record handed to the operator. */
    private final Counter numRecordsIn;
    // Index of the input chosen by the most recent processInput() call; it is passed to
    // selectNextInputIndex(). NOTE(review): the initial value 1 presumably makes the first
    // selection favour input 0 - confirm against MultipleInputSelectionHandler.
    private int lastReadInputIndex = 1;
    /** Becomes true once selectFirstReadingInputIndex() has run, i.e. on the first processInput(). */
    private boolean isPrepared;

    /**
     * Creates the processor and wires one {@link InputProcessor} per operator input.
     *
     * <p>For input {@code i} the gate, serializer and watermark gauge at position {@code i}
     * of the respective arrays are used, so those arrays must have at least as many entries
     * as {@code streamOperator.getInputs()}.
     *
     * @param checkpointedInputGates input gates, one per input
     * @param inputSerializers record serializers, one per input
     * @param ioManager I/O manager handed to every {@link StreamTaskNetworkInput}
     * @param streamStatusMaintainer maintainer toggled when the combined stream status changes
     * @param streamOperator operator whose inputs receive the data
     * @param inputSelectionHandler handler deciding which input is read next; must not be null
     * @param inputWatermarkGauges watermark gauges, one per input
     * @param operatorChain operator chain notified when an input ends; must not be null
     * @param numRecordsIn counter incremented for every processed record
     */
    public StreamMultipleInputProcessor(
            CheckpointedInputGate[] checkpointedInputGates,
            TypeSerializer<?>[] inputSerializers,
            IOManager ioManager,
            StreamStatusMaintainer streamStatusMaintainer,
            MultipleInputStreamOperator<?> streamOperator,
            MultipleInputSelectionHandler inputSelectionHandler,
            WatermarkGauge[] inputWatermarkGauges,
            OperatorChain<?, ?> operatorChain,
            Counter numRecordsIn) {
        this.inputSelectionHandler = Preconditions.checkNotNull(inputSelectionHandler);

        List<Input> operatorInputs = streamOperator.getInputs();
        int numberOfInputs = operatorInputs.size();
        this.inputProcessors = new InputProcessor[numberOfInputs];
        this.streamStatuses = new StreamStatus[numberOfInputs];
        this.numRecordsIn = numRecordsIn;

        for (int inputIndex = 0; inputIndex < numberOfInputs; inputIndex++) {
            // Every input starts out active.
            this.streamStatuses[inputIndex] = StreamStatus.ACTIVE;

            StreamTaskNetworkOutput output = new StreamTaskNetworkOutput(
                    operatorInputs.get(inputIndex),
                    streamStatusMaintainer,
                    inputWatermarkGauges[inputIndex],
                    inputIndex);

            CheckpointedInputGate gate = checkpointedInputGates[inputIndex];
            TypeSerializer<?> serializer = inputSerializers[inputIndex];
            // The valve is sized by the gate's channel count and forwards into this input's output.
            StatusWatermarkValve valve = new StatusWatermarkValve(gate.getNumberOfInputChannels(), output);
            StreamTaskNetworkInput networkInput =
                    new StreamTaskNetworkInput(gate, serializer, ioManager, valve, inputIndex);

            this.inputProcessors[inputIndex] = new InputProcessor(output, networkInput);
        }

        this.operatorChain = Preconditions.checkNotNull(operatorChain);
    }

    /**
     * Returns a future that tells the caller when it is worth calling {@link #processInput()}.
     *
     * <p>If the selection handler reports that any input is available, or that all inputs are
     * finished, the shared {@code AVAILABLE} future is returned. Otherwise a new future is
     * returned that is completed as soon as the availability future of any input that is both
     * selected and not yet finished completes.
     *
     * @return {@code AVAILABLE}, or a future completed when one of the eligible inputs
     *     becomes available
     */
    @Override
    public CompletableFuture<?> getAvailableFuture() {
        if (this.inputSelectionHandler.isAnyInputAvailable() || this.inputSelectionHandler.areAllInputsFinished()) {
            return AVAILABLE;
        }
        final CompletableFuture<Void> anyInputAvailable = new CompletableFuture<>();
        for (int i = 0; i < this.inputProcessors.length; ++i) {
            // Only inputs that are still open and currently selected can wake the caller up.
            if (this.inputSelectionHandler.isInputFinished(i) || !this.inputSelectionHandler.isInputSelected(i)) {
                continue;
            }
            // complete() is a no-op once the future is done, so the first input to fire wins.
            this.inputProcessors[i].networkInput.getAvailableFuture().thenRun(() -> anyInputAvailable.complete(null));
        }
        return anyInputAvailable;
    }

    /**
     * Selects an input, lets it emit its next element and reports the outcome to the
     * selection handler.
     *
     * <p>The very first call goes through {@code selectFirstReadingInputIndex()}; all later
     * calls use {@code selectNextReadingInputIndex()}.
     *
     * @return {@link InputStatus#NOTHING_AVAILABLE} if no input could be selected, otherwise
     *     whatever the selection handler's {@code updateStatus} returns for the input's status
     * @throws Exception if reading from the input or notifying the operator chain fails
     */
    @Override
    public InputStatus processInput() throws Exception {
        final int selectedIndex;
        if (!isPrepared) {
            selectedIndex = selectFirstReadingInputIndex();
        } else {
            selectedIndex = selectNextReadingInputIndex();
        }

        // -1 means the handler found nothing to read from.
        if (selectedIndex == -1) {
            return InputStatus.NOTHING_AVAILABLE;
        }

        lastReadInputIndex = selectedIndex;
        final InputStatus status = inputProcessors[selectedIndex].processInput();
        checkFinished(status, selectedIndex);
        return inputSelectionHandler.updateStatus(status, selectedIndex);
    }

    /**
     * One-time variant of input selection used by the first {@code processInput()} call:
     * calls {@code nextSelection()} on the handler, marks this processor as prepared and
     * then performs a regular selection.
     *
     * @return the selected input index, or -1 if none could be selected
     */
    private int selectFirstReadingInputIndex() {
        inputSelectionHandler.nextSelection();
        isPrepared = true;

        final int firstIndex = selectNextReadingInputIndex();
        return firstIndex;
    }

    /**
     * Reacts to an input reaching its end: tells the operator chain that the head operator's
     * input (identified by its one-based id) has ended and calls {@code nextSelection()} on
     * the handler. Any other status is ignored.
     *
     * @param status status returned by the input that was just read
     * @param inputIndex zero-based index of that input
     * @throws Exception if the operator chain fails while ending the input
     */
    private void checkFinished(InputStatus status, int inputIndex) throws Exception {
        if (status != InputStatus.END_OF_INPUT) {
            return;
        }
        final int inputId = getInputId(inputIndex);
        operatorChain.endHeadOperatorInput(inputId);
        inputSelectionHandler.nextSelection();
    }

    /**
     * Closes every input processor, continuing past {@link IOException}s so that all inputs
     * get a close attempt. Failures are combined with
     * {@code ExceptionUtils.firstOrSuppressed} and the combined exception is thrown at the end.
     *
     * @throws IOException if closing at least one input failed with an {@link IOException}
     */
    @Override
    public void close() throws IOException {
        IOException collected = null;
        for (int i = 0; i < inputProcessors.length; i++) {
            try {
                inputProcessors[i].close();
            } catch (IOException closeFailure) {
                collected = ExceptionUtils.firstOrSuppressed(closeFailure, collected);
            }
        }

        if (collected == null) {
            return;
        }
        throw collected;
    }

    /**
     * Asks every input to prepare its snapshot for the given checkpoint.
     *
     * @param channelStateWriter writer passed through to each input
     * @param checkpointId id of the checkpoint being prepared
     * @return a future that completes once the futures of all inputs have completed
     * @throws IOException if any input fails while preparing its snapshot
     */
    @Override
    public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws IOException {
        final CompletableFuture<?>[] perInputFutures = new CompletableFuture<?>[inputProcessors.length];
        int slot = 0;
        for (InputProcessor<?> processor : inputProcessors) {
            perInputFutures[slot++] = processor.prepareSnapshot(channelStateWriter, checkpointId);
        }
        return CompletableFuture.allOf(perInputFutures);
    }

    /**
     * Asks the selection handler for the next input to read from.
     *
     * <p>If the handler knows of no available input, the availability of all inputs is
     * re-checked first. After a successful selection the availability is re-checked once
     * more when the handler requests it via {@code shouldSetAvailableForAnotherInput()}.
     *
     * @return the index chosen by the handler, or -1 if it chose none
     */
    private int selectNextReadingInputIndex() {
        if (!inputSelectionHandler.isAnyInputAvailable()) {
            fullCheckAndSetAvailable();
        }

        final int chosenIndex = inputSelectionHandler.selectNextInputIndex(lastReadInputIndex);

        // Short-circuit: the handler is only consulted again when an input was actually chosen.
        if (chosenIndex != -1 && inputSelectionHandler.shouldSetAvailableForAnotherInput()) {
            fullCheckAndSetAvailable();
        }
        return chosenIndex;
    }

    /**
     * Walks over all inputs and marks each one as available in the selection handler if its
     * network input reports {@code isApproximatelyAvailable()} or, failing that,
     * {@code isAvailable()}.
     */
    private void fullCheckAndSetAvailable() {
        int index = 0;
        for (InputProcessor<?> processor : inputProcessors) {
            // isAvailable() is only consulted when the approximate check says "no".
            if (processor.networkInput.isApproximatelyAvailable() || processor.networkInput.isAvailable()) {
                inputSelectionHandler.setAvailableInput(index);
            }
            index++;
        }
    }

    /**
     * Converts a zero-based input index into the one-based input id.
     *
     * @param inputIndex zero-based index into {@code inputProcessors}
     * @return {@code inputIndex + 1}
     */
    private int getInputId(int inputIndex) {
        final int inputId = inputIndex + 1;
        return inputId;
    }

    /**
     * Checks the recorded status of every input.
     *
     * @return {@code true} if no input's recorded stream status is active
     */
    private boolean allStreamStatusesAreIdle() {
        for (int i = 0; i < streamStatuses.length; i++) {
            if (streamStatuses[i].isActive()) {
                // A single active input is enough to answer "no".
                return false;
            }
        }
        return true;
    }

    /**
     * Data output for a single input: forwards records, watermarks and latency markers to
     * that input's {@link Input} and folds the input's stream status into the status kept by
     * the enclosing processor.
     */
    private class StreamTaskNetworkOutput<T>
    extends AbstractDataOutput<T> {
        /** Operator input that receives everything emitted here. */
        private final Input<T> input;
        /** Gauge updated with the timestamp of every watermark. */
        private final WatermarkGauge inputWatermarkGauge;
        /** Zero-based index of this input inside the enclosing processor. */
        private final int inputIndex;

        private StreamTaskNetworkOutput(Input<T> input, StreamStatusMaintainer streamStatusMaintainer, WatermarkGauge inputWatermarkGauge, int inputIndex) {
            super(streamStatusMaintainer);
            this.input = Preconditions.checkNotNull(input);
            this.inputWatermarkGauge = Preconditions.checkNotNull(inputWatermarkGauge);
            this.inputIndex = inputIndex;
        }

        /**
         * Sets the key context, processes the record, counts it and then calls
         * {@code nextSelection()} on the selection handler.
         */
        @Override
        public void emitRecord(StreamRecord<T> record) throws Exception {
            input.setKeyContextElement(record);
            input.processElement(record);
            StreamMultipleInputProcessor.this.numRecordsIn.inc();
            StreamMultipleInputProcessor.this.inputSelectionHandler.nextSelection();
        }

        /** Records the watermark timestamp in the gauge, then forwards the watermark. */
        @Override
        public void emitWatermark(Watermark watermark) throws Exception {
            final long timestamp = watermark.getTimestamp();
            inputWatermarkGauge.setCurrentWatermark(timestamp);
            input.processWatermark(watermark);
        }

        /**
         * Stores the new status for this input and toggles the maintainer when needed:
         * to ACTIVE as soon as this input is active, to IDLE only when every input is idle.
         * Nothing is toggled if the maintainer already reports the same status.
         */
        @Override
        public void emitStreamStatus(StreamStatus streamStatus) {
            StreamMultipleInputProcessor.this.streamStatuses[inputIndex] = streamStatus;

            if (streamStatus.equals(streamStatusMaintainer.getStreamStatus())) {
                return;
            }
            if (streamStatus.isActive()) {
                streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
                return;
            }
            if (StreamMultipleInputProcessor.this.allStreamStatusesAreIdle()) {
                streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
            }
        }

        /** Forwards the latency marker to the operator input unchanged. */
        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
            input.processLatencyMarker(latencyMarker);
        }
    }

    /**
     * Pairs the network input of one operator input with the output its elements are
     * emitted to, and delegates reading, snapshotting and closing to the network input.
     */
    private class InputProcessor<T>
    implements Closeable {
        /** Receives whatever {@link #networkInput} emits. */
        private final StreamTaskNetworkOutput<T> dataOutput;
        /** Source this processor reads from. */
        private final StreamTaskNetworkInput<T> networkInput;

        public InputProcessor(StreamTaskNetworkOutput<T> dataOutput, StreamTaskNetworkInput<T> networkInput) {
            this.dataOutput = dataOutput;
            this.networkInput = networkInput;
        }

        /**
         * Lets the network input emit its next element into the data output.
         *
         * @return the status reported by the network input
         */
        public InputStatus processInput() throws Exception {
            final InputStatus status = networkInput.emitNext(dataOutput);
            return status;
        }

        /** Closes the underlying network input. */
        @Override
        public void close() throws IOException {
            networkInput.close();
        }

        /**
         * Delegates snapshot preparation to the network input.
         *
         * @return the future returned by the network input
         */
        public CompletableFuture<?> prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws IOException {
            final CompletableFuture<?> snapshotFuture = networkInput.prepareSnapshot(channelStateWriter, checkpointId);
            return snapshotFuture;
        }
    }
}

