如何使用FastAPI在Python中转发OpenAI的流式响应?

这是我从OpenAI模型中获取基于事件的流式响应的代码(我只展示了核心部分)

client = OpenAI(api_key=OPEN_AI_API_KEY)class EventHandler(AssistantEventHandler):    def on_text_delta(self, delta: TextDelta, snapshot: Text):        print(delta.value)with client.beta.threads.runs.stream(    thread_id=thread_id,    assistant_id=assistant_id,    event_handler=EventHandler()) as stream:stream.until_done()

当API接收到令牌时,on_text_delta事件会被触发。我想使用FastAPI转发这个响应,而不是在输出屏幕上打印。

@app.get("/stream")async def stream():    return ...something...

我尝试过将结果作为HTTP正文的一部分进行响应:

from fastapi.responses import StreamingResponse...@app.post("/stream")async def stream():    with client.beta.threads.runs.stream(        thread_id=thread_id,        assistant_id=assistant_id,        event_handler=EventHandler()    ) as stream:        stream.until_done()    return StreamingResponse(EventHandler.generator_function(), media_type="text/plain")

我在EventHandler类中创建了generator_function,但问题是直到流结束,执行都不会到达返回语句。

我也尝试过使用websockets,但问题仍然是我的程序执行应该如何流动。在API响应完成之前,流不会让执行继续进行。


回答:

我找到了解决方案!

from fastapi import FastAPIfrom fastapi.responses import StreamingResponsefrom openai import OpenAI, AsyncOpenAIOPEN_AI_API_KEY = 'you_api_key'async_client = AsyncOpenAI(api_key=OPEN_AI_API_KEY)client = OpenAI(api_key=OPEN_AI_API_KEY)app = FastAPI()async def stream_assistant_response(assistant_id, thread_id):    stream =  async_client.beta.threads.runs.stream(        assistant_id=assistant_id,        thread_id=thread_id    )    async with stream as stream:        async for text in stream.text_deltas:            yield f"data: {text}\n\n"@app.get("/message")async def add_message(assistant_id, thread_id, message):    # 确保线程存在    client.beta.threads.messages.create(        thread_id=thread_id,        role="user",        content=message    )    return StreamingResponse(stream_assistant_response(assistant_id, thread_id), media_type="text/event-stream")

Related Posts

L1-L2正则化的不同系数

我想对网络的权重同时应用L1和L2正则化。然而,我找不…

使用scikit-learn的无监督方法将列表分类成不同组别,有没有办法?

我有一系列实例,每个实例都有一份列表,代表它所遵循的不…

f1_score metric in lightgbm

我想使用自定义指标f1_score来训练一个lgb模型…

通过相关系数矩阵进行特征选择

我在测试不同的算法时,如逻辑回归、高斯朴素贝叶斯、随机…

可以将机器学习库用于流式输入和输出吗?

已关闭。此问题需要更加聚焦。目前不接受回答。 想要改进…

在TensorFlow中,queue.dequeue_up_to()方法的用途是什么?

我对这个方法感到非常困惑,特别是当我发现这个令人费解的…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注