I am using Azure OpenAI to generate responses to queries I send through Postman as a POST request, with a body like this: {"query":"what is de-iceing"}
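For reference, an equivalent request from Python (a sketch: the localhost URL is an assumption based on the default local Azure Functions port, and the route comes from my function code below):

import requests

# Hypothetical local URL; substitute your deployed function's endpoint
url = "http://localhost:7071/api/v2_AMMBackend"
resp = requests.post(url, json={"query": "what is de-iceing"})
print(resp.status_code, resp.text)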
Now, once it runs, I can see the terminal output:

Azure Functions runtime version: 3.11
[2024-10-18T] Request body: b'{"query":"what is de-iceing"}'
[2024-10-18T] data
However, I cannot get the generated response back from the OpenAI StreamingResponse. Below is the full code of my Azure Function.
# Import libraries
import azure.functions as func
import logging
# from langchain_text_splitters import CharacterTextSplitter
from langchain_openai.embeddings import OpenAIEmbeddings
from langchain_openai import AzureOpenAIEmbeddings
import os
from openai import AzureOpenAI
import openai
# from dotenv import load_dotenv
from langchain_community.vectorstores.azuresearch import AzureSearch
import json
from azurefunctions.extensions.http.fastapi import Request, StreamingResponse
import asyncio

# load_dotenv()

# Create the function app instance
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)

# Get data from Azure OpenAI
# async def stream_processor(response):
#     async for chunk in response:
#         if len(chunk.choices) > 0:
#             delta = chunk.choices[0].delta
#             if delta.content:  # Get remaining generated response if applicable
#                 await asyncio.sleep(0.1)
#                 yield delta.content

async def stream_processor(response):
    if hasattr(response, '__aiter__'):
        async for chunk in response:
            if len(chunk.choices) > 0:
                delta = chunk.choices[0].delta
                if delta.content:
                    await asyncio.sleep(0.1)
                    yield delta.content
    else:
        # Handle the non-async-iterable case if necessary
        for chunk in response:
            if len(chunk.choices) > 0:
                delta = chunk.choices[0].delta
                if delta.content:
                    yield delta.content

@app.route(route="v2_AMMBackend")
async def v2_AMMBackend(req: Request) -> StreamingResponse:
    logging.info('Python HTTP trigger function processed a request.')
    runtime_version = os.environ.get('FUNCTIONS_WORKER_RUNTIME_VERSION')
    logging.info(f"Azure Functions runtime version: {runtime_version}")
    logging.info(req)
    req_body = await req.body()  # Await the request body asynchronously
    logging.info(f'Request body: {req_body}')
    # Parse the request body as JSON
    data = json.loads(req_body.decode('utf-8'))
    logging.info('data')
    logging.info(data)
    # req_body = req.get_body().decode('utf-8')
    # # Parse the request body as JSON
    # data = json.loads(req_body)

    # Access variables from the request body
    variable1 = data.get("query")
    logging.info(variable1)

    # Use AzureOpenAIEmbeddings with an Azure account
    embeddings: AzureOpenAIEmbeddings = AzureOpenAIEmbeddings(
        azure_deployment=os.environ.get("EMB_AZURE_DEPLOYMENT"),
        openai_api_version=os.environ.get("EMB_AZURE_OPENAI_API_VERSION"),
        azure_endpoint=os.environ.get("EMB_AZURE_ENDPOINT"),
        api_key=os.environ.get('EMB_AZURE_OPENAI_API_KEY'),
    )

    # Call AzureSearch with the index name
    index_name = "de-iceing-test"
    vector_store: AzureSearch = AzureSearch(
        azure_search_endpoint=os.environ.get("VECTOR_STORE_ADDRESS"),
        azure_search_key=os.environ.get("VECTOR_STORE_PASSWORD"),
        index_name=index_name,
        embedding_function=embeddings.embed_query,
    )

    # Call AzureOpenAI
    client = AzureOpenAI(
        api_key=os.getenv("GPT_AZURE_OPENAI_API_KEY"),
        api_version="2023-07-01-preview",
        azure_endpoint=os.getenv("GPT_AZURE_OPENAI_ENDPOINT")
    )

    # Perform the similarity search
    def userchat(query):
        docs = vector_store.similarity_search(
            query=fr"{query}",
            k=2,
            search_type="similarity",
        )
        chunk = [item.page_content for item in docs]
        return chunk

    context = userchat(variable1)

    prompt = fr"Context: {context} \ Based on the above context please provide a clear and concise answer to the user's query/question mentioned below.\ Query: {variable1}.\ If the query is about the procedure or the method then generate the text in bullet points."
    prompt1 = fr"Here is the query from the user, Query: {variable1} \ Can you please provide a clear and concise solution to the user's query based on the content below and summary below.\ Content: {context}"

    # async def get_chat_completion(prompt):
    response = client.chat.completions.create(
        model="cbothr-gpt-4-001",
        stream=True,
        messages=[
            {"role": "user", "content": f"{prompt}"}
        ]
    )
    logging.info('response')
    logging.info(response)

    # Make sure the function is called within an async context
    if hasattr(response, '__aiter__'):
        return StreamingResponse(stream_processor(response), media_type="text/event-stream")
    else:
        return func.HttpResponse(
            "Error: Response is not iterable.",
            status_code=500
        )
I never get the streamed response the way the last few lines of the code intend; instead I get:
{ "_HttpResponse__status_code": 500, "_HttpResponse__mimetype": "text/plain", "_HttpResponse__charset": "utf-8", "_HttpResponse__headers": {}, "_HttpResponse__body": "Error: Response is not iterable."}
What should I do?
Answer:
You need to create the Azure OpenAI client with AsyncAzureOpenAI, as shown below. With the synchronous AzureOpenAI client, chat.completions.create(stream=True) returns a synchronous Stream object, which does not implement __aiter__, so your hasattr(response, '__aiter__') check fails and the function falls through to the 500 branch.
client = openai.AsyncAzureOpenAI(
    api_key=os.getenv("GPT_AZURE_OPENAI_API_KEY"),
    api_version="2024-08-01-preview",
    azure_endpoint=os.getenv("GPT_AZURE_OPENAI_ENDPOINT")
)
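To see why the original check failed, here is a minimal sketch contrasting the two clients (it reuses the deployment name from the question and assumes your environment variables are set): with the synchronous client, stream=True returns a plain Stream, which is iterable but not async-iterable, while the async client returns an AsyncStream.

import os
import asyncio
import openai

# Synchronous client: create(stream=True) returns a Stream (no __aiter__)
sync_client = openai.AzureOpenAI(
    api_key=os.getenv("GPT_AZURE_OPENAI_API_KEY"),
    api_version="2024-08-01-preview",
    azure_endpoint=os.getenv("GPT_AZURE_OPENAI_ENDPOINT"),
)
sync_stream = sync_client.chat.completions.create(
    model="cbothr-gpt-4-001",  # deployment name from the question
    messages=[{"role": "user", "content": "ping"}],
    stream=True,
)
print(hasattr(sync_stream, "__aiter__"))  # False -> your 500 branch

async def main():
    # Async client: create(stream=True) must be awaited and returns an AsyncStream
    async_client = openai.AsyncAzureOpenAI(
        api_key=os.getenv("GPT_AZURE_OPENAI_API_KEY"),
        api_version="2024-08-01-preview",
        azure_endpoint=os.getenv("GPT_AZURE_OPENAI_ENDPOINT"),
    )
    async_stream = await async_client.chat.completions.create(
        model="cbothr-gpt-4-001",
        messages=[{"role": "user", "content": "ping"}],
        stream=True,
    )
    print(hasattr(async_stream, "__aiter__"))  # True -> StreamingResponse works

asyncio.run(main())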
Your complete code should then look like this:
import azure.functions as func
import logging
from langchain_openai import AzureOpenAIEmbeddings
import os
import openai
from langchain_community.vectorstores.azuresearch import AzureSearch
import json
from azurefunctions.extensions.http.fastapi import Request, StreamingResponse
import asyncio

app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)

# Forward each streamed chunk's delta content to the HTTP response
async def stream_processor(response):
    async for chunk in response:
        if len(chunk.choices) > 0:
            delta = chunk.choices[0].delta
            if delta.content:
                await asyncio.sleep(0.1)
                yield delta.content

@app.route(route="http_trigger", methods=[func.HttpMethod.POST])
async def http_trigger(req: Request) -> StreamingResponse:
    logging.info('Python HTTP trigger function processed a request.')
    req_body = await req.body()
    logging.info(f'Request body: {req_body}')
    data = json.loads(req_body.decode('utf-8'))
    variable1 = data.get("query")

    embeddings: AzureOpenAIEmbeddings = AzureOpenAIEmbeddings(
        azure_deployment=os.environ.get("EMB_AZURE_DEPLOYMENT"),
        openai_api_version=os.environ.get("EMB_AZURE_OPENAI_API_VERSION"),
        azure_endpoint=os.environ.get("EMB_AZURE_ENDPOINT"),
        api_key=os.environ.get('EMB_AZURE_OPENAI_API_KEY'),
    )

    index_name = "de-iceing-test"
    vector_store: AzureSearch = AzureSearch(
        azure_search_endpoint=os.environ.get("VECTOR_STORE_ADDRESS"),
        azure_search_key=os.environ.get("VECTOR_STORE_PASSWORD"),
        index_name=index_name,
        embedding_function=embeddings.embed_query,
    )

    # The async client returns an AsyncStream that stream_processor
    # can iterate with `async for`
    client = openai.AsyncAzureOpenAI(
        api_key=os.getenv("GPT_AZURE_OPENAI_API_KEY"),
        api_version="2024-08-01-preview",
        azure_endpoint=os.getenv("GPT_AZURE_OPENAI_ENDPOINT")
    )

    def userchat(query):
        docs = vector_store.similarity_search(
            query=fr"{query}",
            k=2,
            search_type="similarity",
        )
        chunk = [item.page_content for item in docs]
        return chunk

    context = userchat(variable1)

    prompt = fr"Content: {context} \ please provide a clear and concise answer to the user's query/question mentioned below.\ Query: {variable1}.\ If the query is about the procedure or the method then generate the text in bullet points."
    prompt1 = fr"Here is the query from the user, Query: {variable1} \ Can you please provide a clear and concise solution to the user's query based on the content below and summary below.\ Content: {context}"
    logging.info(prompt)

    # With the async client, create() must be awaited
    response = await client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "user", "content": f"{prompt}"}
        ],
        stream=True
    )
    logging.info(f"OpenAI API response object: {response}")

    if response is not None:
        logging.info("Streaming started...")
        return StreamingResponse(stream_processor(response), media_type="text/event-stream")
    else:
        return func.HttpResponse(
            "Error: Response is not iterable or streaming not supported.",
            status_code=500
        )
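To verify the stream end to end, a small client like this will print chunks as they arrive (a sketch assuming the function runs locally on the default port 7071; adjust the URL for your deployment):

import requests

url = "http://localhost:7071/api/http_trigger"  # route from the code above
with requests.post(url, json={"query": "what is de-iceing"}, stream=True) as resp:
    resp.raise_for_status()
    # iter_content with chunk_size=None yields data as soon as it is received
    for chunk in resp.iter_content(chunk_size=None, decode_unicode=True):
        print(chunk, end="", flush=True)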