如何使用FastAPI在Python中转发OpenAI的流式响应?

这是我从OpenAI模型中获取基于事件的流式响应的代码(我只展示了核心部分)

client = OpenAI(api_key=OPEN_AI_API_KEY)class EventHandler(AssistantEventHandler):    def on_text_delta(self, delta: TextDelta, snapshot: Text):        print(delta.value)with client.beta.threads.runs.stream(    thread_id=thread_id,    assistant_id=assistant_id,    event_handler=EventHandler()) as stream:stream.until_done()

当API接收到令牌时,on_text_delta事件会被触发。我想使用FastAPI转发这个响应,而不是在输出屏幕上打印。

@app.get("/stream")async def stream():    return ...something...

我尝试过将结果作为HTTP正文的一部分进行响应:

from fastapi.responses import StreamingResponse...@app.post("/stream")async def stream():    with client.beta.threads.runs.stream(        thread_id=thread_id,        assistant_id=assistant_id,        event_handler=EventHandler()    ) as stream:        stream.until_done()    return StreamingResponse(EventHandler.generator_function(), media_type="text/plain")

我在EventHandler类中创建了generator_function,但问题是直到流结束,执行都不会到达返回语句。

我也尝试过使用websockets,但问题仍然是我的程序执行应该如何流动。在API响应完成之前,流不会让执行继续进行。


回答:

我找到了解决方案!

from fastapi import FastAPIfrom fastapi.responses import StreamingResponsefrom openai import OpenAI, AsyncOpenAIOPEN_AI_API_KEY = 'you_api_key'async_client = AsyncOpenAI(api_key=OPEN_AI_API_KEY)client = OpenAI(api_key=OPEN_AI_API_KEY)app = FastAPI()async def stream_assistant_response(assistant_id, thread_id):    stream =  async_client.beta.threads.runs.stream(        assistant_id=assistant_id,        thread_id=thread_id    )    async with stream as stream:        async for text in stream.text_deltas:            yield f"data: {text}\n\n"@app.get("/message")async def add_message(assistant_id, thread_id, message):    # 确保线程存在    client.beta.threads.messages.create(        thread_id=thread_id,        role="user",        content=message    )    return StreamingResponse(stream_assistant_response(assistant_id, thread_id), media_type="text/event-stream")

Related Posts

使用LSTM在Python中预测未来值

这段代码可以预测指定股票的当前日期之前的值,但不能预测…

如何在gensim的word2vec模型中查找双词组的相似性

我有一个word2vec模型,假设我使用的是googl…

dask_xgboost.predict 可以工作但无法显示 – 数据必须是一维的

我试图使用 XGBoost 创建模型。 看起来我成功地…

ML Tuning – Cross Validation in Spark

我在https://spark.apache.org/…

如何在React JS中使用fetch从REST API获取预测

我正在开发一个应用程序,其中Flask REST AP…

如何分析ML.NET中多类分类预测得分数组?

我在ML.NET中创建了一个多类分类项目。该项目可以对…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注