更新.gitignore文件以忽略新的数据文件和测试文件,修改对话到工单处理逻辑,添加产品信息解析功能,优化API调用,移除不必要的参数,调整并发处理逻辑。

This commit is contained in:
2025-07-24 13:39:04 +08:00
parent 6707e5904f
commit 4d7ef54ae7
2 changed files with 147 additions and 163 deletions
+4 -1
View File
@@ -5,9 +5,12 @@ __pycache__/
# 忽略数据文件 # 忽略数据文件
data/excel/* data/excel/*
rag2_0/demo/Test.py rag2_0/demo/Test*
rag2_0/demo/test*
data/excel/*.xlsx data/excel/*.xlsx
rag2_0/demo/ProfessionalTermAnalyzer.py rag2_0/demo/ProfessionalTermAnalyzer.py
data/logs/* data/logs/*
rag2_0/dify/Test.py rag2_0/dify/Test.py
data/query_logs/* data/query_logs/*
data/conversations/*
data/test*
+140 -159
View File
@@ -34,30 +34,33 @@ logging.basicConfig(
logger = logging.getLogger("dialogue_to_workorder") logger = logging.getLogger("dialogue_to_workorder")
human_info={ human_info={
"1116":["夏剑媛", "储能"], "1116":["夏剑媛", "新能源"],
"1201":["曹美芳", "配网"], "1201":["曹美芳", "配网造价及清单"],
"1202":["彭珊珊", "主网"], "1202":["彭珊珊", "主网造价及清单"],
"1230":["龚青", "配网"], "1230":["龚青", "配网造价及清单"],
"1544":["黄婷", "主网"], "1544":["黄婷", "主网造价及清单"],
"1546":["严琼辉", "配网"], "1546":["严琼辉", "配网造价及清单"],
"1552":["吴园妹", "主网"], "1552":["吴园妹", "主网造价及清单"],
"1555":["魏怡璠", "配网"], "1555":["魏怡璠", "配网造价及清单"],
"1789":["冷琛", "主网"], "1789":["冷琛", "主网造价及清单"],
"2142":["余国庆", "配网"], "2142":["余国庆", "配网造价及清单"],
"2144":["卢光辉", "技改"], "2144":["卢光辉", "技改检修"],
"2145":["万志星", "技改"], "2145":["万志星", "技改检修"],
"2233":["徐雨萍", "主网"], "2233":["徐雨萍", "主网造价及清单"],
"2262":["刘雨微", "主网"], "2262":["刘雨微", "主网造价及清单"],
"2591":["揭敏", "主网"], "2591":["揭敏", "主网造价及清单"],
"3035":["杨玲", "主网"], "3035":["杨玲", "主网造价及清单"],
"3416":["杨苏文", "配网"], "3416":["杨苏文", "配网造价及清单"],
"3417":["王琴", "配网"], "3417":["王琴", "配网造价及清单"],
"439":["赵莉", "技改"], "439":["赵莉", "技改检修"],
"8340":["熊磊娇", "储能"], "8340":["熊磊娇", "新能源"],
"8442":["胡月", "配网"], "8442":["胡月", "配网造价及清单"],
"8443":["杨淑玲", "主网"], "8443":["杨淑玲", "主网造价及清单"],
"8555":["胡青艳", "主网"], "8555":["胡青艳", "主网造价及清单"],
"8762":["周丽华", "主网"], "8762":["周丽华", "主网造价及清单"],
"1553":["郝中华", "技改检修"],
"8817":["赵雅馨", "技改检修"],
"2590":["李琴", "技改检修"],
} }
# ================ 模型定义 ================ # ================ 模型定义 ================
@@ -65,6 +68,10 @@ class UserQuestionAndSolution(BaseModel):
user_question: str = Field(description="用户的核心问题") user_question: str = Field(description="用户的核心问题")
solution: str = Field(description="坐席提供的解决方案,解决方案如果存在多个步骤,使用中文分号隔开") solution: str = Field(description="坐席提供的解决方案,解决方案如果存在多个步骤,使用中文分号隔开")
class ProductInfo:
product_line:str
product_name:str
class UserQuestionAndSolutionList(BaseModel): class UserQuestionAndSolutionList(BaseModel):
user_question_list: list[UserQuestionAndSolution] = Field(description="客户问题列表") user_question_list: list[UserQuestionAndSolution] = Field(description="客户问题列表")
@@ -133,48 +140,112 @@ class DialogueToWorkorder:
self.is_complaint_parser = PydanticOutputParser(pydantic_object=IsComplaint) self.is_complaint_parser = PydanticOutputParser(pydantic_object=IsComplaint)
self.product_name_and_module_name_parser = PydanticOutputParser(pydantic_object=ProductNameAndModuleName) self.product_name_and_module_name_parser = PydanticOutputParser(pydantic_object=ProductNameAndModuleName)
self.product_line_parser = PydanticOutputParser(pydantic_object=ProductLine) self.product_line_parser = PydanticOutputParser(pydantic_object=ProductLine)
# 初始化LLM模型 # 初始化LLM模型
# self.llm_params = llm_params or {
# "temperature": 0.2,
# "top_p":0.95,
# "model": "deepseek-ai/DeepSeek-R1",
# "api_key": os.getenv("OPENAI_API_KEY"),
# "base_url": os.getenv("OPENAI_API_BASE"),
# "timeout": httpx.Timeout(600.0)
# }
self.api_key = "25t%Syu6I9yxX2IuTN"
self.llm_params = llm_params or { self.llm_params = llm_params or {
"temperature": 0.2, "temperature": 0.2,
"top_p":0.95, "top_p":0.95,
"model": "deepseek-r1", "model": "deepseek-ai/DeepSeek-R1",
"api_key": "25t%Syu6I9yxX2IuTN", "api_key": os.getenv("OPENAI_API_KEY"),
"base_url": "http://10.1.0.154:8000/v1", "base_url": os.getenv("OPENAI_API_BASE"),
"timeout": httpx.Timeout(600.0) "timeout": httpx.Timeout(600.0)
} }
# self.llm_params = llm_params or {
# "temperature": 0.2,
# "top_p":0.95,
# "model": "deepseek-r1",
# "api_key": "25t%Syu6I9yxX2IuTN",
# "base_url": "http://10.1.0.154:8000/v1",
# "timeout": httpx.Timeout(600.0)
# }
self.llm = self._get_llm_instance() self.llm = self._get_llm_instance()
def _get_llm_instance(self): def _get_llm_instance(self):
"""获取LLM实例""" """获取LLM实例"""
return OpenAiLLM(**self.llm_params) return OpenAiLLM(**self.llm_params)
def parse_product_detail_excel(self, file_path): def get_product_info(self, conversation_context:str, skill_group:str)->ProductInfo:
"""解析产品详情Excel文件""" product_info = ProductInfo()
df = pd.read_excel(file_path)
product_dict = {}
for _, row in df.iterrows(): # 默认为其他
product_line = str(row['产品线']).strip() if pd.notna(row['产品线']) else '' product_info.product_line = "其他"
product_name = str(row['产品名称']).strip() if pd.notna(row['产品名称']) else '' product_info.product_name = ""
module_name = str(row['模块名称']).strip() if pd.notna(row['模块名称']) else ''
if product_line not in product_dict: # 主网造价及清单技能组
product_dict[product_line] = {} if skill_group == "主网造价及清单":
if product_name not in product_dict[product_line]: if "西藏" in conversation_context:
product_dict[product_line][product_name] = [] product_info.product_line = "博微西藏计价通Z1"
product_dict[product_line][product_name].append(module_name) if "建安预算" in conversation_context or "全口径预算" in conversation_context:
product_info.product_name = "施工图预算"
elif "招标" in conversation_context or "投标" in conversation_context:
product_info.product_name = "招投标"
elif "清单" in conversation_context and "结算" in conversation_context:
product_info.product_name = "清单结算"
else:
product_info.product_name = "概预算"
elif "造价2016" in conversation_context or "造价2014" in conversation_context or "造价2008" in conversation_context:
product_info.product_line = "主网造价系列"
elif "清单2016" in conversation_context or "清单2015" in conversation_context:
product_info.product_line = "主网清单系列"
elif "大结算" in conversation_context or "结算2018" in conversation_context:
product_info.product_line = "管理或辅助工具软件"
product_info.product_name = "结算应用2018"
elif "清标" in conversation_context or "数字化" in conversation_context:
product_info.product_line = "管理或辅助工具软件"
product_info.product_name = "数字化清标工具"
else:
product_info.product_line = "博微电力建设计价通软件"
if "建安预算" in conversation_context or "全口径预算" in conversation_context:
product_info.product_name = "施工图预算"
elif "招标" in conversation_context or "投标" in conversation_context:
product_info.product_name = "招投标"
elif any(keyword in conversation_context for keyword in ["清单结算", "基准价", "结算条款", "结算市场价", "结算审核"]):
product_info.product_name = "清单结算"
else:
product_info.product_name = "概预算"
return product_dict # 技改检修技能组
elif skill_group == "技改检修":
if "技改2015" in conversation_context or "技改2016" in conversation_context:
product_info.product_line = "技改检修系列"
else:
product_info.product_line = "博微技改检修计价通T1软件"
if "T1" in conversation_context and "清单" in conversation_context:
product_info.product_name = "清单"
else:
product_info.product_name = "概预算"
# 配网造价及清单技能组
elif skill_group == "配网造价及清单":
if "配网2017" in conversation_context or "09配网" in conversation_context or "老配网" in conversation_context:
product_info.product_line = "配网系列"
else:
product_info.product_line = "博微配网计价通D3"
if "南网" in conversation_context:
product_info.product_name = "南网版"
elif "清单" in conversation_context and "南网" not in conversation_context:
product_info.product_name = "行业清单计价"
else:
product_info.product_name = "概预算"
# 新能源技能组
elif skill_group == "新能源":
product_info.product_line = "新能源系列"
if "核电" in conversation_context:
product_info.product_name = "核电清单2018"
elif "光伏" in conversation_context and "2018" in conversation_context:
product_info.product_name = "光伏计价2018"
elif "储能" in conversation_context:
product_info.product_name = "新型储能电站计价通C1"
elif "营销" in conversation_context:
product_info.product_name = "陕西电力营销计价通M1"
elif "新能源" in conversation_context or "光伏清单" in conversation_context or "风电" in conversation_context:
product_info.product_name = "新能源计价通N1"
# 经济评价及其他技能组
elif "经济评价" in conversation_context:
product_info.product_line = "经济评价系列"
return product_info
def get_workorder_dict(self, rows): def get_workorder_dict(self, rows):
"""从会话行中提取工单基本信息""" """从会话行中提取工单基本信息"""
@@ -221,6 +292,13 @@ class DialogueToWorkorder:
dt = datetime.strptime(create_time_str, "%Y-%m-%d %H:%M:%S") dt = datetime.strptime(create_time_str, "%Y-%m-%d %H:%M:%S")
workorder_dict["工单编号"] = dt.strftime("%Y%m%d%H%M%S") workorder_dict["工单编号"] = dt.strftime("%Y%m%d%H%M%S")
# 获取产品线和产品名称
dialogue_str = self.get_dialogue_str(rows)
skill_group = workorder_dict.get("处理技能组", "")
product_info = self.get_product_info(dialogue_str, skill_group)
workorder_dict["产品线"] = product_info.product_line
workorder_dict["产品名称"] = product_info.product_name
return workorder_dict return workorder_dict
def get_dialogue_str(self, conversation_rows): def get_dialogue_str(self, conversation_rows):
@@ -291,7 +369,7 @@ class DialogueToWorkorder:
output_format = self.user_question_and_solution_parser.get_format_instructions() output_format = self.user_question_and_solution_parser.get_format_instructions()
llm_prompt = prompt.format(output_format=output_format, dialogue_str=dialogue_str) llm_prompt = prompt.format(output_format=output_format, dialogue_str=dialogue_str)
response = self.llm.invoke(user_prompt=llm_prompt, need_retry=False, api_key=self.api_key) response = self.llm.invoke(user_prompt=llm_prompt, need_retry=False)
clean_output = re.sub(r'<think>.*?</think>', '', response.content, flags=re.DOTALL) clean_output = re.sub(r'<think>.*?</think>', '', response.content, flags=re.DOTALL)
try: try:
if clean_output.count('user_question') == 1: if clean_output.count('user_question') == 1:
@@ -321,75 +399,13 @@ class DialogueToWorkorder:
except Exception as e: except Exception as e:
output_format = self.user_question_and_solution_list_parser.get_format_instructions() output_format = self.user_question_and_solution_list_parser.get_format_instructions()
llm_prompt = prompt.format(output_format=output_format, dialogue_str=dialogue_str) llm_prompt = prompt.format(output_format=output_format, dialogue_str=dialogue_str)
response = self.llm.invoke(user_prompt=llm_prompt, need_retry=False, api_key=self.api_key) response = self.llm.invoke(user_prompt=llm_prompt, need_retry=False)
clean_output = re.sub(r'<think>.*?</think>', '', response.content, flags=re.DOTALL) clean_output = re.sub(r'<think>.*?</think>', '', response.content, flags=re.DOTALL)
user_question_and_solution_temp = self.user_question_and_solution_list_parser.parse(clean_output) user_question_and_solution_temp = self.user_question_and_solution_list_parser.parse(clean_output)
return user_question_and_solution_temp.user_question_list return user_question_and_solution_temp.user_question_list
return [user_question_and_solution] return [user_question_and_solution]
@retry_llm_call(max_retries=3, delay=2)
def get_product_name_and_module_name(self, product_line, conversation_rows, product_detail_dict, user_question_str, solution_str):
"""分析产品名称和模块名称"""
if product_line == '':
return '', ''
json_str = json.dumps(product_detail_dict[product_line])
dialogue_str = self.get_dialogue_str(conversation_rows)
prompt = f"""
请根据以下对话内容分析所属产品名称和模块名称,按优先级找出最相关的1-3个分类标签及置信度(0-1),最后返回置信度最高的分类标签。
要求:
1. 如果对话记录中存在多个产品名称和模块名称,则根据"{user_question_str}""{solution_str}"判断最可能的产品名称和模块名称。
2. 如果对话记录中只存在一个产品名称和模块名称,则直接返回该产品名称和模块名称。
输出格式:
{self.product_name_and_module_name_parser.get_format_instructions()}
产品名称列表及模块名称列表:
{json_str}
对话记录:
{dialogue_str}
"""
response = self.llm.invoke(user_prompt=prompt, need_retry=False, api_key=self.api_key)
clean_output = re.sub(r'<think>.*?</think>', '', response.content, flags=re.DOTALL)
product_name_and_module_name = self.product_name_and_module_name_parser.parse(clean_output)
return product_name_and_module_name.product_name, product_name_and_module_name.module_name
@retry_llm_call(max_retries=3, delay=2)
def get_product_line(self, conversation_rows, product_detail_dict, user_question_str, solution_str):
"""分析产品线"""
dialogue_str = self.get_dialogue_str(conversation_rows)
product_line_list = list(product_detail_dict.keys())
prompt = f"""
请根据以下对话内容分析所属产品线,按优先级找出最相关的1-3个分类标签及置信度(0-1),最后返回置信度最高的分类标签。
无法判断时,返回空字符串。即product_line=""
要求:
1. 如果对话记录中存在多个产品线,则根据"{user_question_str}""{solution_str}"判断最可能的产品线。
2. 如果对话记录中只存在一个产品线,则直接返回该产品线。
输出格式:
{self.product_line_parser.get_format_instructions()}
产品线列表:
{product_line_list}
对话记录:
{dialogue_str}
"""
response = self.llm.invoke(user_prompt=prompt, need_retry=False, api_key=self.api_key)
clean_output = re.sub(r'<think>.*?</think>', '', response.content, flags=re.DOTALL)
product_line = self.product_line_parser.parse(clean_output)
return product_line.product_line
@retry_llm_call(max_retries=3, delay=2) @retry_llm_call(max_retries=3, delay=2)
def get_problem_type(self, conversation_rows, user_question_str, solution_str): def get_problem_type(self, conversation_rows, user_question_str, solution_str):
"""分析问题类型""" """分析问题类型"""
@@ -421,7 +437,7 @@ class DialogueToWorkorder:
{dialogue_str} {dialogue_str}
""" """
response = self.llm.invoke(user_prompt=prompt, need_retry=False, api_key=self.api_key) response = self.llm.invoke(user_prompt=prompt, need_retry=False)
clean_output = re.sub(r'<think>.*?</think>', '', response.content, flags=re.DOTALL) clean_output = re.sub(r'<think>.*?</think>', '', response.content, flags=re.DOTALL)
question_type = self.question_type_parser.parse(clean_output) question_type = self.question_type_parser.parse(clean_output)
@@ -458,7 +474,7 @@ class DialogueToWorkorder:
""" """
response = self.llm.invoke(user_prompt=prompt, need_retry=False, api_key=self.api_key) response = self.llm.invoke(user_prompt=prompt, need_retry=False)
clean_output = re.sub(r'<think>.*?</think>', '', response.content, flags=re.DOTALL) clean_output = re.sub(r'<think>.*?</think>', '', response.content, flags=re.DOTALL)
is_complaint = self.is_complaint_parser.parse(clean_output) is_complaint = self.is_complaint_parser.parse(clean_output)
@@ -467,7 +483,7 @@ class DialogueToWorkorder:
is_complaint.dissatisfaction_reasoning, is_complaint.dissatisfaction_reasoning,
is_complaint.is_complaint) is_complaint.is_complaint)
def process_conversation(self, conversation_id, conversation_rows, product_detail_dict): def process_conversation(self, conversation_id, conversation_rows):
"""处理单个会话的函数,用于多线程并发""" """处理单个会话的函数,用于多线程并发"""
# if conversation_id!="b157aa91-3acb-11f0-a191-4fb224ef4b40": # if conversation_id!="b157aa91-3acb-11f0-a191-4fb224ef4b40":
# return [] # return []
@@ -494,34 +510,9 @@ class DialogueToWorkorder:
# 分析问题类型 # 分析问题类型
problem_type = self.get_problem_type(conversation_rows, user_question_str, solution_str) problem_type = self.get_problem_type(conversation_rows, user_question_str, solution_str)
# 分析产品线
product_line = self.get_product_line(conversation_rows, product_detail_dict, user_question_str, solution_str)
# 分析产品名称和模块名称
if product_line != '':
product_name, module_name = self.get_product_name_and_module_name(
product_line, conversation_rows, product_detail_dict, user_question_str, solution_str)
else:
product_name = ''
module_name = ''
# 创建工单列表 # 创建工单列表
workorder_list = [] workorder_list = []
# 更新工单字典
# base_workorder_dict.update({
# "产品线": product_line,
# "产品名称": product_name,
# "模块名称": module_name,
# "客户问题": user_question_str,
# "问题类型": problem_type,
# "是否抱怨": "是" if is_dissatisfaction else '否',
# "抱怨内容": dissatisfaction_reasoning if is_dissatisfaction else '',
# "抱怨级别": dissatisfaction_level if is_dissatisfaction else '',
# "是否投诉": "是" if is_complaint else '否',
# "解决方案": solution_str
# })
# workorder_list.append(base_workorder_dict)
for user_question in user_question_list: for user_question in user_question_list:
user_question_str = user_question.user_question user_question_str = user_question.user_question
solution_str = user_question.solution solution_str = user_question.solution
@@ -531,9 +522,6 @@ class DialogueToWorkorder:
# 更新工单字典 # 更新工单字典
workorder_dict.update({ workorder_dict.update({
"产品线": product_line,
"产品名称": product_name,
"模块名称": module_name,
"客户问题": user_question_str, "客户问题": user_question_str,
"问题类型": problem_type, "问题类型": problem_type,
"是否抱怨": "" if is_dissatisfaction else '', "是否抱怨": "" if is_dissatisfaction else '',
@@ -551,7 +539,7 @@ class DialogueToWorkorder:
logger.error(f"处理会话ID: {conversation_id} 时发生错误: {e}") logger.error(f"处理会话ID: {conversation_id} 时发生错误: {e}")
return [] return []
def analyze_conversation_data(self, conversation_excel_path, product_detail_excel_path, max_workers=10, start_date=None, end_date=None): def analyze_conversation_data(self, conversation_excel_path, max_workers=10, start_date=None, end_date=None):
"""分析会话数据主流程,使用多线程并发处理""" """分析会话数据主流程,使用多线程并发处理"""
# 读取Excel文件 # 读取Excel文件
df = pd.read_excel(conversation_excel_path) df = pd.read_excel(conversation_excel_path)
@@ -560,9 +548,6 @@ class DialogueToWorkorder:
logger.info(f"Excel文件列名: {df.columns.tolist()}") logger.info(f"Excel文件列名: {df.columns.tolist()}")
logger.info(f"数据总行数: {len(df)}") logger.info(f"数据总行数: {len(df)}")
# 解析产品详情
product_detail_dict = self.parse_product_detail_excel(product_detail_excel_path)
# 按会话ID分组 # 按会话ID分组
conversation_dict = self.group_conversations_by_id(df) conversation_dict = self.group_conversations_by_id(df)
@@ -596,7 +581,7 @@ class DialogueToWorkorder:
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# 创建任务 # 创建任务
future_to_conversation = { future_to_conversation = {
executor.submit(self.process_conversation, conversation_id, conversation_rows, product_detail_dict): conversation_id executor.submit(self.process_conversation, conversation_id, conversation_rows): conversation_id
for conversation_id, conversation_rows in conversation_dict.items() for conversation_id, conversation_rows in conversation_dict.items()
} }
@@ -694,13 +679,11 @@ def parse_arguments():
parser.add_argument('--conversation_file', type=str, required=False, parser.add_argument('--conversation_file', type=str, required=False,
help='会话内容Excel文件路径') help='会话内容Excel文件路径')
parser.add_argument('--product_detail_file', type=str, required=False, parser.add_argument('--max_workers', type=int, default=40,
help='产品详情Excel文件路径')
parser.add_argument('--max_workers', type=int, default=6,
help='并发处理线程数,默认为16') help='并发处理线程数,默认为16')
parser.add_argument('--start_date', type=str, required=False,default="2025-06-10 16:08:00", parser.add_argument('--start_date', type=str, required=False,default=None,
help='开始日期,格式为YYYY-MM-DD') help='开始日期,格式为YYYY-MM-DD')
parser.add_argument('--end_date', type=str, required=False,default="2025-06-30 23:59:59", parser.add_argument('--end_date', type=str, required=False,default=None,
help='结束日期,格式为YYYY-MM-DD') help='结束日期,格式为YYYY-MM-DD')
return parser.parse_args() return parser.parse_args()
@@ -712,8 +695,7 @@ def main():
args = parse_arguments() args = parse_arguments()
# 设置默认文件路径 # 设置默认文件路径
conversation_excel_path = args.conversation_file or os.path.join('data', 'excel', '2025年1月到6月所有对话记录.xlsx') conversation_excel_path = args.conversation_file or os.path.join('data', 'excel', '客服对话记录7.1-7.22.xlsx')
product_detail_excel_path = args.product_detail_file or os.path.join('data', 'excel', '产品详情_工单.xlsx')
# 创建处理实例 # 创建处理实例
processor = DialogueToWorkorder() processor = DialogueToWorkorder()
@@ -721,7 +703,6 @@ def main():
# 分析会话数据 # 分析会话数据
workorder_dict_list = processor.analyze_conversation_data( workorder_dict_list = processor.analyze_conversation_data(
conversation_excel_path, conversation_excel_path,
product_detail_excel_path,
max_workers=args.max_workers, max_workers=args.max_workers,
start_date=args.start_date, start_date=args.start_date,
end_date=args.end_date end_date=args.end_date