You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
119 lines
3.5 KiB
119 lines
3.5 KiB
#coding:utf8
|
|
import re
|
|
from jsonpath_ng import parse
|
|
import json
|
|
import traceback
|
|
import regex
|
|
from log_util.set_logger import set_logger
|
|
def parse_data(data, para):
    """Extract one value from a JSON-encoded field of ``data`` via JSONPath.

    ``para`` has the form ``"<field>:<jsonpath>"``. ``data[<field>]`` is
    decoded with ``json.loads`` and the JSONPath expression is evaluated
    against the decoded document.

    :param data: mapping whose ``<field>`` entry is a JSON string
    :param para: selector in ``"<field>:<jsonpath>"`` form
    :return: the value of the first JSONPath match
    :raises IndexError: if ``para`` has no ``":"`` separator or the
        expression matches nothing
    :raises KeyError: if ``<field>`` is not present in ``data``
    """
    # Split on the FIRST colon only: JSONPath itself may contain ':'
    # (for example the slice ``$.items[0:2]``).
    field, separator, json_path = str(para).partition(":")
    if not separator:
        raise IndexError(
            "selector {!r} is not of the form '<field>:<jsonpath>'".format(para)
        )

    document = json.loads(data[field])

    # Build the JSONPath expression and collect every matching value.
    expression = parse(json_path)
    values = [found.value for found in expression.find(document)]
    if not values:
        raise IndexError(
            "JSONPath {!r} matched nothing in field {!r}".format(json_path, field)
        )
    return values[0]
|
|
|
|
def _select_output_keys(output, parsed):
    """Return the entries of ``parsed`` whose keys also appear in ``output``."""
    return {key: parsed[key] for key in output.keys() if key in parsed}


def parse_gptResult(output, gptContent):
    """Parse a model reply as JSON and keep only the keys named in ``output``.

    The reply is first parsed as a whole after removing Markdown code-fence
    markers. If that does not yield a JSON object, the raw text is scanned
    for the first embedded JSON object (for replies that wrap the JSON in
    extra prose).

    :param output: mapping whose keys name the fields to keep
    :param gptContent: raw reply text (converted with ``str``)
    :return: dict holding the requested keys found in the reply, or ``None``
        when no JSON object can be extracted
    """
    try:
        text = str(gptContent)

        # Attempt 1: the whole reply, minus code fences, is the JSON document.
        try:
            parsed = json.loads(text.replace("```json", "").replace("```", ""))
        except ValueError:
            parsed = None
        if isinstance(parsed, dict):
            return _select_output_keys(output, parsed)

        # Attempt 2: decode the first JSON object embedded in the text.
        # raw_decode understands string literals, so a '{' or '}' inside a
        # string value does not cut the object short the way a purely
        # brace-balancing pattern does.
        decoder = json.JSONDecoder()
        start = text.find("{")
        while start != -1:
            try:
                candidate, _ = decoder.raw_decode(text, start)
            except ValueError:
                start = text.find("{", start + 1)
                continue
            return _select_output_keys(output, candidate)
        return None
    except Exception:
        # Best-effort parser: report unexpected failures but never raise.
        traceback.print_exc()
        return None
|
|
|
|
|
|
def get_content(inputdata, logging):
    """Reassemble the prompt and sampling parameters from a request.

    ``inputdata["input"]["prompt"]`` is iterated as a sequence of segments.
    A segment whose ``type`` is ``"1"`` contributes its ``value`` verbatim;
    a segment whose ``type`` is ``"2"`` is resolved by calling
    ``parse_data(inputdata["data"], value)``. A type-2 segment that fails
    to resolve is logged and skipped. Segments of any other type are ignored.

    :param inputdata: the original JSON request data
    :param logging: logger-like object; only its ``info`` method is called
    :return: dict with the assembled ``prompt`` plus ``temperature``,
        ``top_p`` and ``n`` copied from ``inputdata["input"]``
    :raises KeyError: if ``input`` or one of the copied keys is missing
    """
    request = inputdata["input"]
    # "data" is optional; type-2 segments simply fail to resolve without it.
    data = inputdata.get("data", "")

    prompt_res = ""
    for segment in request["prompt"] or ():
        segment_type = str(segment["type"])
        if segment_type == "1":
            prompt_res += segment["value"]
        elif segment_type == "2":
            try:
                prompt_res += parse_data(data, segment["value"])
            except Exception:
                # Best effort: a dynamic field that cannot be resolved is
                # reported and left out instead of aborting the request.
                logging.info("动态字段获取数据失败。{}-{}".format(segment, traceback.format_exc()))

    return {
        "prompt": prompt_res,
        "temperature": request["temperature"],
        "top_p": request["top_p"],
        "n": request["n"],
    }
|
|
|
|
# def get_content(inputdata,logging):
|
|
# """
|
|
# 重新组装参数
|
|
# :param inputdata:原json数据
|
|
# :return: 组装的prompt及其他参数
|
|
# """
|
|
# res={}
|
|
# admin=inputdata["metadata"]["admin"]
|
|
# data=inputdata["data"]
|
|
# prompt=admin["prompt"]
|
|
# if_user=re.findall("{{(.*)}}",prompt)
|
|
# if_data=re.findall("@@(.*)@@",prompt)
|
|
# if if_user != []:
|
|
# user_data=inputdata["metadata"]["user"]
|
|
# if if_user[0] in user_data.keys():
|
|
# tmp=user_data[if_user[0]]
|
|
# prompt=re.sub("{{(.*)}}",tmp,prompt)
|
|
# if if_data!=[] and if_data[0] in data.keys():
|
|
# tmp1=data[if_data[0]]
|
|
# prompt=re.sub("@@(.*)@@",tmp1,prompt)
|
|
# res["prompt"]=prompt
|
|
# res["authorization"]=admin["authorization"]
|
|
# res["model"]=admin["model"]
|
|
# res["temperature"]=admin["temperature"]
|
|
# res["authorization"]=admin["authorization"]
|
|
# res["top_p"]=admin["top_p"]
|
|
# res["n"]=admin["n"]
|
|
# return res
|
|
|
|
if __name__ == "__main__":
    # Manual smoke check. parse_gptResult requires (output, gptContent);
    # calling it with no arguments raises TypeError.
    print(parse_gptResult({"answer": ""}, '```json\n{"answer": "ok", "extra": 1}\n```'))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|