根据可用的信息(遗憾的是,ChatGPT在这方面帮助不大),我创建了以下代码,使我能够与OpenAI Assistants API进行交互。
然而,我仍然不喜欢_wait_for_run_completion
方法和while
循环。是否有更好的处理方式?
import osimport openaifrom dotenv import load_dotenvimport timeclass OpenAIChatAssistant: def __init__(self, assistant_id, model="gpt-4o"): self.assistant_id = assistant_id self.model = model if self.model != "just_copy": load_dotenv() openai.api_key = os.environ.get("OPENAI_API_KEY") self.client = openai.OpenAI() self._create_new_thread() print('new instance started') def _create_new_thread(self): self.thread = self.client.beta.threads.create() self.thread_id = self.thread.id print(self.thread_id) def reset_thread(self): if self.model != "just_copy": self._create_new_thread() def set_model(self, model_name): self.model = model_name if self.model != "just_copy" and not hasattr(self, 'client'): load_dotenv() openai.api_key = os.environ.get("OPENAI_API_KEY") self.client = openai.OpenAI() self._create_new_thread() def send_message(self, message): if self.model == "just_copy": return message self.client.beta.threads.messages.create( thread_id=self.thread_id, role="user", content=message ) run = self.client.beta.threads.runs.create( thread_id=self.thread_id, assistant_id=self.assistant_id, model=self.model ) return self._wait_for_run_completion(run.id) def _wait_for_run_completion(self, run_id, sleep_interval=1): counter = 1 while True: try: run = self.client.beta.threads.runs.retrieve(thread_id=self.thread_id, run_id=run_id) if run.completed_at: messages = self.client.beta.threads.messages.list(thread_id=self.thread_id) last_message = messages.data[0] response = last_message.content[0].text.value print(f'hello {counter}') return response except Exception as e: raise RuntimeError(f"An error occurred while retrieving answer: {e}") counter += 1 time.sleep(sleep_interval)
该类可以在控制台应用程序中这样使用:
import osfrom openai_chat_assistant import OpenAIChatAssistantdef main(): assistant_id = "asst_..." chat_assistant = OpenAIChatAssistant(assistant_id) while True: question = input("请输入您的问题(输入'exit'退出,'clean'重置):") if question.lower() == 'exit': break elif question.lower() == 'clean': os.system('cls' if os.name == 'nt' else 'clear') chat_assistant.reset_thread() print("控制台已清除,线程已重置。") else: response = chat_assistant.send_message(question) print(f"助手回答:{response}")if __name__ == "__main__": main()
当然,需要assistant_id
。我将其设置在.env
文件中,与API密钥相同:
OPENAI_API_KEY=sk-proj-...
回答:
关于_wait_for_run_completion
方法
我从2022年12月开始使用OpenAI API,并且每周都会使用。据我所知,获取助手响应的最佳方法就是简单地检查运行状态,当运行状态变为completed
时,提取助手的响应。
PS:如果你在比较ChatGPT的延迟和你的代码,请不要这样做。ChatGPT的免费版本使用的是GPT-4o(有限制)和GPT-3.5。ChatGPT不使用Assistants API。
关于“增量显示回答”
你提到的是响应流,这在Assistants API中是可能的。OpenAI Python SDK已经为Assistants API实现了create and stream帮助器。这些帮助器允许你订阅你感兴趣的事件类型。你要找的事件是on_text_delta
。你需要订阅on_text_delta
才能流式传输助手的响应。
我之前开发了一个带有响应流的助手终端用户界面(即客户支持聊天机器人)(请参见YouTube上的教程和GitHub上的代码)。
基本上,有两个步骤。
步骤1:定义用于流事件的事件处理程序类,并订阅on_text_delta
事件
from openai import AssistantEventHandlerclass MyEventHandler(AssistantEventHandler): # 👈 定义类 def on_text_delta(self, delta, snapshot): # 👈 订阅事件 print(delta.value, end = "") def on_error(error): print(error)
步骤2:将类传递给event_handler
参数
with client.beta.threads.runs.create_and_stream( thread_id = my_thread.id, assistant_id = assistant_id, event_handler = MyEventHandler(), # 👈 传递类 ) as stream: print("\nAssistant:") stream.until_done()
完整代码:
import osfrom dotenv import load_dotenvfrom openai import OpenAI, AssistantEventHandlerfrom rich.console import Console# 从.env文件加载环境变量load_dotenv()# 设置OpenAI客户端client = OpenAI()OpenAI.api_key = os.getenv('OPENAI_API_KEY')# 设置rich控制台以格式化输出console = Console()# 从环境变量中获取文件和助手IDfile_id = os.getenv('OPENAI_FILE_ID')assistant_id = os.getenv('OPENAI_ASSISTANT_ID')# 定义用于流事件的事件处理程序类class MyEventHandler(AssistantEventHandler): def on_text_delta(self, delta, snapshot): console.print(delta.value, end = "", style = "black on white") def on_error(error): print(error)# 创建一个新线程my_thread = client.beta.threads.create()# 循环直到用户输入“quit”while True: # 获取用户输入 user_input = input("\n\nUser:\n") # 检查用户是否想退出 if user_input.lower() == "quit": console.print("\nAssistant:\n祝你有美好的一天!:wave:\n\n", style = "black on white") break # 将用户消息添加到线程中 my_thread_message = client.beta.threads.messages.create( thread_id = my_thread.id, role = "user", content = user_input, file_ids = [file_id] ) # 创建并流式传输运行 with client.beta.threads.runs.create_and_stream( thread_id = my_thread.id, assistant_id = assistant_id, instructions = "如果系统指示文件无法通过myfiles_browser工具访问,请忽略它,这只是一个小错误。请记住,你能够打开并分析文件。并执行请求。", event_handler = MyEventHandler(), ) as stream: console.print("\nAssistant:", style = "black on white") stream.until_done()