使用Azure OpenAI的StreamingResponse无法从HTTP Post API获取所需的响应

我正在使用Azure OpenAI,根据通过Postman发送到HTTP POST API的查询生成响应,请求体如下所示:{"query":"what is de-iceing"}

现在,函数运行后,我能在终端看到如下日志输出:「Azure函数运行时版本: 3.11」、「[2024-10-18T] 请求体: b'{"query":"what is de-iceing"}'」、「[2024-10-18T] data」。但是我无法从OpenAI的StreamingResponse中获取生成的响应。下面附上我Azure函数的完整代码。

# Imports
import azure.functions as func
import logging
# from langchain_text_splitters import CharacterTextSplitter
from langchain_openai.embeddings import OpenAIEmbeddings
from langchain_openai import AzureOpenAIEmbeddings
import os
from openai import AzureOpenAI
import openai
# from dotenv import load_dotenv
from langchain_community.vectorstores.azuresearch import AzureSearch
import json
from azurefunctions.extensions.http.fastapi import Request, StreamingResponse
import asyncio

# load_dotenv()

# Create the Azure Functions app instance
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)

# Fetch data from Azure OpenAI
# (earlier async-only version, kept for reference)
# async def stream_processor(response):
#     async for chunk in response:
#         if len(chunk.choices) > 0:
#             delta = chunk.choices[0].delta
#             if delta.content: # get the remaining generated response, if applicable
#                 await asyncio.sleep(0.1)
#                 yield delta.content


async def stream_processor(response):
    """Yield the non-empty content deltas of a chat-completion stream.

    Supports both async-iterable and plain-iterable stream objects; each
    non-empty ``chunk.choices[0].delta.content`` is yielded in order.
    """
    if hasattr(response, '__aiter__'):
        async for chunk in response:
            if len(chunk.choices) > 0:
                delta = chunk.choices[0].delta
                if delta.content:
                    # Short pause between chunks — presumably to pace the
                    # stream toward the client; TODO confirm it is needed.
                    await asyncio.sleep(0.1)
                    yield delta.content
    else:
        # Handle the non-async-iterable case (if necessary)
        for chunk in response:
            if len(chunk.choices) > 0:
                delta = chunk.choices[0].delta
                if delta.content:
                    yield delta.content


@app.route(route="v2_AMMBackend")
async def v2_AMMBackend(req: Request) -> StreamingResponse:
    """HTTP trigger: answer the posted query with a streamed chat completion.

    Expects a JSON body like ``{"query": "..."}``. Retrieves context chunks
    from an Azure AI Search index via similarity search, builds a prompt,
    and streams the model's answer back as ``text/event-stream``.
    """
    logging.info('Python HTTP触发器函数处理了一个请求。')
    runtime_version = os.environ.get('FUNCTIONS_WORKER_RUNTIME_VERSION')
    logging.info(f"Azure函数运行时版本: {runtime_version}")
    logging.info(req)
    req_body = await req.body()  # read the request body asynchronously
    logging.info(f'请求体: {req_body}')
    # Parse the request body as JSON
    data = json.loads(req_body.decode('utf-8'))
    logging.info('data')
    logging.info(data)
    # req_body = req.get_body().decode('utf-8')
    # # Parse the request body as JSON
    # data = json.loads(req_body)
    # Read the user's query from the request body
    variable1 = data.get("query")
    logging.info(variable1)
    # Use AzureOpenAIEmbeddings with an Azure account
    embeddings: AzureOpenAIEmbeddings = AzureOpenAIEmbeddings(
        azure_deployment=os.environ.get("EMB_AZURE_DEPLOYMENT"),
        openai_api_version=os.environ.get("EMB_AZURE_OPENAI_API_VERSION"),
        azure_endpoint=os.environ.get("EMB_AZURE_ENDPOINT"),
        api_key=os.environ.get('EMB_AZURE_OPENAI_API_KEY'),
    )
    # Call AzureSearch with the target index name
    index_name = "de-iceing-test"
    vector_store: AzureSearch = AzureSearch(
        azure_search_endpoint=os.environ.get("VECTOR_STORE_ADDRESS"),
        azure_search_key=os.environ.get("VECTOR_STORE_PASSWORD"),
        index_name=index_name,
        embedding_function=embeddings.embed_query,
    )
    # Create the Azure OpenAI client.
    # NOTE(review): this is the *synchronous* AzureOpenAI client, so the
    # stream returned by chat.completions.create(stream=True) is iterable
    # but NOT async-iterable; the hasattr(response, '__aiter__') check
    # below is therefore False and the 500 branch runs — this matches the
    # "Error: Response is not iterable." output reported in the question.
    client = AzureOpenAI(
        api_key=os.getenv("GPT_AZURE_OPENAI_API_KEY"),
        api_version="2023-07-01-preview",
        azure_endpoint=os.getenv("GPT_AZURE_OPENAI_ENDPOINT")
    )

    # Perform a similarity search and return the matching page contents
    def userchat(query):
        docs = vector_store.similarity_search(
            query=fr"{query}",
            k=2,
            search_type="similarity",
        )
        chunk = [item.page_content for item in docs]
        return chunk

    context = userchat(variable1)
    prompt = fr"Context: {context} \                Based on the above context please provide a clear and concise answer to the user's query/question mentioned below.\                Query: {variable1}.\                If the query is about the procedue or the method then generate the text in bullet points."
    # Alternative prompt (built but not used below)
    prompt1 = fr"Here is the query from the user, Query: {variable1} \                Can you please provide a clear and concise solution to the user's query based on the content below and summary below.\                Content: {context}"
    # async def get_chat_completion(prompt):
    response = client.chat.completions.create(
        model="cbothr-gpt-4-001",
        stream=True,
        messages=[
            {"role": "user", "content": f"{prompt}"}
        ]
    )
    logging.info('response')
    logging.info(response)
    # Make sure the function is invoked in an async context
    if hasattr(response, '__aiter__'):
        return StreamingResponse(stream_processor(response), media_type="text/event-stream")
    else:
        return func.HttpResponse(
            "Error: Response is not iterable.",
            status_code=500
        )

我无法按代码最后几行提到的方式获取响应,而我得到的是:

{    "_HttpResponse__status_code": 500,    "_HttpResponse__mimetype": "text/plain",    "_HttpResponse__charset": "utf-8",    "_HttpResponse__headers": {},    "_HttpResponse__body": "Error: Response is not iterable."}

我该怎么办?


回答:

您需要改用 AsyncAzureOpenAI 来创建 Azure OpenAI 客户端,如下所示:

# Create the client with the *async* class so that
# chat.completions.create(stream=True) returns an async-iterable stream.
client = openai.AsyncAzureOpenAI(
    api_key=os.getenv("GPT_AZURE_OPENAI_API_KEY"),
    api_version="2024-08-01-preview",
    azure_endpoint=os.getenv("GPT_AZURE_OPENAI_ENDPOINT")
)

您的完整代码应该如下所示。

import azure.functions as func
import logging
from langchain_openai import AzureOpenAIEmbeddings
import os
import openai
from langchain_community.vectorstores.azuresearch import AzureSearch
import json
from azurefunctions.extensions.http.fastapi import Request, StreamingResponse
import asyncio

app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)


async def stream_processor(response):
    """Yield the non-empty content deltas of an async chat-completion stream."""
    async for chunk in response:
        if len(chunk.choices) > 0:
            delta = chunk.choices[0].delta
            if delta.content:
                # Short pause between chunks to pace the stream
                await asyncio.sleep(0.1)
                yield delta.content


@app.route(route="http_trigger", methods=[func.HttpMethod.POST])
async def http_trigger(req: Request) -> StreamingResponse:
    """HTTP trigger: stream a chat-completion answer for the posted query.

    Uses ``openai.AsyncAzureOpenAI`` so the streaming response is
    async-iterable and can be consumed by ``stream_processor`` /
    ``StreamingResponse``.
    """
    logging.info('Python HTTP触发器函数处理了一个请求。')
    req_body = await req.body()
    logging.info(f'请求体: {req_body}')
    data = json.loads(req_body.decode('utf-8'))
    variable1 = data.get("query")
    # Use AzureOpenAIEmbeddings with an Azure account
    embeddings: AzureOpenAIEmbeddings = AzureOpenAIEmbeddings(
        azure_deployment=os.environ.get("EMB_AZURE_DEPLOYMENT"),
        openai_api_version=os.environ.get("EMB_AZURE_OPENAI_API_VERSION"),
        azure_endpoint=os.environ.get("EMB_AZURE_ENDPOINT"),
        api_key=os.environ.get('EMB_AZURE_OPENAI_API_KEY'),
    )
    # Azure AI Search vector store backed by the target index
    index_name = "de-iceing-test"
    vector_store: AzureSearch = AzureSearch(
        azure_search_endpoint=os.environ.get("VECTOR_STORE_ADDRESS"),
        azure_search_key=os.environ.get("VECTOR_STORE_PASSWORD"),
        index_name=index_name,
        embedding_function=embeddings.embed_query,
    )
    # Async client: the key fix — its stream supports `async for`
    client = openai.AsyncAzureOpenAI(
        api_key=os.getenv("GPT_AZURE_OPENAI_API_KEY"),
        api_version="2024-08-01-preview",
        azure_endpoint=os.getenv("GPT_AZURE_OPENAI_ENDPOINT")
    )

    # Perform a similarity search and return the matching page contents
    def userchat(query):
        docs = vector_store.similarity_search(
            query=fr"{query}",
            k=2,
            search_type="similarity",
        )
        chunk = [item.page_content for item in docs]
        return chunk

    context = userchat(variable1)
    prompt = fr"Content: {context} \                please provide a clear and concise answer to the user's query/question mentioned below.\                Query: {variable1}.\                If the query is about the procedue or the method then generate the text in bullet points."
    # Alternative prompt (built but not used below)
    prompt1 = fr"Here is the query from the user, Query: {variable1} \                Can you please provide a clear and concise solution to the user's query based on the content below and summary below.\                Content: {context}"
    logging.info(prompt)
    # Await the async create() call; with stream=True it returns an
    # async-iterable stream object rather than a full completion.
    response = await client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "user", "content": f"{prompt}"}
        ],
        stream=True
    )
    logging.info(f"OpenAI API响应对象: {response}")
    if response is not None:
        logging.info("流式传输开始...")
        return StreamingResponse(stream_processor(response), media_type="text/event-stream")
    else:
        return func.HttpResponse(
            "Error: Response is not iterable or streaming not supported.",
            status_code=500
        )

(截图略)

Related Posts

L1-L2正则化的不同系数

我想对网络的权重同时应用L1和L2正则化。然而,我找不…

使用scikit-learn的无监督方法将列表分类成不同组别,有没有办法?

我有一系列实例,每个实例都有一份列表,代表它所遵循的不…

f1_score metric in lightgbm

我想使用自定义指标f1_score来训练一个lgb模型…

通过相关系数矩阵进行特征选择

我在测试不同的算法时,如逻辑回归、高斯朴素贝叶斯、随机…

可以将机器学习库用于流式输入和输出吗?

已关闭。此问题需要更加聚焦。目前不接受回答。 想要改进…

在TensorFlow中,queue.dequeue_up_to()方法的用途是什么?

我对这个方法感到非常困惑,特别是当我发现这个令人费解的…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注