This is my code for getting an event-based streaming response from an OpenAI model (I'm only showing the core part):
```python
client = OpenAI(api_key=OPEN_AI_API_KEY)

class EventHandler(AssistantEventHandler):
    def on_text_delta(self, delta: TextDelta, snapshot: Text):
        print(delta.value)

with client.beta.threads.runs.stream(
    thread_id=thread_id,
    assistant_id=assistant_id,
    event_handler=EventHandler()
) as stream:
    stream.until_done()
```
The on_text_delta event fires as tokens arrive from the API. Instead of printing to the screen, I want to forward this response through FastAPI:
```python
@app.get("/stream")
async def stream():
    return ...something...
```
I have tried responding with the result as part of the HTTP body:
```python
from fastapi.responses import StreamingResponse
...
@app.post("/stream")
async def stream():
    with client.beta.threads.runs.stream(
        thread_id=thread_id,
        assistant_id=assistant_id,
        event_handler=EventHandler()
    ) as stream:
        stream.until_done()
    return StreamingResponse(EventHandler.generator_function(), media_type="text/plain")
```
I created a generator_function in the EventHandler class, but the problem is that execution never reaches the return statement until the stream has finished.
I also tried websockets, but the problem is still how my program's execution should flow: the stream does not let execution continue until the API response is complete.
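For reference, the blocking behavior described above can in general be bridged by running the synchronous stream in a background thread and handing each delta to the event loop through a queue. Below is a minimal, self-contained sketch of that pattern; `run_blocking_stream` and its hard-coded tokens are a stand-in for the real OpenAI stream, where `on_text_delta` would call `emit(delta.value)`:

```python
import asyncio
import threading

def run_blocking_stream(emit, done):
    """Stand-in for the blocking OpenAI stream: emits fake text deltas."""
    for token in ["Hello", ", ", "world"]:
        emit(token)
    done()

async def stream_tokens():
    """Async generator that yields tokens produced by a blocking callback API."""
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()
    sentinel = object()

    # The blocking stream runs in a thread; its callbacks push into the
    # queue via call_soon_threadsafe, which is safe from another thread.
    emit = lambda tok: loop.call_soon_threadsafe(queue.put_nowait, tok)
    done = lambda: loop.call_soon_threadsafe(queue.put_nowait, sentinel)
    threading.Thread(target=run_blocking_stream, args=(emit, done), daemon=True).start()

    while True:
        item = await queue.get()
        if item is sentinel:
            break
        yield item

async def main():
    chunks = [tok async for tok in stream_tokens()]
    return "".join(chunks)

print(asyncio.run(main()))  # prints "Hello, world"
```

An async generator like `stream_tokens` is exactly the shape FastAPI's `StreamingResponse` expects, so the endpoint can return it directly instead of waiting for the run to finish.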
Answer:
I found the solution!
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI, AsyncOpenAI

OPEN_AI_API_KEY = 'your_api_key'

async_client = AsyncOpenAI(api_key=OPEN_AI_API_KEY)
client = OpenAI(api_key=OPEN_AI_API_KEY)
app = FastAPI()

async def stream_assistant_response(assistant_id, thread_id):
    stream = async_client.beta.threads.runs.stream(
        assistant_id=assistant_id,
        thread_id=thread_id
    )
    async with stream as stream:
        async for text in stream.text_deltas:
            yield f"data: {text}\n\n"

@app.get("/message")
async def add_message(assistant_id, thread_id, message):
    # Make sure the thread exists
    client.beta.threads.messages.create(
        thread_id=thread_id,
        role="user",
        content=message
    )
    return StreamingResponse(stream_assistant_response(assistant_id, thread_id), media_type="text/event-stream")
```