package com.theokanning.openai.service.assistant_stream;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.theokanning.openai.OpenAiError;
import com.theokanning.openai.OpenAiHttpException;
import com.theokanning.openai.assistants.StreamEvent;
import com.theokanning.openai.assistants.message.Message;
import com.theokanning.openai.assistants.message.content.MessageDelta;
import com.theokanning.openai.assistants.run.Run;
import com.theokanning.openai.assistants.run.ToolCall;
import com.theokanning.openai.assistants.run.ToolCallFunction;
import com.theokanning.openai.assistants.run_step.RunStep;
import com.theokanning.openai.assistants.run_step.RunStepDelta;
import com.theokanning.openai.utils.JsonUtil;
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/theokanning/openai/service/assistant_stream/AssistantStreamManager.class */
public class AssistantStreamManager {
    private static final Logger log = LoggerFactory.getLogger(AssistantStreamManager.class);
    private final AssistantEventHandler eventHandler;
    private final List<MessageDelta> msgDeltas;
    private final List<RunStepDelta> runStepDeltas;
    private final List<AssistantSSE> eventMsgsHolder;
    private final ObjectMapper mapper;
    private MessageDelta accumulatedMessageDelta;
    private RunStepDelta accumulatedRsd;
    private Run currentRun;
    private Message currentMessage;
    private RunStep currentRunStep;
    private volatile boolean completed;
    private final Flowable<AssistantSSE> stream;
    private Disposable disposable;

    /* renamed from: com.theokanning.openai.service.assistant_stream.AssistantStreamManager$2, reason: invalid class name */
    /* loaded from: input_file:com/theokanning/openai/service/assistant_stream/AssistantStreamManager$2.class */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$com$theokanning$openai$assistants$StreamEvent = new int[StreamEvent.values().length];

        static {
            try {
                $SwitchMap$com$theokanning$openai$assistants$StreamEvent[StreamEvent.THREAD_RUN_CREATED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$theokanning$openai$assistants$StreamEvent[StreamEvent.THREAD_RUN_QUEUED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$theokanning$openai$assistants$StreamEvent[StreamEvent.THREAD_RUN_IN_PROGRESS.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$theokanning$openai$assistants$StreamEvent[StreamEvent.THREAD_RUN_REQUIRES_ACTION.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$com$theokanning$openai$assistants$StreamEvent[StreamEvent.THREAD_RUN_COMPLETED.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$com$theokanning$openai$assistants$StreamEvent[StreamEvent.THREAD_RUN_FAILED.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$com$theokanning$openai$assistants$StreamEvent[StreamEvent.THREAD_RUN_CANCELLING.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$com$theokanning$openai$assistants$StreamEvent[StreamEvent.THREAD_RUN_CANCELLED.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$com$theokanning$openai$assistants$StreamEvent[StreamEvent.THREAD_RUN_EXPIRED.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$com$theokanning$openai$assistants$StreamEvent[StreamEvent.THREAD_RUN_STEP_CREATED.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$com$theokanning$openai$assistants$StreamEvent[StreamEvent.THREAD_RUN_STEP_IN_PROGRESS.ordinal()] = 11;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$com$theokanning$openai$assistants$StreamEvent[StreamEvent.THREAD_RUN_STEP_DELTA.ordinal()] = 12;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$com$theokanning$openai$assistants$StreamEvent[StreamEvent.THREAD_RUN_STEP_COMPLETED.ordinal()] = 13;
            } catch (NoSuchFieldError e13) {
            }
            try {
                $SwitchMap$com$theokanning$openai$assistants$StreamEvent[StreamEvent.THREAD_RUN_STEP_FAILED.ordinal()] = 14;
            } catch (NoSuchFieldError e14) {
            }
            try {
                $SwitchMap$com$theokanning$openai$assistants$StreamEvent[StreamEvent.THREAD_RUN_STEP_CANCELLED.ordinal()] = 15;
            } catch (NoSuchFieldError e15) {
            }
            try {
                $SwitchMap$com$theokanning$openai$assistants$StreamEvent[StreamEvent.THREAD_RUN_STEP_EXPIRED.ordinal()] = 16;
            } catch (NoSuchFieldError e16) {
            }
            try {
                $SwitchMap$com$theokanning$openai$assistants$StreamEvent[StreamEvent.THREAD_MESSAGE_CREATED.ordinal()] = 17;
            } catch (NoSuchFieldError e17) {
            }
            try {
                $SwitchMap$com$theokanning$openai$assistants$StreamEvent[StreamEvent.THREAD_MESSAGE_IN_PROGRESS.ordinal()] = 18;
            } catch (NoSuchFieldError e18) {
            }
            try {
                $SwitchMap$com$theokanning$openai$assistants$StreamEvent[StreamEvent.THREAD_MESSAGE_DELTA.ordinal()] = 19;
            } catch (NoSuchFieldError e19) {
            }
            try {
                $SwitchMap$com$theokanning$openai$assistants$StreamEvent[StreamEvent.THREAD_MESSAGE_COMPLETED.ordinal()] = 20;
            } catch (NoSuchFieldError e20) {
            }
            try {
                $SwitchMap$com$theokanning$openai$assistants$StreamEvent[StreamEvent.THREAD_MESSAGE_INCOMPLETE.ordinal()] = 21;
            } catch (NoSuchFieldError e21) {
            }
            try {
                $SwitchMap$com$theokanning$openai$assistants$StreamEvent[StreamEvent.DONE.ordinal()] = 22;
            } catch (NoSuchFieldError e22) {
            }
            try {
                $SwitchMap$com$theokanning$openai$assistants$StreamEvent[StreamEvent.ERROR.ordinal()] = 23;
            } catch (NoSuchFieldError e23) {
            }
        }
    }

    private AssistantStreamManager(Flowable<AssistantSSE> flowable, AssistantEventHandler assistantEventHandler) {
        this.mapper = JsonUtil.getInstance();
        this.eventHandler = assistantEventHandler;
        this.msgDeltas = Collections.synchronizedList(new ArrayList());
        this.runStepDeltas = Collections.synchronizedList(new ArrayList());
        this.eventMsgsHolder = Collections.synchronizedList(new ArrayList());
        this.stream = flowable;
    }

    private AssistantStreamManager(Flowable<AssistantSSE> flowable) {
        this(flowable, new AssistantEventHandler() { // from class: com.theokanning.openai.service.assistant_stream.AssistantStreamManager.1
        });
    }

    public static AssistantStreamManager start(Flowable<AssistantSSE> flowable, AssistantEventHandler assistantEventHandler) {
        AssistantStreamManager assistantStreamManager = new AssistantStreamManager(flowable, assistantEventHandler);
        assistantStreamManager.start();
        return assistantStreamManager;
    }

    public static AssistantStreamManager start(Flowable<AssistantSSE> flowable) {
        AssistantStreamManager assistantStreamManager = new AssistantStreamManager(flowable);
        assistantStreamManager.start();
        return assistantStreamManager;
    }

    public static AssistantStreamManager syncStart(Flowable<AssistantSSE> flowable, AssistantEventHandler assistantEventHandler) {
        AssistantStreamManager assistantStreamManager = new AssistantStreamManager(flowable, assistantEventHandler);
        assistantStreamManager.syncStart();
        return assistantStreamManager;
    }

    public static AssistantStreamManager syncStart(Flowable<AssistantSSE> flowable) {
        AssistantStreamManager assistantStreamManager = new AssistantStreamManager(flowable);
        assistantStreamManager.syncStart();
        return assistantStreamManager;
    }

    public Optional<MessageDelta> getAccumulatedMsg() {
        return Optional.ofNullable(this.accumulatedMessageDelta);
    }

    public Optional<RunStepDelta> getAccumulatedRsd() {
        return Optional.ofNullable(this.accumulatedRsd);
    }

    public Optional<Run> getCurrentRun() {
        return Optional.ofNullable(this.currentRun);
    }

    public Optional<Message> getCurrentMessage() {
        return Optional.ofNullable(this.currentMessage);
    }

    public Optional<RunStep> getCurrentRunStep() {
        return Optional.ofNullable(this.currentRunStep);
    }

    private void start() {
        Flowable<AssistantSSE> flowable = this.stream;
        Consumer consumer = this::handleEvent;
        AssistantEventHandler assistantEventHandler = this.eventHandler;
        assistantEventHandler.getClass();
        this.disposable = flowable.subscribe(consumer, assistantEventHandler::onError, () -> {
            this.completed = true;
        });
    }

    public void shutDown() {
        if (this.disposable != null) {
            this.disposable.dispose();
        }
    }

    private void syncStart() {
        Flowable<AssistantSSE> flowable = this.stream;
        Consumer consumer = this::handleEvent;
        AssistantEventHandler assistantEventHandler = this.eventHandler;
        assistantEventHandler.getClass();
        flowable.blockingSubscribe(consumer, assistantEventHandler::onError, () -> {
            this.completed = true;
        });
    }

    private void handleEvent(AssistantSSE assistantSSE) {
        StreamEvent event = assistantSSE.getEvent();
        this.eventMsgsHolder.add(assistantSSE);
        this.eventHandler.onEvent(assistantSSE);
        switch (AnonymousClass2.$SwitchMap$com$theokanning$openai$assistants$StreamEvent[event.ordinal()]) {
            case 1:
                updateCurrentRun(assistantSSE);
                this.eventHandler.onRunCreated(this.currentRun);
                return;
            case 2:
                updateCurrentRun(assistantSSE);
                this.eventHandler.onRunQueued(this.currentRun);
                return;
            case 3:
                updateCurrentRun(assistantSSE);
                this.eventHandler.onRunInProgress(this.currentRun);
                return;
            case 4:
                updateCurrentRun(assistantSSE);
                translationRunStepDelta();
                this.eventHandler.onRunRequiresAction(this.currentRun);
                return;
            case 5:
                updateCurrentRun(assistantSSE);
                this.eventHandler.onRunCompleted(this.currentRun);
                return;
            case 6:
                updateCurrentRun(assistantSSE);
                log.warn("run:{} failed at:{}", this.currentRun.getId(), this.currentRun.getFailedAt());
                this.eventHandler.onRunFailed(this.currentRun);
                return;
            case 7:
                updateCurrentRun(assistantSSE);
                this.eventHandler.onRunCancelling(this.currentRun);
                return;
            case 8:
                updateCurrentRun(assistantSSE);
                this.eventHandler.onRunCancelled(this.currentRun);
                return;
            case 9:
                updateCurrentRun(assistantSSE);
                log.warn("run:{} expired at:{}", this.currentRun.getId(), this.currentRun.getExpiresAt());
                this.eventHandler.onRunExpired(this.currentRun);
                return;
            case 10:
                updateCurrentRunStep(assistantSSE);
                this.eventHandler.onRunStepCreated(this.currentRunStep);
                return;
            case 11:
                updateCurrentRunStep(assistantSSE);
                this.eventHandler.onRunStepInProgress(this.currentRunStep);
                return;
            case 12:
                accumulateRunStepDeltaAndSave(assistantSSE);
                this.eventHandler.onRunStepDelta(this.runStepDeltas.get(this.runStepDeltas.size() - 1));
                return;
            case 13:
                updateCurrentRunStep(assistantSSE);
                this.eventHandler.onRunStepCompleted(this.currentRunStep);
                return;
            case 14:
                updateCurrentRunStep(assistantSSE);
                log.warn("runid:{} ,RunStepId:{} failed at:{}", new Object[]{this.currentRun.getId(), this.currentRunStep.getId(), this.currentRunStep.getFailedAt()});
                this.eventHandler.onRunStepFailed(this.currentRunStep);
                return;
            case 15:
                updateCurrentRunStep(assistantSSE);
                this.eventHandler.onRunStepCancelled(this.currentRunStep);
                return;
            case 16:
                updateCurrentRunStep(assistantSSE);
                log.warn("runid:{} ,RunStepId:{} expired at: {}", new Object[]{this.currentRun.getId(), this.currentRunStep.getId(), this.currentRunStep.getExpiredAt()});
                this.eventHandler.onRunStepExpired(this.currentRunStep);
                return;
            case 17:
                updateCurrentMessage(assistantSSE);
                this.eventHandler.onMessageCreated(this.currentMessage);
                return;
            case 18:
                updateCurrentMessage(assistantSSE);
                this.eventHandler.onMessageInProgress(this.currentMessage);
                return;
            case 19:
                accumulateMessageDeltaAndSave(assistantSSE);
                this.eventHandler.onMessageDelta(this.msgDeltas.get(this.msgDeltas.size() - 1));
                return;
            case 20:
                updateCurrentMessage(assistantSSE);
                this.eventHandler.onMessageCompleted(this.currentMessage);
                return;
            case 21:
                updateCurrentMessage(assistantSSE);
                log.warn("Message:{} incomplete", this.currentMessage.getId());
                this.eventHandler.onMessageInComplete(this.currentMessage);
                return;
            case 22:
                this.completed = true;
                this.eventHandler.onEnd();
                return;
            case 23:
                log.error("Stream error,the final message is:{},Run is {} ", this.currentMessage, this.currentRun);
                this.completed = true;
                this.eventHandler.onError(new OpenAiHttpException((OpenAiError) assistantSSE.getPojo(), (Exception) null, 200));
                return;
            default:
                return;
        }
    }

    public void waitForCompletion() {
        if (this.disposable == null || !this.disposable.isDisposed()) {
            while (!this.completed) {
                try {
                    Thread.sleep(200L);
                } catch (InterruptedException e) {
                    log.error("InterruptedException", e);
                    shutDown();
                }
            }
        }
    }

    public Optional<StreamEvent> getCurrentEvent() {
        return Optional.ofNullable(this.eventMsgsHolder.isEmpty() ? null : this.eventMsgsHolder.get(this.eventMsgsHolder.size() - 1).getEvent());
    }

    public List<AssistantSSE> getEventMsgsHolder() {
        return new ArrayList(this.eventMsgsHolder);
    }

    public List<MessageDelta> getMsgDeltas() {
        return new ArrayList(this.msgDeltas);
    }

    public List<RunStepDelta> getRunStepDeltas() {
        return new ArrayList(this.runStepDeltas);
    }

    private void translationRunStepDelta() {
        Iterator it = this.accumulatedRsd.getDelta().getStepDetails().getToolCalls().iterator();
        while (it.hasNext()) {
            ToolCallFunction function = ((ToolCall) it.next()).getFunction();
            try {
                function.setArguments(this.mapper.readTree(function.getArguments().asText()));
            } catch (JsonProcessingException e) {
                throw new RuntimeException((Throwable) e);
            }
        }
    }

    private void updateCurrentRunStep(AssistantSSE assistantSSE) {
        if (!assistantSSE.getEvent().dataClass.equals(RunStep.class)) {
            throw new IllegalArgumentException("Event data is not a RunStep,raw data is: " + assistantSSE.getData() + "event is:" + assistantSSE.getEvent().name());
        }
        this.currentRunStep = (RunStep) assistantSSE.getPojo();
    }

    private void updateCurrentRun(AssistantSSE assistantSSE) {
        if (!assistantSSE.getEvent().dataClass.equals(Run.class)) {
            throw new IllegalArgumentException("Event data is not a Run,raw data is: " + assistantSSE.getData() + "event is:" + assistantSSE.getEvent().name());
        }
        this.currentRun = (Run) assistantSSE.getPojo();
    }

    private void updateCurrentMessage(AssistantSSE assistantSSE) {
        if (!assistantSSE.getEvent().dataClass.equals(Message.class)) {
            throw new IllegalArgumentException("Event data is not a Message,raw data is: " + assistantSSE.getData() + "event is:" + assistantSSE.getEvent().name());
        }
        this.currentMessage = (Message) assistantSSE.getPojo();
    }

    private void accumulateRunStepDeltaAndSave(AssistantSSE assistantSSE) {
        if (!assistantSSE.getEvent().dataClass.equals(RunStepDelta.class)) {
            throw new IllegalArgumentException("Event data is not a RunStepDelta,raw data is: " + assistantSSE.getData() + "event is:" + assistantSSE.getEvent().name());
        }
        RunStepDelta runStepDelta = (RunStepDelta) assistantSSE.getPojo();
        this.runStepDeltas.add(runStepDelta);
        this.accumulatedRsd = DeltaUtil.accumulatRunStepDelta(this.accumulatedRsd, runStepDelta);
    }

    private void accumulateMessageDeltaAndSave(AssistantSSE assistantSSE) {
        if (!assistantSSE.getEvent().dataClass.equals(MessageDelta.class)) {
            throw new IllegalArgumentException("Event data is not a MessageDelta,raw data is: " + assistantSSE.getData() + "event is:" + assistantSSE.getEvent().name());
        }
        MessageDelta messageDelta = (MessageDelta) assistantSSE.getPojo();
        this.msgDeltas.add(messageDelta);
        this.accumulatedMessageDelta = DeltaUtil.accumulatMessageDelta(this.accumulatedMessageDelta, messageDelta);
        if (this.accumulatedMessageDelta.getDelta().getRole() == null || this.accumulatedMessageDelta.getDelta().getRole().isEmpty()) {
            getCurrentMessage().ifPresent(message -> {
                this.accumulatedMessageDelta.getDelta().setRole(message.getRole());
            });
        }
    }

    public boolean isCompleted() {
        return this.completed;
    }
}
