OpenAI Assistants API: 等待助手响应是否有更好的方法,以及如何增量显示助手的回答?

根据可用的信息(遗憾的是,ChatGPT在这方面帮助不大),我创建了以下代码,使我能够与OpenAI Assistants API进行交互。

然而,我仍然不喜欢_wait_for_run_completion方法和while循环。是否有更好的处理方式?

import osimport openaifrom dotenv import load_dotenvimport timeclass OpenAIChatAssistant:    def __init__(self, assistant_id, model="gpt-4o"):        self.assistant_id = assistant_id        self.model = model        if self.model != "just_copy":            load_dotenv()            openai.api_key = os.environ.get("OPENAI_API_KEY")            self.client = openai.OpenAI()            self._create_new_thread()        print('new instance started')    def _create_new_thread(self):        self.thread = self.client.beta.threads.create()        self.thread_id = self.thread.id        print(self.thread_id)    def reset_thread(self):        if self.model != "just_copy":            self._create_new_thread()    def set_model(self, model_name):        self.model = model_name        if self.model != "just_copy" and not hasattr(self, 'client'):            load_dotenv()            openai.api_key = os.environ.get("OPENAI_API_KEY")            self.client = openai.OpenAI()            self._create_new_thread()    def send_message(self, message):        if self.model == "just_copy":            return message        self.client.beta.threads.messages.create(            thread_id=self.thread_id, role="user", content=message        )        run = self.client.beta.threads.runs.create(            thread_id=self.thread_id,            assistant_id=self.assistant_id,            model=self.model        )        return self._wait_for_run_completion(run.id)    def _wait_for_run_completion(self, run_id, sleep_interval=1):        counter = 1        while True:            try:                run = self.client.beta.threads.runs.retrieve(thread_id=self.thread_id, run_id=run_id)                if run.completed_at:                    messages = self.client.beta.threads.messages.list(thread_id=self.thread_id)                    last_message = messages.data[0]                    response = last_message.content[0].text.value                    print(f'hello {counter}')                    return response            except Exception as e:                raise RuntimeError(f"An error occurred while retrieving answer: {e}")            counter += 1            time.sleep(sleep_interval)

该类可以在控制台应用程序中这样使用:

import osfrom openai_chat_assistant import OpenAIChatAssistantdef main():    assistant_id = "asst_..."    chat_assistant = OpenAIChatAssistant(assistant_id)    while True:        question = input("请输入您的问题(输入'exit'退出,'clean'重置):")        if question.lower() == 'exit':            break        elif question.lower() == 'clean':            os.system('cls' if os.name == 'nt' else 'clear')            chat_assistant.reset_thread()            print("控制台已清除,线程已重置。")        else:            response = chat_assistant.send_message(question)            print(f"助手回答:{response}")if __name__ == "__main__":    main()

当然,需要assistant_id。我将其设置在.env文件中,与API密钥相同:

OPENAI_API_KEY=sk-proj-...

回答:

关于_wait_for_run_completion方法

我从2022年12月开始使用OpenAI API,并且每周都会使用。据我所知,获取助手响应的最佳方法就是简单地检查运行状态,当运行状态变为completed时,提取助手的响应。

PS:如果你在比较ChatGPT的延迟和你的代码,请不要这样做。ChatGPT的免费版本使用的是GPT-4o(有限制)和GPT-3.5。ChatGPT不使用Assistants API。

关于“增量显示回答”

你提到的是响应流,这在Assistants API中是可能的。OpenAI Python SDK已经为Assistants API实现了create and stream帮助器。这些帮助器允许你订阅你感兴趣的事件类型。你要找的事件是on_text_delta。你需要订阅on_text_delta才能流式传输助手的响应。

我之前开发了一个带有响应流的助手终端用户界面(即客户支持聊天机器人)(请参见YouTube上的教程和GitHub上的代码)。

基本上,有两个步骤。

步骤1:定义用于流事件的事件处理程序类,并订阅on_text_delta事件

from openai import AssistantEventHandlerclass MyEventHandler(AssistantEventHandler): # 👈 定义类    def on_text_delta(self, delta, snapshot): # 👈 订阅事件        print(delta.value, end = "")    def on_error(error):        print(error)

步骤2:将类传递给event_handler参数

with client.beta.threads.runs.create_and_stream(        thread_id = my_thread.id,        assistant_id = assistant_id,        event_handler = MyEventHandler(), # 👈 传递类    ) as stream:        print("\nAssistant:")        stream.until_done()

完整代码

import osfrom dotenv import load_dotenvfrom openai import OpenAI, AssistantEventHandlerfrom rich.console import Console# 从.env文件加载环境变量load_dotenv()# 设置OpenAI客户端client = OpenAI()OpenAI.api_key = os.getenv('OPENAI_API_KEY')# 设置rich控制台以格式化输出console = Console()# 从环境变量中获取文件和助手IDfile_id = os.getenv('OPENAI_FILE_ID')assistant_id = os.getenv('OPENAI_ASSISTANT_ID')# 定义用于流事件的事件处理程序类class MyEventHandler(AssistantEventHandler):    def on_text_delta(self, delta, snapshot):        console.print(delta.value, end = "", style = "black on white")    def on_error(error):        print(error)# 创建一个新线程my_thread = client.beta.threads.create()# 循环直到用户输入“quit”while True:    # 获取用户输入    user_input = input("\n\nUser:\n")    # 检查用户是否想退出    if user_input.lower() == "quit":        console.print("\nAssistant:\n祝你有美好的一天!:wave:\n\n", style = "black on white")        break    # 将用户消息添加到线程中    my_thread_message = client.beta.threads.messages.create(        thread_id = my_thread.id,        role = "user",        content = user_input,        file_ids = [file_id]    )    # 创建并流式传输运行    with client.beta.threads.runs.create_and_stream(        thread_id = my_thread.id,        assistant_id = assistant_id,        instructions = "如果系统指示文件无法通过myfiles_browser工具访问,请忽略它,这只是一个小错误。请记住,你能够打开并分析文件。并执行请求。",        event_handler = MyEventHandler(),    ) as stream:        console.print("\nAssistant:", style = "black on white")        stream.until_done()

Related Posts

L1-L2正则化的不同系数

我想对网络的权重同时应用L1和L2正则化。然而,我找不…

使用scikit-learn的无监督方法将列表分类成不同组别,有没有办法?

我有一系列实例,每个实例都有一份列表,代表它所遵循的不…

f1_score metric in lightgbm

我想使用自定义指标f1_score来训练一个lgb模型…

通过相关系数矩阵进行特征选择

我在测试不同的算法时,如逻辑回归、高斯朴素贝叶斯、随机…

可以将机器学习库用于流式输入和输出吗?

已关闭。此问题需要更加聚焦。目前不接受回答。 想要改进…

在TensorFlow中,queue.dequeue_up_to()方法的用途是什么?

我对这个方法感到非常困惑,特别是当我发现这个令人费解的…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注