How can I switch from OpenAI to ChatOpenAI in langchain and Flask?

This is a langchain + flask implementation, based on an approach for streaming a response from the OpenAI server into a JavaScript page that can display the streamed output.

I have tried everything to modify the code below so that the langchain class switches from OpenAI to ChatOpenAI, but without success. I am posting both implementations: the working version that uses OpenAI, and the broken version that uses ChatOpenAI. Thanks to the whole community and to anyone who can help me understand the problem; it would be very helpful if you could show how to solve it, because I have been trying for days and the error shown makes no real sense to me.

The version of the code that uses the class now marked as deprecated, but still works correctly:

from flask import Flask, Response
import threading
import queue

from langchain.llms import OpenAI
from langchain.callbacks.base import BaseCallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

app = Flask(__name__)

@app.route('/')
def index():
    return Response('''<!DOCTYPE html>
<html>
<head><title>Flask Streaming Langchain Example</title></head>
<body>
    <div id="output"></div>
    <script>
const outputEl = document.getElementById('output');
(async function() {
    try {
        const controller = new AbortController();
        const signal = controller.signal;
        const timeout = 120000; // Set the timeout to 120 seconds
        setTimeout(() => controller.abort(), timeout);
        const response = await fetch('/chain', {method: 'POST', signal});
        const reader = response.body.getReader();
        const decoder = new TextDecoder();
        let buffer = '';
        while (true) {
            const { done, value } = await reader.read();
            if (done) { break; }
            const text = decoder.decode(value, {stream: true});
            outputEl.innerHTML += text;
        }
    } catch (err) {
        console.error(err);
    }
})();
    </script>
</body>
</html>''', mimetype='text/html')

# Bridges tokens from the LLM callback thread to Flask's streaming response
class ThreadedGenerator:
    def __init__(self):
        self.queue = queue.Queue()

    def __iter__(self):
        return self

    def __next__(self):
        item = self.queue.get()
        if item is StopIteration:
            raise item
        return item

    def send(self, data):
        self.queue.put(data)

    def close(self):
        self.queue.put(StopIteration)

class ChainStreamHandler(StreamingStdOutCallbackHandler):
    def __init__(self, gen):
        super().__init__()
        self.gen = gen

    def on_llm_new_token(self, token: str, **kwargs):
        self.gen.send(token)

def llm_thread(g, prompt):
    try:
        llm = OpenAI(
            model_name="gpt-4",
            verbose=True,
            streaming=True,
            callback_manager=BaseCallbackManager([ChainStreamHandler(g)]),
            temperature=0.7,
        )
        llm(prompt)
    finally:
        g.close()

def chain(prompt):
    g = ThreadedGenerator()
    threading.Thread(target=llm_thread, args=(g, prompt)).start()
    return g

@app.route('/chain', methods=['POST'])
def _chain():
    return Response(chain("Create a poem about the meaning of life \n\n"), mimetype='text/plain')

if __name__ == '__main__':
    app.run(threaded=True, debug=True)
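To clarify how the streaming part works in this version: the LangChain callback runs in a background thread and pushes tokens into a queue, while Flask consumes the generator from the request thread. Below is a minimal standalone sketch of just that pattern, with the LLM replaced by a simulated producer so it runs without an API key (purely illustrative):

import threading
import queue
import time

class ThreadedGenerator:
    """Same class as above: a queue-backed generator shared across threads."""
    def __init__(self):
        self.queue = queue.Queue()

    def __iter__(self):
        return self

    def __next__(self):
        item = self.queue.get()
        if item is StopIteration:
            raise item
        return item

    def send(self, data):
        self.queue.put(data)

    def close(self):
        self.queue.put(StopIteration)

def produce(g):
    # Stands in for the LLM callback pushing tokens as they arrive
    try:
        for token in ["streamed ", "one ", "token ", "at ", "a ", "time"]:
            g.send(token)
            time.sleep(0.1)
    finally:
        g.close()

g = ThreadedGenerator()
threading.Thread(target=produce, args=(g,)).start()
for token in g:  # this is what Flask's Response does with the generator
    print(token, end="", flush=True)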

The broken version (OpenAI replaced with ChatOpenAI):

from flask import Flask, Response
import threading
import queue

from langchain.chat_models import ChatOpenAI
from langchain.callbacks.base import BaseCallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

app = Flask(__name__)

@app.route('/')
def index():
    return Response('''<!DOCTYPE html>
<html>
<head><title>Flask Streaming Langchain Example</title></head>
<body>
    <div id="output"></div>
    <script>
const outputEl = document.getElementById('output');
(async function() {
    try {
        const controller = new AbortController();
        const signal = controller.signal;
        const timeout = 120000; // Set the timeout to 120 seconds
        setTimeout(() => controller.abort(), timeout);
        const response = await fetch('/chain', {method: 'POST', signal});
        const reader = response.body.getReader();
        const decoder = new TextDecoder();
        let buffer = '';
        while (true) {
            const { done, value } = await reader.read();
            if (done) { break; }
            const text = decoder.decode(value, {stream: true});
            outputEl.innerHTML += text;
        }
    } catch (err) {
        console.error(err);
    }
})();
    </script>
</body>
</html>''', mimetype='text/html')

class ThreadedGenerator:
    def __init__(self):
        self.queue = queue.Queue()

    def __iter__(self):
        return self

    def __next__(self):
        item = self.queue.get()
        if item is StopIteration:
            raise item
        return item

    def send(self, data):
        self.queue.put(data)

    def close(self):
        self.queue.put(StopIteration)

class ChainStreamHandler(StreamingStdOutCallbackHandler):
    def __init__(self, gen):
        super().__init__()
        self.gen = gen

    def on_llm_new_token(self, token: str, **kwargs):
        self.gen.send(token)

    def on_chat_model_start(self, token: str):
        print("started")

def llm_thread(g, prompt):
    try:
        llm = ChatOpenAI(
            model_name="gpt-4",
            verbose=True,
            streaming=True,
            callback_manager=BaseCallbackManager([ChainStreamHandler(g)]),
            temperature=0.7,
        )
        llm(prompt)
    finally:
        g.close()

def chain(prompt):
    g = ThreadedGenerator()
    threading.Thread(target=llm_thread, args=(g, prompt)).start()
    return g

@app.route('/chain', methods=['POST'])
def _chain():
    return Response(chain("parlami dei 5 modi di dire in inglese che gli italiani conoscono meno \n\n"), mimetype='text/plain')

if __name__ == '__main__':
    app.run(threaded=True, debug=True)

The error shown in the console at startup and when the web page is opened:

Error in ChainStreamHandler.on_chat_model_start callback: ChainStreamHandler.on_chat_model_start() got an unexpected keyword argument 'run_id'
Exception in thread Thread-4 (llm_thread):
127.0.0.1 - - [09/Sep/2023 18:09:29] "POST /chain HTTP/1.1" 200 -
Traceback (most recent call last):
  File "C:\Users\user22\Desktop\Work\TESTPROJ\env\Lib\site-packages\langchain\callbacks\manager.py", line 300, in _handle_event
    getattr(handler, event_name)(*args, **kwargs)
  File "C:\Users\user22\Desktop\Work\TESTPROJ\env\Lib\site-packages\langchain\callbacks\base.py", line 168, in on_chat_model_start
    raise NotImplementedError(
NotImplementedError: StdOutCallbackHandler does not implement `on_chat_model_start`

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\user22\AppData\Local\Programs\Python\Python311\Lib\threading.py", line 1038, in _bootstrap_inner
    self.run()
  File "C:\Users\user22\AppData\Local\Programs\Python\Python311\Lib\threading.py", line 975, in run
    self._target(*self._args, **self._kwargs)
  File "c:\Users\user22\Desktop\Work\TESTPROJ\streamresp.py", line 90, in llm_thread
    llm(prompt)
  File "C:\Users\user22\Desktop\Work\TESTPROJ\env\Lib\site-packages\langchain\chat_models\base.py", line 552, in __call__
    generation = self.generate(
                 ^^^^^^^^^^^^^^
  File "C:\Users\user22\Desktop\Work\TESTPROJ\env\Lib\site-packages\langchain\chat_models\base.py", line 293, in generate
    run_managers = callback_manager.on_chat_model_start(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\user22\Desktop\Work\TESTPROJ\env\Lib\site-packages\langchain\callbacks\manager.py", line 1112, in on_chat_model_start
    _handle_event(
  File "C:\Users\user22\Desktop\Work\TESTPROJ\env\Lib\site-packages\langchain\callbacks\manager.py", line 304, in _handle_event
    message_strings = [get_buffer_string(m) for m in args[1]]
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\user22\Desktop\Work\TESTPROJ\env\Lib\site-packages\langchain\callbacks\manager.py", line 304, in <listcomp>
    message_strings = [get_buffer_string(m) for m in args[1]]
                       ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\user22\Desktop\Work\TESTPROJ\env\Lib\site-packages\langchain\schema\messages.py", line 52, in get_buffer_string
    raise ValueError(f"Got unsupported message type: {m}")
ValueError: Got unsupported message type: p

Thank you very much for your support!


Answer:

Thanks to help from the GitHub user python273, I have solved the problem. The root cause was twofold: a chat model must be called with a list of messages rather than a raw string (passing a string makes LangChain iterate it character by character, which is why the traceback ends with "Got unsupported message type: p" — 'p' being the first character of the prompt), and my on_chat_model_start override did not accept the keyword arguments (such as run_id) that LangChain passes to callbacks. Here is the working code:

import os
os.environ["OPENAI_API_KEY"] = ""

from flask import Flask, Response, request
import threading
import queue

from langchain.chat_models import ChatOpenAI
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.schema import AIMessage, HumanMessage, SystemMessage

app = Flask(__name__)

@app.route('/')
def index():
    # html is inlined just for the example; better to move it to a .html file
    return Response('''<!DOCTYPE html>
<html>
<head><title>Flask Streaming Langchain Example</title></head>
<body>
    <form id="form">
        <input name="prompt" value="write a short koan story about seeing beyond"/>
        <input type="submit"/>
    </form>
    <div id="output"></div>
    <script>
        const formEl = document.getElementById('form');
        const outputEl = document.getElementById('output');
        let aborter = new AbortController();
        async function run() {
            aborter.abort();  // cancel the previous request
            outputEl.innerText = '';
            aborter = new AbortController();
            const prompt = new FormData(formEl).get('prompt');
            try {
                const response = await fetch(
                    '/chain', {
                        signal: aborter.signal,
                        method: 'POST',
                        headers: {'Content-Type': 'application/json'},
                        body: JSON.stringify({
                            prompt
                        }),
                    }
                );
                const reader = response.body.getReader();
                const decoder = new TextDecoder();
                while (true) {
                    const { done, value } = await reader.read();
                    if (done) { break; }
                    const decoded = decoder.decode(value, {stream: true});
                    outputEl.innerText += decoded;
                }
            } catch (err) {
                console.error(err);
            }
        }
        run();  // run on the initial prompt
        formEl.addEventListener('submit', function(event) {
            event.preventDefault();
            run();
        });
    </script>
</body>
</html>''', mimetype='text/html')

class ThreadedGenerator:
    def __init__(self):
        self.queue = queue.Queue()

    def __iter__(self):
        return self

    def __next__(self):
        item = self.queue.get()
        if item is StopIteration:
            raise item
        return item

    def send(self, data):
        self.queue.put(data)

    def close(self):
        self.queue.put(StopIteration)

class ChainStreamHandler(StreamingStdOutCallbackHandler):
    def __init__(self, gen):
        super().__init__()
        self.gen = gen

    def on_llm_new_token(self, token: str, **kwargs):
        self.gen.send(token)

def llm_thread(g, prompt):
    try:
        chat = ChatOpenAI(
            verbose=True,
            streaming=True,
            callbacks=[ChainStreamHandler(g)],
            temperature=0.7,
        )
        # Chat models take a list of messages, not a raw string
        chat([HumanMessage(content=prompt)])
    finally:
        g.close()

def chain(prompt):
    g = ThreadedGenerator()
    threading.Thread(target=llm_thread, args=(g, prompt)).start()
    return g

@app.route('/chain', methods=['POST'])
def _chain():
    return Response(chain(request.json['prompt']), mimetype='text/plain')

if __name__ == '__main__':
    app.run(threaded=True, debug=True)
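To verify the streaming behaviour outside the browser, a small client sketch like the following should work (assuming the app above is running locally on Flask's default port 5000):

import requests

# Stream the response from the /chain endpoint chunk by chunk
resp = requests.post(
    "http://127.0.0.1:5000/chain",
    json={"prompt": "write a short koan story about seeing beyond"},
    stream=True,
)
for chunk in resp.iter_content(chunk_size=None, decode_unicode=True):
    print(chunk, end="", flush=True)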

Link to the original reply: https://gist.github.com/python273/563177b3ad5b9f74c0f8f3299ec13850
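For reference, the core API difference that tripped me up, reduced to a minimal sketch (this assumes OPENAI_API_KEY is set and the same pre-0.1 langchain version used above):

from langchain.llms import OpenAI
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage

# Completion-style LLMs accept a raw string and return a string...
llm = OpenAI(model_name="gpt-4")
text = llm("Say hi")

# ...while chat models expect a list of messages and return a message object
chat = ChatOpenAI(model_name="gpt-4")
msg = chat([HumanMessage(content="Say hi")])
print(text, msg.content)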
