OpenAI Assistants API: 等待助手响应是否有更好的方法,以及如何增量显示助手的回答?

根据可用的信息(遗憾的是,ChatGPT在这方面帮助不大),我创建了以下代码,使我能够与OpenAI Assistants API进行交互。

然而,我仍然不喜欢_wait_for_run_completion方法和while循环。是否有更好的处理方式?

import osimport openaifrom dotenv import load_dotenvimport timeclass OpenAIChatAssistant:    def __init__(self, assistant_id, model="gpt-4o"):        self.assistant_id = assistant_id        self.model = model        if self.model != "just_copy":            load_dotenv()            openai.api_key = os.environ.get("OPENAI_API_KEY")            self.client = openai.OpenAI()            self._create_new_thread()        print('new instance started')    def _create_new_thread(self):        self.thread = self.client.beta.threads.create()        self.thread_id = self.thread.id        print(self.thread_id)    def reset_thread(self):        if self.model != "just_copy":            self._create_new_thread()    def set_model(self, model_name):        self.model = model_name        if self.model != "just_copy" and not hasattr(self, 'client'):            load_dotenv()            openai.api_key = os.environ.get("OPENAI_API_KEY")            self.client = openai.OpenAI()            self._create_new_thread()    def send_message(self, message):        if self.model == "just_copy":            return message        self.client.beta.threads.messages.create(            thread_id=self.thread_id, role="user", content=message        )        run = self.client.beta.threads.runs.create(            thread_id=self.thread_id,            assistant_id=self.assistant_id,            model=self.model        )        return self._wait_for_run_completion(run.id)    def _wait_for_run_completion(self, run_id, sleep_interval=1):        counter = 1        while True:            try:                run = self.client.beta.threads.runs.retrieve(thread_id=self.thread_id, run_id=run_id)                if run.completed_at:                    messages = self.client.beta.threads.messages.list(thread_id=self.thread_id)                    last_message = messages.data[0]                    response = last_message.content[0].text.value                    print(f'hello {counter}')                    return response            except Exception as e:                raise RuntimeError(f"An error occurred while retrieving answer: {e}")            counter += 1            time.sleep(sleep_interval)

该类可以在控制台应用程序中这样使用:

import osfrom openai_chat_assistant import OpenAIChatAssistantdef main():    assistant_id = "asst_..."    chat_assistant = OpenAIChatAssistant(assistant_id)    while True:        question = input("请输入您的问题(输入'exit'退出,'clean'重置):")        if question.lower() == 'exit':            break        elif question.lower() == 'clean':            os.system('cls' if os.name == 'nt' else 'clear')            chat_assistant.reset_thread()            print("控制台已清除,线程已重置。")        else:            response = chat_assistant.send_message(question)            print(f"助手回答:{response}")if __name__ == "__main__":    main()

当然,需要assistant_id。我将其设置在.env文件中,与API密钥相同:

OPENAI_API_KEY=sk-proj-...

回答:

关于_wait_for_run_completion方法

我从2022年12月开始使用OpenAI API,并且每周都会使用。据我所知,获取助手响应的最佳方法就是简单地检查运行状态,当运行状态变为completed时,提取助手的响应。

PS:如果你在比较ChatGPT的延迟和你的代码,请不要这样做。ChatGPT的免费版本使用的是GPT-4o(有限制)和GPT-3.5。ChatGPT不使用Assistants API。

关于“增量显示回答”

你提到的是响应流,这在Assistants API中是可能的。OpenAI Python SDK已经为Assistants API实现了create and stream帮助器。这些帮助器允许你订阅你感兴趣的事件类型。你要找的事件是on_text_delta。你需要订阅on_text_delta才能流式传输助手的响应。

我之前开发了一个带有响应流的助手终端用户界面(即客户支持聊天机器人)(请参见YouTube上的教程和GitHub上的代码)。

基本上,有两个步骤。

步骤1:定义用于流事件的事件处理程序类,并订阅on_text_delta事件

from openai import AssistantEventHandlerclass MyEventHandler(AssistantEventHandler): # 👈 定义类    def on_text_delta(self, delta, snapshot): # 👈 订阅事件        print(delta.value, end = "")    def on_error(error):        print(error)

步骤2:将类传递给event_handler参数

with client.beta.threads.runs.create_and_stream(        thread_id = my_thread.id,        assistant_id = assistant_id,        event_handler = MyEventHandler(), # 👈 传递类    ) as stream:        print("\nAssistant:")        stream.until_done()

完整代码

import osfrom dotenv import load_dotenvfrom openai import OpenAI, AssistantEventHandlerfrom rich.console import Console# 从.env文件加载环境变量load_dotenv()# 设置OpenAI客户端client = OpenAI()OpenAI.api_key = os.getenv('OPENAI_API_KEY')# 设置rich控制台以格式化输出console = Console()# 从环境变量中获取文件和助手IDfile_id = os.getenv('OPENAI_FILE_ID')assistant_id = os.getenv('OPENAI_ASSISTANT_ID')# 定义用于流事件的事件处理程序类class MyEventHandler(AssistantEventHandler):    def on_text_delta(self, delta, snapshot):        console.print(delta.value, end = "", style = "black on white")    def on_error(error):        print(error)# 创建一个新线程my_thread = client.beta.threads.create()# 循环直到用户输入“quit”while True:    # 获取用户输入    user_input = input("\n\nUser:\n")    # 检查用户是否想退出    if user_input.lower() == "quit":        console.print("\nAssistant:\n祝你有美好的一天!:wave:\n\n", style = "black on white")        break    # 将用户消息添加到线程中    my_thread_message = client.beta.threads.messages.create(        thread_id = my_thread.id,        role = "user",        content = user_input,        file_ids = [file_id]    )    # 创建并流式传输运行    with client.beta.threads.runs.create_and_stream(        thread_id = my_thread.id,        assistant_id = assistant_id,        instructions = "如果系统指示文件无法通过myfiles_browser工具访问,请忽略它,这只是一个小错误。请记住,你能够打开并分析文件。并执行请求。",        event_handler = MyEventHandler(),    ) as stream:        console.print("\nAssistant:", style = "black on white")        stream.until_done()

Related Posts

使用LSTM在Python中预测未来值

这段代码可以预测指定股票的当前日期之前的值,但不能预测…

如何在gensim的word2vec模型中查找双词组的相似性

我有一个word2vec模型,假设我使用的是googl…

dask_xgboost.predict 可以工作但无法显示 – 数据必须是一维的

我试图使用 XGBoost 创建模型。 看起来我成功地…

ML Tuning – Cross Validation in Spark

我在https://spark.apache.org/…

如何在React JS中使用fetch从REST API获取预测

我正在开发一个应用程序,其中Flask REST AP…

如何分析ML.NET中多类分类预测得分数组?

我在ML.NET中创建了一个多类分类项目。该项目可以对…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注