This is an implementation based on LangChain and Flask, following an approach for streaming responses from the OpenAI server to a JavaScript page that can display the streamed output.
I have tried everything to modify the code below so that it uses ChatOpenAI from the langchain library instead of OpenAI, without success. I am posting both implementations: the working version that uses OpenAI, and the failing version that uses ChatOpenAI. Thanks to the whole community and to anyone who can help me understand the problem; a demonstration of how to fix it would be very helpful, because I have been trying for days and the error shown makes no sense to me.
Version of the code that uses the now-deprecated interface but still works:
from flask import Flask, Response
import threading
import queue

from langchain.llms import OpenAI
from langchain.callbacks.base import BaseCallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

app = Flask(__name__)


@app.route('/')
def index():
    return Response('''<!DOCTYPE html>
<html>
<head><title>Flask Streaming Langchain Example</title></head>
<body>
  <div id="output"></div>
  <script>
    const outputEl = document.getElementById('output');

    (async function() {
      try {
        const controller = new AbortController();
        const signal = controller.signal;
        const timeout = 120000;  // set the timeout to 120 seconds
        setTimeout(() => controller.abort(), timeout);

        const response = await fetch('/chain', {method: 'POST', signal});
        const reader = response.body.getReader();
        const decoder = new TextDecoder();
        let buffer = '';

        while (true) {
          const { done, value } = await reader.read();
          if (done) {
            break;
          }
          const text = decoder.decode(value, {stream: true});
          outputEl.innerHTML += text;
        }
      } catch (err) {
        console.error(err);
      }
    })();
  </script>
</body>
</html>''', mimetype='text/html')


class ThreadedGenerator:
    def __init__(self):
        self.queue = queue.Queue()

    def __iter__(self):
        return self

    def __next__(self):
        item = self.queue.get()
        if item is StopIteration:
            raise item
        return item

    def send(self, data):
        self.queue.put(data)

    def close(self):
        self.queue.put(StopIteration)


class ChainStreamHandler(StreamingStdOutCallbackHandler):
    def __init__(self, gen):
        super().__init__()
        self.gen = gen

    def on_llm_new_token(self, token: str, **kwargs):
        self.gen.send(token)


def llm_thread(g, prompt):
    try:
        llm = OpenAI(
            model_name="gpt-4",
            verbose=True,
            streaming=True,
            callback_manager=BaseCallbackManager([ChainStreamHandler(g)]),
            temperature=0.7,
        )
        llm(prompt)
    finally:
        g.close()


def chain(prompt):
    g = ThreadedGenerator()
    threading.Thread(target=llm_thread, args=(g, prompt)).start()
    return g


@app.route('/chain', methods=['POST'])
def _chain():
    return Response(chain("Create a poem about the meaning of life \n\n"),
                    mimetype='text/plain')


if __name__ == '__main__':
    app.run(threaded=True, debug=True)
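The core of this pattern is the ThreadedGenerator: the LLM runs in a background thread and pushes tokens into a queue from the callback handler, while Flask iterates the generator and streams each token to the client as it arrives. For clarity, here is a minimal, self-contained sketch of the same bridge (my illustration, not part of the original code), with the LLM replaced by a dummy producer thread:

import threading
import queue
import time

class ThreadedGenerator:
    def __init__(self):
        self.queue = queue.Queue()

    def __iter__(self):
        return self

    def __next__(self):
        item = self.queue.get()  # blocks until the producer sends a token
        if item is StopIteration:
            raise item
        return item

    def send(self, data):
        self.queue.put(data)

    def close(self):
        self.queue.put(StopIteration)  # sentinel that ends iteration on the consumer side

def producer(g):
    try:
        for token in ["streamed ", "one ", "token ", "at ", "a ", "time"]:
            g.send(token)
            time.sleep(0.2)  # stand-in for the LLM's per-token latency
    finally:
        g.close()

g = ThreadedGenerator()
threading.Thread(target=producer, args=(g,)).start()
for token in g:
    print(token, end="", flush=True)  # the consumer sees tokens as they arrive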
The failing version (OpenAI replaced with ChatOpenAI):
from flask import Flask, Response
import threading
import queue

from langchain.chat_models import ChatOpenAI
from langchain.callbacks.base import BaseCallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

app = Flask(__name__)


@app.route('/')
def index():
    return Response('''<!DOCTYPE html>
<html>
<head><title>Flask Streaming Langchain Example</title></head>
<body>
  <div id="output"></div>
  <script>
    const outputEl = document.getElementById('output');

    (async function() {
      try {
        const controller = new AbortController();
        const signal = controller.signal;
        const timeout = 120000;  // set the timeout to 120 seconds
        setTimeout(() => controller.abort(), timeout);

        const response = await fetch('/chain', {method: 'POST', signal});
        const reader = response.body.getReader();
        const decoder = new TextDecoder();
        let buffer = '';

        while (true) {
          const { done, value } = await reader.read();
          if (done) {
            break;
          }
          const text = decoder.decode(value, {stream: true});
          outputEl.innerHTML += text;
        }
      } catch (err) {
        console.error(err);
      }
    })();
  </script>
</body>
</html>''', mimetype='text/html')


class ThreadedGenerator:
    def __init__(self):
        self.queue = queue.Queue()

    def __iter__(self):
        return self

    def __next__(self):
        item = self.queue.get()
        if item is StopIteration:
            raise item
        return item

    def send(self, data):
        self.queue.put(data)

    def close(self):
        self.queue.put(StopIteration)


class ChainStreamHandler(StreamingStdOutCallbackHandler):
    def __init__(self, gen):
        super().__init__()
        self.gen = gen

    def on_llm_new_token(self, token: str, **kwargs):
        self.gen.send(token)

    def on_chat_model_start(self, token: str):
        print("started")


def llm_thread(g, prompt):
    try:
        llm = ChatOpenAI(
            model_name="gpt-4",
            verbose=True,
            streaming=True,
            callback_manager=BaseCallbackManager([ChainStreamHandler(g)]),
            temperature=0.7,
        )
        llm(prompt)
    finally:
        g.close()


def chain(prompt):
    g = ThreadedGenerator()
    threading.Thread(target=llm_thread, args=(g, prompt)).start()
    return g


@app.route('/chain', methods=['POST'])
def _chain():
    return Response(chain("parlami dei 5 modi di dire in inglese che gli italiani conoscono meno \n\n"),
                    mimetype='text/plain')


if __name__ == '__main__':
    app.run(threaded=True, debug=True)
Error shown in the console at startup and when the web page is opened:
Error in ChainStreamHandler.on_chat_model_start callback: ChainStreamHandler.on_chat_model_start() got an unexpected keyword argument 'run_id'
Exception in thread Thread-4 (llm_thread):
127.0.0.1 - - [09/Sep/2023 18:09:29] "POST /chain HTTP/1.1" 200 -
Traceback (most recent call last):
  File "C:\Users\user22\Desktop\Work\TESTPROJ\env\Lib\site-packages\langchain\callbacks\manager.py", line 300, in _handle_event
    getattr(handler, event_name)(*args, **kwargs)
  File "C:\Users\user22\Desktop\Work\TESTPROJ\env\Lib\site-packages\langchain\callbacks\base.py", line 168, in on_chat_model_start
    raise NotImplementedError(
NotImplementedError: StdOutCallbackHandler does not implement `on_chat_model_start`

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\user22\AppData\Local\Programs\Python\Python311\Lib\threading.py", line 1038, in _bootstrap_inner
    self.run()
  File "C:\Users\user22\AppData\Local\Programs\Python\Python311\Lib\threading.py", line 975, in run
    self._target(*self._args, **self._kwargs)
  File "c:\Users\user22\Desktop\Work\TESTPROJ\streamresp.py", line 90, in llm_thread
    llm(prompt)
  File "C:\Users\user22\Desktop\Work\TESTPROJ\env\Lib\site-packages\langchain\chat_models\base.py", line 552, in __call__
    generation = self.generate(
                 ^^^^^^^^^^^^^^
  File "C:\Users\user22\Desktop\Work\TESTPROJ\env\Lib\site-packages\langchain\chat_models\base.py", line 293, in generate
    run_managers = callback_manager.on_chat_model_start(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\user22\Desktop\Work\TESTPROJ\env\Lib\site-packages\langchain\callbacks\manager.py", line 1112, in on_chat_model_start
    _handle_event(
  File "C:\Users\user22\Desktop\Work\TESTPROJ\env\Lib\site-packages\langchain\callbacks\manager.py", line 304, in _handle_event
    message_strings = [get_buffer_string(m) for m in args[1]]
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\user22\Desktop\Work\TESTPROJ\env\Lib\site-packages\langchain\callbacks\manager.py", line 304, in <listcomp>
    message_strings = [get_buffer_string(m) for m in args[1]]
                       ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\user22\Desktop\Work\TESTPROJ\env\Lib\site-packages\langchain\schema\messages.py", line 52, in get_buffer_string
    raise ValueError(f"Got unsupported message type: {m}")
ValueError: Got unsupported message type: p
Thank you very much for your support!
Answer:
Thanks to help from GitHub user python273, I have solved the problem.
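The traceback actually points at two separate problems. First, the custom on_chat_model_start(self, token: str) override does not match the callback interface: LangChain invokes it with the serialized model and the message list as positional arguments plus keyword arguments such as run_id, hence "got an unexpected keyword argument 'run_id'". Second, a chat model must be called with a list of messages rather than a bare string; passing the raw prompt makes LangChain treat the string itself as a sequence of messages, so get_buffer_string fails on its first character, which is exactly "Got unsupported message type: p" ('p' from "parlami"). If you do want to keep the start event, a minimal sketch of a conforming handler looks like this (my sketch, assuming the callback signature of the LangChain version in the traceback, which passes run metadata as keyword arguments):

from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

class ChainStreamHandler(StreamingStdOutCallbackHandler):
    def __init__(self, gen):
        super().__init__()
        self.gen = gen

    def on_llm_new_token(self, token: str, **kwargs):
        self.gen.send(token)

    def on_chat_model_start(self, serialized, messages, **kwargs):
        # LangChain passes run_id, parent_run_id, etc. as keyword arguments,
        # so the override must accept **kwargs instead of a bare `token` param.
        print("chat model started")

The working code below simply drops the override and wraps the prompt in a HumanMessage: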
import os
os.environ["OPENAI_API_KEY"] = ""

from flask import Flask, Response, request
import threading
import queue

from langchain.chat_models import ChatOpenAI
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.schema import AIMessage, HumanMessage, SystemMessage

app = Flask(__name__)


@app.route('/')
def index():
    # HTML inlined just to keep the example short; better to move it into a .html file
    return Response('''<!DOCTYPE html>
<html>
<head><title>Flask Streaming Langchain Example</title></head>
<body>
  <form id="form">
    <input name="prompt" value="write a short koan story about seeing beyond"/>
    <input type="submit"/>
  </form>
  <div id="output"></div>
  <script>
    const formEl = document.getElementById('form');
    const outputEl = document.getElementById('output');

    let aborter = new AbortController();

    async function run() {
      aborter.abort();  // cancel the previous request
      outputEl.innerText = '';
      aborter = new AbortController();
      const prompt = new FormData(formEl).get('prompt');
      try {
        const response = await fetch(
          '/chain', {
            signal: aborter.signal,
            method: 'POST',
            headers: {'Content-Type': 'application/json'},
            body: JSON.stringify({ prompt }),
          }
        );
        const reader = response.body.getReader();
        const decoder = new TextDecoder();
        while (true) {
          const { done, value } = await reader.read();
          if (done) {
            break;
          }
          const decoded = decoder.decode(value, {stream: true});
          outputEl.innerText += decoded;
        }
      } catch (err) {
        console.error(err);
      }
    }
    run();  // run with the initial prompt

    formEl.addEventListener('submit', function(event) {
      event.preventDefault();
      run();
    });
  </script>
</body>
</html>''', mimetype='text/html')


class ThreadedGenerator:
    def __init__(self):
        self.queue = queue.Queue()

    def __iter__(self):
        return self

    def __next__(self):
        item = self.queue.get()
        if item is StopIteration:
            raise item
        return item

    def send(self, data):
        self.queue.put(data)

    def close(self):
        self.queue.put(StopIteration)


class ChainStreamHandler(StreamingStdOutCallbackHandler):
    def __init__(self, gen):
        super().__init__()
        self.gen = gen

    def on_llm_new_token(self, token: str, **kwargs):
        self.gen.send(token)


def llm_thread(g, prompt):
    try:
        chat = ChatOpenAI(
            verbose=True,
            streaming=True,
            callbacks=[ChainStreamHandler(g)],
            temperature=0.7,
        )
        chat([HumanMessage(content=prompt)])
    finally:
        g.close()


def chain(prompt):
    g = ThreadedGenerator()
    threading.Thread(target=llm_thread, args=(g, prompt)).start()
    return g


@app.route('/chain', methods=['POST'])
def _chain():
    return Response(chain(request.json['prompt']), mimetype='text/plain')


if __name__ == '__main__':
    app.run(threaded=True, debug=True)
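Compared with the failing version, three things changed: the prompt is wrapped as [HumanMessage(content=prompt)] before calling the model, the non-conforming on_chat_model_start override is removed, and the deprecated callback_manager=BaseCallbackManager([...]) argument is replaced by callbacks=[...]. To check the streaming behaviour without the browser page, here is a small client sketch (my addition; it assumes Flask's default port 5000 and the third-party requests package):

import requests

# Stream the response body instead of waiting for the whole reply.
with requests.post(
    "http://127.0.0.1:5000/chain",
    json={"prompt": "write a haiku about streaming"},
    stream=True,
) as resp:
    for chunk in resp.iter_content(chunk_size=None, decode_unicode=True):
        print(chunk, end="", flush=True)  # tokens appear as the server sends them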
Link to the original reply: https://gist.github.com/python273/563177b3ad5b9f74c0f8f3299ec13850