/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.sort.MultiInputSortingDataInput;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.AbstractDataOutput;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler;
import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.util.Preconditions;

@Internal
public class StreamMultipleInputProcessorFactory {
    /**
     * Assembles the {@link StreamMultipleInputProcessor} for a multiple-input operator.
     *
     * <p>The method works in three steps:
     *
     * <ol>
     *   <li>For every configured input a {@link StreamTaskInput} is created: a
     *       {@link StreamTaskNetworkInput} for a {@link StreamConfig.NetworkInputConfig}, or the
     *       source task input supplied by the operator chain for a
     *       {@link StreamConfig.SourceInputConfig}.
     *   <li>If any input returned by {@code streamConfig.getInputs(userClassloader)} requires
     *       sorting, the inputs are split into sorting and pass-through groups, wrapped via
     *       {@link MultiInputSortingDataInput#wrapInputs}, and written back to their original
     *       positions. The input selectable is then replaced by the one provided by the wrapper.
     *   <li>Each input is paired with a data output inside a {@link StreamOneInputProcessor}.
     * </ol>
     *
     * @param ownerTask task handed to the sorting wrapper
     * @param checkpointedInputGates gates indexed by the gate index of each network input config
     * @param configuredInputs per-input configuration; its length is checked against the number of
     *     inputs of {@code mainOperator}
     * @param ioManager passed to the network inputs and to the sorting wrapper
     * @param memoryManager passed to the sorting wrapper
     * @param ioMetricGroup receives the shared network records-in counter
     * @param mainOperatorRecordsIn counter incremented for each record emitted by a network input
     * @param streamStatusMaintainer task-level stream status toggled by the created outputs
     * @param mainOperator operator whose {@code getInputs()} define the processed inputs
     * @param inputWatermarkGauges one gauge per input, indexed like the inputs
     * @param streamConfig source of the sorting requirements, key selectors and serializers
     * @param taskManagerConfig used when computing the managed memory fraction for sorting
     * @param jobConfig passed to the sorting wrapper
     * @param executionConfig provides the object-reuse flag for the sorting wrapper
     * @param userClassloader class loader used when reading from {@code streamConfig}
     * @param operatorChain provides chained source inputs/outputs; must not be null
     * @return the processor combining one {@link StreamOneInputProcessor} per input
     * @throws IllegalStateException if an input requires sorting while the operator implements
     *     {@link InputSelectable}
     * @throws UnsupportedOperationException if a configured input is neither a network nor a
     *     source input config
     */
    public static StreamMultipleInputProcessor create(
            AbstractInvokable ownerTask,
            CheckpointedInputGate[] checkpointedInputGates,
            StreamConfig.InputConfig[] configuredInputs,
            IOManager ioManager,
            MemoryManager memoryManager,
            TaskIOMetricGroup ioMetricGroup,
            Counter mainOperatorRecordsIn,
            StreamStatusMaintainer streamStatusMaintainer,
            MultipleInputStreamOperator<?> mainOperator,
            WatermarkGauge[] inputWatermarkGauges,
            StreamConfig streamConfig,
            Configuration taskManagerConfig,
            Configuration jobConfig,
            ExecutionConfig executionConfig,
            ClassLoader userClassloader,
            OperatorChain<?, ?> operatorChain) {
        Preconditions.checkNotNull(operatorChain);

        List<Input> operatorInputs = mainOperator.getInputs();
        int inputsCount = operatorInputs.size();

        StreamOneInputProcessor[] inputProcessors = new StreamOneInputProcessor[inputsCount];

        // A single counter is shared by all network inputs and handed to the IO metric group.
        Counter networkRecordsIn = new SimpleCounter();
        ioMetricGroup.reuseRecordsInputCounter(networkRecordsIn);

        MultiStreamStreamStatusTracker streamStatusTracker =
                new MultiStreamStreamStatusTracker(inputsCount);
        Preconditions.checkState(
                configuredInputs.length == inputsCount,
                "Number of configured inputs in StreamConfig [%s] doesn't match the main operator's number of inputs [%s]",
                configuredInputs.length,
                inputsCount);

        // Step 1: create the raw task input for every configured input.
        StreamTaskInput[] inputs = new StreamTaskInput[inputsCount];
        for (int i = 0; i < inputsCount; i++) {
            StreamConfig.InputConfig configuredInput = configuredInputs[i];
            if (configuredInput instanceof StreamConfig.NetworkInputConfig) {
                StreamConfig.NetworkInputConfig networkInput =
                        (StreamConfig.NetworkInputConfig) configuredInput;
                inputs[i] =
                        new StreamTaskNetworkInput(
                                checkpointedInputGates[networkInput.getInputGateIndex()],
                                networkInput.getTypeSerializer(),
                                ioManager,
                                new StatusWatermarkValve(
                                        checkpointedInputGates[networkInput.getInputGateIndex()]
                                                .getNumberOfInputChannels()),
                                i);
            } else if (configuredInput instanceof StreamConfig.SourceInputConfig) {
                StreamConfig.SourceInputConfig sourceInput =
                        (StreamConfig.SourceInputConfig) configuredInput;
                inputs[i] = operatorChain.getSourceTaskInput(sourceInput);
            } else {
                throw new UnsupportedOperationException("Unknown input type: " + configuredInput);
            }
        }

        InputSelectable inputSelectable = null;
        if (mainOperator instanceof InputSelectable) {
            inputSelectable = (InputSelectable) mainOperator;
        }

        // Step 2: wrap the inputs for sorting if at least one of them asks for it.
        StreamConfig.InputConfig[] inputConfigs = streamConfig.getInputs(userClassloader);
        boolean anyRequiresSorting =
                Arrays.stream(inputConfigs).anyMatch(StreamConfig::requiresSorting);
        if (anyRequiresSorting) {
            if (inputSelectable != null) {
                throw new IllegalStateException("The InputSelectable interface is not supported with sorting inputs");
            }

            StreamTaskInput[] sortingInputs =
                    IntStream.range(0, inputsCount)
                            .filter(idx -> StreamConfig.requiresSorting(inputConfigs[idx]))
                            .mapToObj(idx -> inputs[idx])
                            .toArray(StreamTaskInput[]::new);
            KeySelector[] sortingInputKeySelectors =
                    IntStream.range(0, inputsCount)
                            .filter(idx -> StreamConfig.requiresSorting(inputConfigs[idx]))
                            .mapToObj(idx -> streamConfig.getStatePartitioner(idx, userClassloader))
                            .toArray(KeySelector[]::new);
            TypeSerializer[] sortingInputKeySerializers =
                    IntStream.range(0, inputsCount)
                            .filter(idx -> StreamConfig.requiresSorting(inputConfigs[idx]))
                            .mapToObj(idx -> streamConfig.getTypeSerializerIn(idx, userClassloader))
                            .toArray(TypeSerializer[]::new);
            StreamTaskInput[] passThroughInputs =
                    IntStream.range(0, inputsCount)
                            .filter(idx -> !StreamConfig.requiresSorting(inputConfigs[idx]))
                            .mapToObj(idx -> inputs[idx])
                            .toArray(StreamTaskInput[]::new);

            MultiInputSortingDataInput.SelectableSortingInputs selectableSortingInputs =
                    MultiInputSortingDataInput.wrapInputs(
                            ownerTask,
                            sortingInputs,
                            sortingInputKeySelectors,
                            sortingInputKeySerializers,
                            streamConfig.getStateKeySerializer(userClassloader),
                            passThroughInputs,
                            memoryManager,
                            ioManager,
                            executionConfig.isObjectReuseEnabled(),
                            streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(
                                    ManagedMemoryUseCase.BATCH_OP, taskManagerConfig, userClassloader),
                            jobConfig);

            StreamTaskInput<?>[] sortedInputs = selectableSortingInputs.getSortedInputs();
            StreamTaskInput<?>[] passedThroughInputs = selectableSortingInputs.getPassThroughInputs();

            // Put the wrapped inputs back at the positions of the inputs they replace; both
            // wrapped arrays are consumed in ascending input order.
            int sortedIndex = 0;
            int passThroughIndex = 0;
            for (int i = 0; i < inputs.length; i++) {
                if (StreamConfig.requiresSorting(inputConfigs[i])) {
                    inputs[i] = sortedInputs[sortedIndex++];
                } else {
                    inputs[i] = passedThroughInputs[passThroughIndex++];
                }
            }
            inputSelectable = selectableSortingInputs.getInputSelectable();
        }

        // Step 3: pair every input with its data output.
        for (int i = 0; i < inputsCount; i++) {
            StreamConfig.InputConfig configuredInput = configuredInputs[i];
            if (configuredInput instanceof StreamConfig.NetworkInputConfig) {
                StreamTaskNetworkOutput dataOutput =
                        new StreamTaskNetworkOutput(
                                operatorInputs.get(i),
                                streamStatusMaintainer,
                                inputWatermarkGauges[i],
                                streamStatusTracker,
                                i,
                                mainOperatorRecordsIn,
                                networkRecordsIn);
                inputProcessors[i] = new StreamOneInputProcessor(inputs[i], dataOutput, operatorChain);
            } else if (configuredInput instanceof StreamConfig.SourceInputConfig) {
                StreamConfig.SourceInputConfig sourceInput =
                        (StreamConfig.SourceInputConfig) configuredInput;
                Output<StreamRecord<?>> chainedSourceOutput =
                        operatorChain.getChainedSourceOutput(sourceInput);
                inputProcessors[i] =
                        new StreamOneInputProcessor(
                                inputs[i],
                                new StreamTaskSourceOutput(
                                        chainedSourceOutput,
                                        streamStatusMaintainer,
                                        inputWatermarkGauges[i],
                                        streamStatusTracker,
                                        i),
                                operatorChain);
            } else {
                throw new UnsupportedOperationException("Unknown input type: " + configuredInput);
            }
        }

        MultipleInputSelectionHandler selectionHandler =
                new MultipleInputSelectionHandler(inputSelectable, inputsCount);
        return new StreamMultipleInputProcessor(selectionHandler, inputProcessors);
    }

    /**
     * Data output used for chained source inputs.
     *
     * <p>On top of the inherited behavior it records the stream status of its own input in the
     * shared {@link MultiStreamStreamStatusTracker} and toggles the task-level status held by the
     * {@link StreamStatusMaintainer} when required.
     */
    private static class StreamTaskSourceOutput
            extends SourceOperatorStreamTask.AsyncDataOutputToOutput {
        private final MultiStreamStreamStatusTracker streamStatusTracker;
        private final int inputIndex;

        public StreamTaskSourceOutput(
                Output<StreamRecord<?>> chainedSourceOutput,
                StreamStatusMaintainer streamStatusMaintainer,
                WatermarkGauge inputWatermarkGauge,
                MultiStreamStreamStatusTracker streamStatusTracker,
                int inputIndex) {
            // Each source output gets its own fresh counter instance.
            super(chainedSourceOutput, streamStatusMaintainer, new SimpleCounter(), inputWatermarkGauge);
            this.streamStatusTracker = streamStatusTracker;
            this.inputIndex = inputIndex;
        }

        /**
         * Stores the new status for this input and updates the task-level status if it differs:
         * the task becomes ACTIVE as soon as this input is active, and IDLE only once the tracker
         * reports every input as idle.
         */
        @Override
        public void emitStreamStatus(StreamStatus streamStatus) {
            streamStatusTracker.setStreamStatus(inputIndex, streamStatus);

            if (streamStatus.equals(streamStatusMaintainer.getStreamStatus())) {
                // Task-level status already matches; nothing to toggle.
                return;
            }
            if (streamStatus.isActive()) {
                streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
                return;
            }
            if (streamStatusTracker.allStreamStatusesAreIdle()) {
                streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
            }
        }
    }

    private static class StreamTaskNetworkOutput<T>
    extends AbstractDataOutput<T> {
        private final Input<T> input;
        private final WatermarkGauge inputWatermarkGauge;
        private final int inputIndex;
        private final MultiStreamStreamStatusTracker streamStatusTracker;
        private final Counter mainOperatorRecordsIn;
        private final Counter networkRecordsIn;

        private StreamTaskNetworkOutput(Input<T> input, StreamStatusMaintainer streamStatusMaintainer, WatermarkGauge inputWatermarkGauge, MultiStreamStreamStatusTracker streamStatusTracker, int inputIndex, Counter mainOperatorRecordsIn, Counter networkRecordsIn) {
            super(streamStatusMaintainer);
            this.input = (Input)Preconditions.checkNotNull(input);
            this.inputWatermarkGauge = (WatermarkGauge)Preconditions.checkNotNull((Object)inputWatermarkGauge);
            this.streamStatusTracker = streamStatusTracker;
            this.inputIndex = inputIndex;
            this.mainOperatorRecordsIn = mainOperatorRecordsIn;
            this.networkRecordsIn = networkRecordsIn;
        }

        @Override
        public void emitRecord(StreamRecord<T> record) throws Exception {
            this.input.setKeyContextElement(record);
            this.input.processElement(record);
            this.mainOperatorRecordsIn.inc();
            this.networkRecordsIn.inc();
        }

        @Override
        public void emitWatermark(Watermark watermark) throws Exception {
            this.inputWatermarkGauge.setCurrentWatermark(watermark.getTimestamp());
            this.input.processWatermark(watermark);
        }

        @Override
        public void emitStreamStatus(StreamStatus streamStatus) {
            this.streamStatusTracker.setStreamStatus(this.inputIndex, streamStatus);
            if (!streamStatus.equals(this.streamStatusMaintainer.getStreamStatus())) {
                if (streamStatus.isActive()) {
                    this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
                } else if (this.streamStatusTracker.allStreamStatusesAreIdle()) {
                    this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
                }
            }
        }

        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
            this.input.processLatencyMarker(latencyMarker);
        }
    }

    /**
     * Keeps the most recently reported {@link StreamStatus} of every input, indexed by input
     * position. All inputs start out as {@link StreamStatus#ACTIVE}.
     */
    private static class MultiStreamStreamStatusTracker {
        private final StreamStatus[] streamStatuses;

        private MultiStreamStreamStatusTracker(int numberOfInputs) {
            StreamStatus[] initialStatuses = new StreamStatus[numberOfInputs];
            Arrays.fill(initialStatuses, StreamStatus.ACTIVE);
            this.streamStatuses = initialStatuses;
        }

        /** Overwrites the status stored for the input at {@code index}. */
        public void setStreamStatus(int index, StreamStatus streamStatus) {
            streamStatuses[index] = streamStatus;
        }

        /** Returns the status currently stored for the input at {@code index}. */
        public StreamStatus getStreamStatus(int index) {
            return streamStatuses[index];
        }

        /** Returns {@code true} when no tracked input is active (also when there are no inputs). */
        public boolean allStreamStatusesAreIdle() {
            for (StreamStatus status : streamStatuses) {
                if (status.isActive()) {
                    return false;
                }
            }
            return true;
        }
    }
}

