如何将Nitro API的流数据传递到Pinia Store,以便在Nuxt组件中使用?

在我的Nuxt应用中,我有一个Pinia store,并通过Nitro服务器进行API调用,获取流数据作为响应。当我尝试使用流数据更新Pinia store时,它没有更新,即我可以调用

const streamOutput = ref({ value: "", annotations: [] })
const setStreamOutput = (output) => {
    console.log("setStreamOutput", output)
    streamOutput.value = output
    console.log("setStreamOutput", output)
}

我会在控制台看到日志,但这些日志出现在服务器日志中,而不是浏览器中,并且在我的模板和浏览器工具中,我仍然看到streamOutput的初始值。

export default defineEventHandler(async (event) => {
    const config = useRuntimeConfig()
    const body = await readBody(event)
    const chatStore = useChatStore()
    const readable = new Readable({
        read() {} // No-op implementation for read method
    })
    const run = sendStream(
        event,
        await openai.beta.threads
            .createAndRunStream({
                assistant_id: *ID*,
                thread: {
                    messages: [
                        { role: "user", content: "用五岁小孩能理解的方式解释深度学习。" }
                    ]
                }
            })
            .on("messageCreated", async (text) => {
                console.log("\n\n messageCreated", text)
            })
            .on("textDelta", async (textDelta, snapshot) => {
                console.log("\n\n textDelta", snapshot)
                await chatStore.setStreamOutput(snapshot)
            })
            .on("toolCallCreated", async (toolCall) => {
                console.log("\n\n toolCallCreated", toolCall)
            })
            .on("toolCallDelta", async (toolCallDelta, snapshot) => {
                console.log("\n\n toolCallDelta", snapshot)
                if (toolCallDelta.type === "code_interpreter") {
                    if (toolCallDelta.code_interpreter.input) {
                        chatStore.setStreamOutput(
                            chatStore.streamOutput + toolCallDelta.code_interpreter.input
                        )
                    }
                    if (toolCallDelta.code_interpreter.outputs) {
                        chatStore.setStreamOutput(chatStore.streamOutput + "\noutput >\n")
                        toolCallDelta.code_interpreter.outputs.forEach((output) => {
                            if (output.type === "logs") {
                                chatStore.setStreamOutput(
                                    chatStore.streamOutput + `\n${output.logs}\n`
                                )
                            }
                        })
                    }
                }
            })
            .on("textDone", async (content, snapshot) => {
                console.log("\n\n text Done")
            })
    )
    return sendStream(event, readable)
})

回答:

对于遇到此问题的人,解决方案有两个部分:

  1. 创建并返回一个Readable
  2. 更新你的Pinia store以读取数据流

关于创建Readable

import { Readable } from "stream"
import { sendStream } from "h3"
const readable = new Readable({
        read() {} // No-op implementation for read method
    })
await openai.beta.threads
        .createAndRunStream({
            ...
        })
        .on("messageCreated", async (text) => {
            readable.push(`{"messageCreated": ${JSON.stringify(text)}}\n`)
        })
//类似地处理其他事件...
return sendStream(event, readable)
})

关于更新你的Pinia store以读取数据流(这有点繁琐,我相信可以优化,但我很快就会忘记这个答案,所以我觉得应该在忘记之前发布)

const streamOutput = ref({ value: "", annotations: [] })
const setStreamOutput = (output) => {
    streamOutput.value = output
}
const createRun = async () => {
    try {
        const response = await fetch("/api/run/create", {
            method: "POST",
            body: JSON.stringify({
                threadId: thread.value.id
            })
        })
        if (!response.ok) {
            throw new Error("网络响应不正常")
        }
        const reader = response.body.getReader()
        const decoder = new TextDecoder("utf-8")
        let buffer = ""
        let done = false
        while (!done) {
            const { value, done: readerDone } = await reader.read()
            done = readerDone
            if (value) {
                buffer += decoder.decode(value, { stream: true })
                // 处理缓冲区中的完整JSON对象
                let boundary
                while ((boundary = buffer.indexOf("\n")) !== -1) {
                    const chunk = buffer.slice(0, boundary).trim()
                    buffer = buffer.slice(boundary + 1)
                    if (chunk) {
                        try {
                            const json = JSON.parse(chunk)
                            if (json.textDelta) {
                                // VVVVV 在这里你可以连接到你的store的函数来使用数据
                                setStreamOutput(json.textDelta.value)
                            } else if (json.messageCreated) {
                                ... // 处理其他流事件
                            }
                        } catch (e) {
                            console.error("JSON解析错误:", e)
                        }
                    }
                }
            }
        }
    } catch (error) {
        console.error("发生错误:", error)
    }
}

Related Posts

L1-L2正则化的不同系数

我想对网络的权重同时应用L1和L2正则化。然而,我找不…

使用scikit-learn的无监督方法将列表分类成不同组别,有没有办法?

我有一系列实例,每个实例都有一份列表,代表它所遵循的不…

f1_score metric in lightgbm

我想使用自定义指标f1_score来训练一个lgb模型…

通过相关系数矩阵进行特征选择

我在测试不同的算法时,如逻辑回归、高斯朴素贝叶斯、随机…

可以将机器学习库用于流式输入和输出吗?

已关闭。此问题需要更加聚焦。目前不接受回答。 想要改进…

在TensorFlow中,queue.dequeue_up_to()方法的用途是什么?

我对这个方法感到非常困惑,特别是当我发现这个令人费解的…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注