/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.api.writer;

import java.io.IOException;
import java.util.Optional;
import java.util.Random;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
import org.apache.flink.runtime.io.network.api.writer.BroadcastRecordWriter;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.XORShiftRandom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Serializes records and writes them into the sub-partitions ("channels") of a
 * {@link ResultPartitionWriter}.
 *
 * <p>Each record is first serialized into an intermediate {@link RecordSerializer} and then
 * copied into the {@link BufferBuilder} currently associated with the target channel. New
 * buffer builders are requested from the partition's buffer provider whenever the current one
 * is full.
 *
 * <p>Flushing is controlled by the {@code timeout} constructor argument:
 * <ul>
 *   <li>{@code -1}: no flusher thread is started and records are not flushed per emit;</li>
 *   <li>{@code 0}: the target channel is flushed after every emitted record;</li>
 *   <li>{@code > 0}: a daemon {@link OutputFlusher} thread calls {@link #flushAll()} after
 *       sleeping {@code timeout} milliseconds, repeatedly, until {@link #close()}.</li>
 * </ul>
 *
 * <p>An error raised inside the flusher thread is stored and re-thrown (wrapped in an
 * {@link IOException}) by the next emit call.
 *
 * @param <T> the type of the records that can be emitted with this writer
 */
public class RecordWriter<T extends IOReadableWritable> {
    private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class);

    /** Thread name used for the flusher when no task name was supplied. */
    private static final String DEFAULT_OUTPUT_FLUSH_THREAD_NAME = "OutputFlusher";

    private final ResultPartitionWriter targetPartition;
    private final ChannelSelector<T> channelSelector;
    private final int numberOfChannels;

    /** Identity list {@code 0..numberOfChannels-1}, iterated by {@link #broadcastEmit}. */
    private final int[] broadcastChannels;
    private final RecordSerializer<T> serializer;

    /** Per-channel buffer builder that is currently being filled, if any. */
    private final Optional<BufferBuilder>[] bufferBuilders;
    private final Random rng = new XORShiftRandom();
    private Counter numBytesOut = new SimpleCounter();
    private Counter numBuffersOut = new SimpleCounter();

    /** True when {@code timeout == 0}: flush the target channel after every record. */
    private final boolean flushAlways;
    private final Optional<OutputFlusher> outputFlusher;

    /**
     * First error observed by the flusher thread. It is written by the {@link OutputFlusher}
     * thread and read by the threads calling the emit methods, hence {@code volatile} so that
     * the failure is guaranteed to become visible to the readers.
     */
    private volatile Throwable flusherException;

    /**
     * Creates a writer that distributes records round-robin and never flushes on its own
     * (timeout {@code -1}, default flusher thread name).
     *
     * @param writer the partition writer that receives the produced buffers
     */
    public RecordWriter(ResultPartitionWriter writer) {
        this(writer, new RoundRobinChannelSelector<T>(), -1L, null);
    }

    /**
     * Creates a writer with an explicit channel selector and flushing behaviour.
     *
     * @param writer the partition writer that receives the produced buffers
     * @param channelSelector selects the target channel of each record passed to {@link #emit}
     * @param timeout {@code -1} for no automatic flushing, {@code 0} to flush after every
     *     record, or a positive sleep interval in milliseconds for the flusher thread
     * @param taskName used to name the flusher thread; may be {@code null}
     * @throws IllegalArgumentException presumably, via {@code Preconditions.checkArgument},
     *     if {@code timeout < -1}
     */
    public RecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector, long timeout, String taskName) {
        this.targetPartition = writer;
        this.channelSelector = channelSelector;
        this.numberOfChannels = writer.getNumberOfSubpartitions();
        this.channelSelector.setup(this.numberOfChannels);
        this.serializer = new SpanningRecordSerializer<T>();

        // Generic array creation is not possible; every slot is filled with an
        // Optional<BufferBuilder> below, so the unchecked conversion is safe.
        @SuppressWarnings("unchecked")
        final Optional<BufferBuilder>[] builders = new Optional[this.numberOfChannels];
        this.bufferBuilders = builders;
        this.broadcastChannels = new int[this.numberOfChannels];
        for (int i = 0; i < this.numberOfChannels; ++i) {
            this.broadcastChannels[i] = i;
            this.bufferBuilders[i] = Optional.empty();
        }

        Preconditions.checkArgument(timeout >= -1L);
        this.flushAlways = timeout == 0L;
        if (timeout == -1L || timeout == 0L) {
            this.outputFlusher = Optional.empty();
        } else {
            String threadName = taskName == null ? DEFAULT_OUTPUT_FLUSH_THREAD_NAME : "OutputFlusher for " + taskName;
            OutputFlusher flusher = new OutputFlusher(threadName, timeout);
            this.outputFlusher = Optional.of(flusher);
            flusher.start();
        }
    }

    /**
     * Emits the record to the channel chosen by the channel selector.
     *
     * @param record the record to serialize and write
     * @throws IOException if the flusher thread failed earlier, or if writing fails
     * @throws InterruptedException if interrupted while waiting for a buffer
     */
    public void emit(T record) throws IOException, InterruptedException {
        this.checkErroneous();
        this.emit(record, this.channelSelector.selectChannel(record));
    }

    /**
     * Emits the record to every channel. The record is serialized once and the serialized
     * form is copied into each channel's buffer.
     *
     * @param record the record to serialize and write
     * @throws IOException if the flusher thread failed earlier, or if writing fails
     * @throws InterruptedException if interrupted while waiting for a buffer
     */
    public void broadcastEmit(T record) throws IOException, InterruptedException {
        this.checkErroneous();
        this.serializer.serializeRecord(record);
        boolean pruneAfterCopying = false;
        for (int channel : this.broadcastChannels) {
            if (this.copyFromSerializerToTargetChannel(channel)) {
                pruneAfterCopying = true;
            }
        }
        // Prune only once, after all channels received their copy of the serialized data.
        if (pruneAfterCopying) {
            this.serializer.prune();
        }
    }

    /**
     * Emits the record to a uniformly random channel.
     *
     * @param record the record to serialize and write
     * @throws IOException if the flusher thread failed earlier, or if writing fails
     * @throws InterruptedException if interrupted while waiting for a buffer
     */
    public void randomEmit(T record) throws IOException, InterruptedException {
        // Surface flusher failures here as well, consistent with emit() and broadcastEmit().
        this.checkErroneous();
        this.emit(record, this.rng.nextInt(this.numberOfChannels));
    }

    /** Serializes the record and copies it to the given channel, pruning the serializer if requested. */
    private void emit(T record, int targetChannel) throws IOException, InterruptedException {
        this.serializer.serializeRecord(record);
        if (this.copyFromSerializerToTargetChannel(targetChannel)) {
            this.serializer.prune();
        }
    }

    /**
     * Copies the currently serialized record into the buffer(s) of the target channel,
     * requesting new buffer builders while the record does not fit.
     *
     * @param targetChannel index of the channel to write to
     * @return {@code true} if a buffer was finished exactly at the end of the record, in which
     *     case the callers prune the serializer
     */
    private boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {
        // Reset before copying so the same serialized data can be copied to several channels.
        this.serializer.reset();

        boolean pruneTriggered = false;
        BufferBuilder bufferBuilder = this.getBufferBuilder(targetChannel);
        RecordSerializer.SerializationResult result = this.serializer.copyToBufferBuilder(bufferBuilder);
        while (result.isFullBuffer()) {
            this.numBytesOut.inc(bufferBuilder.finish());
            this.numBuffersOut.inc();

            // The record ended together with the buffer: stop here instead of eagerly
            // requesting another buffer that might not be needed.
            if (result.isFullRecord()) {
                pruneTriggered = true;
                this.bufferBuilders[targetChannel] = Optional.empty();
                break;
            }

            bufferBuilder = this.requestNewBufferBuilder(targetChannel);
            result = this.serializer.copyToBufferBuilder(bufferBuilder);
        }
        Preconditions.checkState(!this.serializer.hasSerializedData(), "All data should be written at once");

        if (this.flushAlways) {
            this.targetPartition.flush(targetChannel);
        }
        return pruneTriggered;
    }

    /**
     * Sends the event to all channels. The buffer currently being filled for each channel is
     * finished first, then a copy of the serialized event is added to that channel.
     *
     * @param event the event to broadcast
     * @throws IOException if serializing or adding the event fails
     */
    public void broadcastEvent(AbstractEvent event) throws IOException {
        try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event)) {
            for (int targetChannel = 0; targetChannel < this.numberOfChannels; ++targetChannel) {
                this.tryFinishCurrentBufferBuilder(targetChannel);
                // Each channel gets its own copy; the original consumer is closed by the try block.
                this.targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel);
            }
            if (this.flushAlways) {
                this.flushAll();
            }
        }
    }

    /** Flushes all channels of the target partition. */
    public void flushAll() {
        this.targetPartition.flushAll();
    }

    /** Finishes and drops the buffer builder of every channel. */
    public void clearBuffers() {
        for (int targetChannel = 0; targetChannel < this.numberOfChannels; ++targetChannel) {
            this.closeBufferBuilder(targetChannel);
        }
    }

    /**
     * Replaces the internal byte/buffer counters with the ones of the given metric group.
     *
     * @param metrics the task I/O metric group providing the output counters
     */
    public void setMetricGroup(TaskIOMetricGroup metrics) {
        this.numBytesOut = metrics.getNumBytesOutCounter();
        this.numBuffersOut = metrics.getNumBuffersOutCounter();
    }

    /** Finishes the channel's current buffer builder, if present, and accounts its bytes. */
    private void tryFinishCurrentBufferBuilder(int targetChannel) {
        if (!this.bufferBuilders[targetChannel].isPresent()) {
            return;
        }
        BufferBuilder bufferBuilder = this.bufferBuilders[targetChannel].get();
        this.bufferBuilders[targetChannel] = Optional.empty();
        this.numBytesOut.inc(bufferBuilder.finish());
        this.numBuffersOut.inc();
    }

    /** Returns the channel's current buffer builder, requesting a new one if there is none. */
    private BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
        if (this.bufferBuilders[targetChannel].isPresent()) {
            return this.bufferBuilders[targetChannel].get();
        }
        return this.requestNewBufferBuilder(targetChannel);
    }

    /**
     * Requests a new buffer builder (blocking), registers it for the channel and hands its
     * consumer to the target partition. The previous builder, if any, must be finished.
     */
    private BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
        Preconditions.checkState(
            !this.bufferBuilders[targetChannel].isPresent() || this.bufferBuilders[targetChannel].get().isFinished());

        BufferBuilder bufferBuilder = this.targetPartition.getBufferProvider().requestBufferBuilderBlocking();
        this.bufferBuilders[targetChannel] = Optional.of(bufferBuilder);
        this.targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);
        return bufferBuilder;
    }

    /** Finishes and drops the channel's buffer builder without touching the counters. */
    private void closeBufferBuilder(int targetChannel) {
        if (this.bufferBuilders[targetChannel].isPresent()) {
            this.bufferBuilders[targetChannel].get().finish();
            this.bufferBuilders[targetChannel] = Optional.empty();
        }
    }

    /**
     * Clears all buffers and shuts down the flusher thread, waiting for it to exit.
     *
     * <p>The flusher is terminated even when clearing the buffers throws, so a failing close
     * cannot leave the flusher thread running. If the calling thread is interrupted while
     * waiting, the wait is abandoned and the interrupt flag is restored.
     */
    public void close() {
        try {
            this.clearBuffers();
        } finally {
            if (this.outputFlusher.isPresent()) {
                OutputFlusher flusher = this.outputFlusher.get();
                flusher.terminate();
                try {
                    flusher.join();
                } catch (InterruptedException e) {
                    // Ignored on close; restore the flag so later blocking calls exit fast.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /** Records (and logs) the first flusher error; later errors are dropped. */
    private void notifyFlusherException(Throwable t) {
        if (this.flusherException == null) {
            LOG.error("An exception happened while flushing the outputs", t);
            this.flusherException = t;
        }
    }

    /** Throws an {@link IOException} wrapping the flusher error, if one was recorded. */
    private void checkErroneous() throws IOException {
        // Read the volatile field once so the check and the thrown cause agree.
        Throwable failure = this.flusherException;
        if (failure != null) {
            throw new IOException("An exception happened while flushing the outputs", failure);
        }
    }

    /**
     * Creates a {@link BroadcastRecordWriter} if the selector reports broadcast mode,
     * otherwise a plain {@link RecordWriter}.
     *
     * @param writer the partition writer that receives the produced buffers
     * @param channelSelector the channel selector; also decides the writer type
     * @param timeout flush timeout, see the class documentation
     * @param taskName used to name the flusher thread; may be {@code null}
     * @return the created writer
     */
    // Raw types are part of this method's existing signature and are kept for compatibility.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static RecordWriter createRecordWriter(ResultPartitionWriter writer, ChannelSelector channelSelector, long timeout, String taskName) {
        if (channelSelector.isBroadcast()) {
            return new BroadcastRecordWriter(writer, channelSelector, timeout, taskName);
        }
        return new RecordWriter(writer, channelSelector, timeout, taskName);
    }

    /**
     * Same as {@link #createRecordWriter(ResultPartitionWriter, ChannelSelector, long, String)}
     * with a timeout of {@code -1} (no automatic flushing).
     *
     * @param writer the partition writer that receives the produced buffers
     * @param channelSelector the channel selector; also decides the writer type
     * @param taskName passed through as the task name
     * @return the created writer
     */
    @SuppressWarnings("rawtypes")
    public static RecordWriter createRecordWriter(ResultPartitionWriter writer, ChannelSelector channelSelector, String taskName) {
        return RecordWriter.createRecordWriter(writer, channelSelector, -1L, taskName);
    }

    /**
     * Daemon thread that periodically flushes all channels of the enclosing writer.
     *
     * <p>It is an inner (non-static) class on purpose: it calls back into the enclosing
     * {@link RecordWriter} to flush and to report errors.
     */
    private class OutputFlusher
    extends Thread {
        private final long timeout;
        private volatile boolean running;

        OutputFlusher(String name, long timeout) {
            super(name);
            this.running = true;
            this.setDaemon(true);
            this.timeout = timeout;
        }

        /** Asks the loop to stop and interrupts a pending sleep. */
        public void terminate() {
            this.running = false;
            this.interrupt();
        }

        @Override
        public void run() {
            try {
                while (this.running) {
                    try {
                        Thread.sleep(this.timeout);
                    } catch (InterruptedException e) {
                        // An interrupt is expected only as part of terminate(); while still
                        // running it is an error and is reported like any other failure.
                        if (this.running) {
                            throw new Exception(e);
                        }
                    }

                    // Any error here stops the thread and is handed to the writer.
                    // Note: one last flush also happens after terminate() interrupted the sleep.
                    RecordWriter.this.flushAll();
                }
            } catch (Throwable t) {
                RecordWriter.this.notifyFlusherException(t);
            }
        }
    }
}

