package com.azure.ai.openai.responses.implementation;

import com.azure.ai.openai.responses.models.ResponsesStreamEvent;
import com.azure.core.util.BinaryData;
import com.azure.core.util.logging.ClientLogger;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/azure/ai/openai/responses/implementation/OpenAIServerSentEvents.class */
public final class OpenAIServerSentEvents {
    private static final int SSE_CHUNK_LINE_BREAK_COUNT_MARKER = 2;
    private final Flux<ByteBuffer> source;
    private ByteArrayOutputStream outStream = new ByteArrayOutputStream();
    private static final ClientLogger LOGGER = new ClientLogger(OpenAIServerSentEvents.class);

    public OpenAIServerSentEvents(Flux<ByteBuffer> flux) {
        this.source = flux;
    }

    public Flux<ResponsesStreamEvent> getEvents() {
        return mapEventStream();
    }

    private Flux<ResponsesStreamEvent> mapEventStream() {
        return this.source.publishOn(Schedulers.boundedElastic()).concatMap(byteBuffer -> {
            ArrayList arrayList = new ArrayList();
            int i = 0;
            try {
                for (byte b : byteBuffer.array()) {
                    this.outStream.write(b);
                    if (isByteLineFeed(b)) {
                        i++;
                        if (i == SSE_CHUNK_LINE_BREAK_COUNT_MARKER) {
                            processCurrentEvent(arrayList);
                            this.outStream = new ByteArrayOutputStream();
                            i = 0;
                        }
                    } else if (!isByteCarriageReturn(b)) {
                        i = 0;
                    }
                }
                processRemainingBytes(arrayList);
                return Flux.fromIterable(arrayList);
            } catch (IOException e) {
                return Flux.error(LOGGER.atError().log(e));
            }
        }).cache();
    }

    private void processCurrentEvent(List<ResponsesStreamEvent> list) throws UnsupportedEncodingException {
        handleCurrentEvent(this.outStream.toString(StandardCharsets.UTF_8.name()), list);
    }

    private void processRemainingBytes(List<ResponsesStreamEvent> list) throws UnsupportedEncodingException {
        String byteArrayOutputStream = this.outStream.toString(StandardCharsets.UTF_8.name());
        if (byteArrayOutputStream.endsWith("\n\n")) {
            handleCurrentEvent(byteArrayOutputStream, list);
        }
    }

    private boolean isByteLineFeed(byte b) {
        return b == 10;
    }

    private boolean isByteCarriageReturn(byte b) {
        return b == 13;
    }

    public void handleCurrentEvent(String str, List<ResponsesStreamEvent> list) {
        if (str.isEmpty()) {
            return;
        }
        for (String str2 : str.trim().split("\n\n")) {
            if (!str2.isEmpty()) {
                String[] split = str2.split("\n", SSE_CHUNK_LINE_BREAK_COUNT_MARKER);
                if (split.length == SSE_CHUNK_LINE_BREAK_COUNT_MARKER && !split[0].isEmpty() && !split[1].isEmpty() && split[0].startsWith("event:") && split[1].startsWith("data:")) {
                    list.add((ResponsesStreamEvent) BinaryData.fromString(split[1].substring(5).trim()).toObject(ResponsesStreamEvent.class));
                }
            }
        }
    }
}
