在我的Nuxt应用中,我有一个Pinia store,并通过Nitro服务器进行API调用,获取流数据作为响应。当我尝试使用流数据更新Pinia store时,它没有更新,即我可以调用
const streamOutput = ref({ value: "", annotations: [] })
const setStreamOutput = (output) => {
console.log("setStreamOutput", output)
streamOutput.value = output
console.log("setStreamOutput", output)
}
我会在控制台看到日志,但这些日志出现在服务器日志中,而不是浏览器中,并且在我的模板和浏览器工具中,我仍然看到streamOutput的初始值。
export default defineEventHandler(async (event) => {
const config = useRuntimeConfig()
const body = await readBody(event)
const chatStore = useChatStore()
const readable = new Readable({
read() {} // No-op implementation for read method
})
const run = sendStream(
event,
await openai.beta.threads
.createAndRunStream({
assistant_id: *ID*,
thread: {
messages: [
{ role: "user", content: "用五岁小孩能理解的方式解释深度学习。" }
]
}
})
.on("messageCreated", async (text) => {
console.log("\n\n messageCreated", text)
})
.on("textDelta", async (textDelta, snapshot) => {
console.log("\n\n textDelta", snapshot)
await chatStore.setStreamOutput(snapshot)
})
.on("toolCallCreated", async (toolCall) => {
console.log("\n\n toolCallCreated", toolCall)
})
.on("toolCallDelta", async (toolCallDelta, snapshot) => {
console.log("\n\n toolCallDelta", snapshot)
if (toolCallDelta.type === "code_interpreter") {
if (toolCallDelta.code_interpreter.input) {
chatStore.setStreamOutput(
chatStore.streamOutput + toolCallDelta.code_interpreter.input
)
}
if (toolCallDelta.code_interpreter.outputs) {
chatStore.setStreamOutput(chatStore.streamOutput + "\noutput >\n")
toolCallDelta.code_interpreter.outputs.forEach((output) => {
if (output.type === "logs") {
chatStore.setStreamOutput(
chatStore.streamOutput + `\n${output.logs}\n`
)
}
})
}
}
})
.on("textDone", async (content, snapshot) => {
console.log("\n\n text Done")
})
)
return sendStream(event, readable)
})
回答:
对于遇到此问题的人,解决方案有两个部分:
- 创建并返回一个Readable
- 更新你的Pinia store以读取数据流
关于创建Readable
import { Readable } from "stream"
import { sendStream } from "h3"
const readable = new Readable({
read() {} // No-op implementation for read method
})
await openai.beta.threads
.createAndRunStream({
...
})
.on("messageCreated", async (text) => {
readable.push(`{"messageCreated": ${JSON.stringify(text)}}\n`)
})
//类似地处理其他事件...
return sendStream(event, readable)
})
关于更新你的Pinia store以读取数据流(这有点繁琐,我相信可以优化,但我很快就会忘记这个答案,所以我觉得应该在忘记之前发布)
const streamOutput = ref({ value: "", annotations: [] })
const setStreamOutput = (output) => {
streamOutput.value = output
}
const createRun = async () => {
try {
const response = await fetch("/api/run/create", {
method: "POST",
body: JSON.stringify({
threadId: thread.value.id
})
})
if (!response.ok) {
throw new Error("网络响应不正常")
}
const reader = response.body.getReader()
const decoder = new TextDecoder("utf-8")
let buffer = ""
let done = false
while (!done) {
const { value, done: readerDone } = await reader.read()
done = readerDone
if (value) {
buffer += decoder.decode(value, { stream: true })
// 处理缓冲区中的完整JSON对象
let boundary
while ((boundary = buffer.indexOf("\n")) !== -1) {
const chunk = buffer.slice(0, boundary).trim()
buffer = buffer.slice(boundary + 1)
if (chunk) {
try {
const json = JSON.parse(chunk)
if (json.textDelta) {
// VVVVV 在这里你可以连接到你的store的函数来使用数据
setStreamOutput(json.textDelta.value)
} else if (json.messageCreated) {
... // 处理其他流事件
}
} catch (e) {
console.error("JSON解析错误:", e)
}
}
}
}
}
} catch (error) {
console.error("发生错误:", error)
}
}