更新pyproject.toml和uv.lock文件,新增ijson和langfuse依赖,同时在对话到工单的分析流程中添加时间范围过滤功能,优化日志记录,支持按时间范围过滤会话数据。新增获取工作流运行信息的方法,并更新意图识别API以支持使用jieba分词。
This commit is contained in:
@@ -499,7 +499,7 @@ class DialogueToWorkorder:
|
||||
|
||||
return workorder_list
|
||||
|
||||
def analyze_conversation_data(self, conversation_excel_path, product_detail_excel_path, max_workers=10):
|
||||
def analyze_conversation_data(self, conversation_excel_path, product_detail_excel_path, max_workers=10, start_date=None, end_date=None):
|
||||
"""分析会话数据主流程,使用多线程并发处理"""
|
||||
# 读取Excel文件
|
||||
df = pd.read_excel(conversation_excel_path)
|
||||
@@ -511,21 +511,29 @@ class DialogueToWorkorder:
|
||||
# 解析产品详情
|
||||
product_detail_dict = self.parse_product_detail_excel(product_detail_excel_path)
|
||||
|
||||
# 如果指定了时间范围,则过滤数据
|
||||
if start_date or end_date:
|
||||
# 确保创建时间列为日期时间类型
|
||||
if '创建时间' in df.columns:
|
||||
df['创建时间'] = pd.to_datetime(df['创建时间'], errors='coerce')
|
||||
|
||||
# 按时间范围过滤
|
||||
if start_date:
|
||||
start_date = pd.to_datetime(start_date)
|
||||
df = df[df['创建时间'] >= start_date]
|
||||
logger.info(f"过滤开始时间 {start_date},剩余数据行数: {len(df)}")
|
||||
|
||||
if end_date:
|
||||
end_date = pd.to_datetime(end_date)
|
||||
df = df[df['创建时间'] <= end_date]
|
||||
logger.info(f"过滤结束时间 {end_date},剩余数据行数: {len(df)}")
|
||||
else:
|
||||
logger.warning("数据中没有'创建时间'列,无法按时间范围过滤")
|
||||
|
||||
# 按会话ID分组
|
||||
conversation_dict = self.group_conversations_by_id(df)
|
||||
# 限制处理的会话数量为前2000个
|
||||
if len(conversation_dict) > 2000:
|
||||
logger.info(f"会话总数为 {len(conversation_dict)},限制处理前2000个会话")
|
||||
# 获取所有会话ID
|
||||
conversation_ids = list(conversation_dict.keys())
|
||||
# 只保留前2000个会话
|
||||
limited_conversation_dict = {
|
||||
conversation_id: conversation_dict[conversation_id]
|
||||
for conversation_id in conversation_ids[:2000]
|
||||
}
|
||||
conversation_dict = limited_conversation_dict
|
||||
else:
|
||||
logger.info(f"会话总数为 {len(conversation_dict)},处理全部会话")
|
||||
logger.info(f"会话总数为 {len(conversation_dict)},处理全部会话")
|
||||
|
||||
# 使用线程池处理每个会话
|
||||
workorder_dict_list = []
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||
@@ -629,6 +637,10 @@ def parse_arguments():
|
||||
help='产品详情Excel文件路径')
|
||||
parser.add_argument('--max_workers', type=int, default=16,
|
||||
help='并发处理线程数,默认为16')
|
||||
parser.add_argument('--start_date', type=str, required=False,
|
||||
help='开始日期,格式为YYYY-MM-DD')
|
||||
parser.add_argument('--end_date', type=str, required=False,
|
||||
help='结束日期,格式为YYYY-MM-DD')
|
||||
|
||||
return parser.parse_args()
|
||||
|
||||
@@ -649,9 +661,21 @@ def main():
|
||||
workorder_dict_list = processor.analyze_conversation_data(
|
||||
conversation_excel_path,
|
||||
product_detail_excel_path,
|
||||
max_workers=args.max_workers
|
||||
max_workers=args.max_workers,
|
||||
start_date=args.start_date,
|
||||
end_date=args.end_date
|
||||
)
|
||||
output_file = conversation_excel_path.replace('.xlsx', '_转工单.xlsx')
|
||||
|
||||
# 生成输出文件名
|
||||
if args.start_date and args.end_date:
|
||||
output_file = conversation_excel_path.replace('.xlsx', f'_{args.start_date}至{args.end_date}_转工单.xlsx')
|
||||
elif args.start_date:
|
||||
output_file = conversation_excel_path.replace('.xlsx', f'_从{args.start_date}起_转工单.xlsx')
|
||||
elif args.end_date:
|
||||
output_file = conversation_excel_path.replace('.xlsx', f'_至{args.end_date}_转工单.xlsx')
|
||||
else:
|
||||
output_file = conversation_excel_path.replace('.xlsx', '_转工单.xlsx')
|
||||
|
||||
# 保存结果
|
||||
processor.save_results_to_excel(workorder_dict_list, output_file)
|
||||
|
||||
|
||||
@@ -84,17 +84,18 @@ async def health_check():
|
||||
return {"status": "ok"}
|
||||
|
||||
@app.get("/query_type", summary="异步检索API")
|
||||
async def query_type(query: str, query_type: str):
|
||||
async def query_type(query: str, query_type: str, workflow_run_id:str):
|
||||
try:
|
||||
# 记录请求
|
||||
logger.info(f"接收到请求: {query}, 类型: {query_type}")
|
||||
logger.info(f"接收到请求: {query}, 类型: {query_type}, workflow_run_id: {workflow_run_id}")
|
||||
|
||||
# 保存 提问、问题类型、当前时间戳到json
|
||||
timestamp = datetime.datetime.now().isoformat()
|
||||
query_data = {
|
||||
"query": query,
|
||||
"query_type": query_type,
|
||||
"timestamp": timestamp
|
||||
"timestamp": timestamp,
|
||||
"workflow_run_id": workflow_run_id
|
||||
}
|
||||
success = True
|
||||
try:
|
||||
|
||||
@@ -197,6 +197,8 @@ class DifyQueryRetrieval:
|
||||
# 将去重后的文档转换为列表
|
||||
deduplicated_documents = list(unique_documents.values())
|
||||
|
||||
if len(deduplicated_documents) == 0:
|
||||
return []
|
||||
# 对所有检索出来的文档进行重排序
|
||||
time_start = time.time()
|
||||
processed_documents = await self.data_post_processor_async(original_query, deduplicated_documents, top_k)
|
||||
|
||||
@@ -244,6 +244,29 @@ class PgSql:
|
||||
raise Exception(f"Error while getting conversation_messages: {error}")
|
||||
return rating
|
||||
|
||||
def get_workflow_run_info(self, workflow_run_id):
|
||||
"""
|
||||
通过msg_id从message_feedbacks中找到对应的rating。
|
||||
:param msg_id: 消息ID (UUID格式)
|
||||
:return: rating 字符串
|
||||
"""
|
||||
with self.pg_sql_lock:
|
||||
rating = None
|
||||
try:
|
||||
with self.connection.cursor() as cursor:
|
||||
# 构建查询语句
|
||||
cursor.execute("""
|
||||
SELECT * FROM workflow_runs WHERE id=%s;
|
||||
""",
|
||||
(workflow_run_id,))
|
||||
# 执行查询
|
||||
result = cursor.fetchone()
|
||||
if result:
|
||||
colnames = [desc[0] for desc in cursor.description]
|
||||
return dict(zip(colnames, result))
|
||||
except (Exception, psycopg2.Error) as error:
|
||||
raise Exception(f"Error while getting conversation_messages: {error}")
|
||||
return None
|
||||
|
||||
class DifyTool:
|
||||
"""
|
||||
@@ -337,6 +360,9 @@ class DifyTool:
|
||||
def get_message_rating(self, msg_id):
|
||||
return self.dify_pgsql.get_message_rating(msg_id)
|
||||
|
||||
def get_workflow_run_info(self, workflow_run_id):
|
||||
return self.dify_pgsql.get_workflow_run_info(workflow_run_id)
|
||||
|
||||
class BaseWorkflowChat:
|
||||
"""
|
||||
工作流对话基类,封装了与Dify API交互的基本功能
|
||||
|
||||
@@ -109,7 +109,7 @@ async def intent_recognize(request: IntentRecognizeRequest):
|
||||
conversation_context=request.conversation_context,
|
||||
chat_history=request.chat_history,
|
||||
previous_slots=request.previous_slots,
|
||||
use_jieba=False,
|
||||
use_jieba=True,
|
||||
enable_query_expansion=True
|
||||
)
|
||||
|
||||
@@ -175,7 +175,7 @@ if __name__ == "__main__":
|
||||
"rag2_0.dify.intent_recognition_api:app",
|
||||
host="0.0.0.0",
|
||||
port=8001,
|
||||
reload=False, # 开发环境启用热重载
|
||||
reload=True, # 开发环境启用热重载
|
||||
workers=1 # 生产环境可以增加worker数量
|
||||
)
|
||||
# 生产环境可以使用以下命令启动:
|
||||
|
||||
@@ -224,15 +224,13 @@ class DataProblemSlots(SlotBase):
|
||||
# 3.1 后缀名咨询
|
||||
class FileExtensionConsultingSlots(SlotBase):
|
||||
file_extension: str = Field(default="", description="文件后缀名")
|
||||
operation_purpose: str = Field(default="", description="操作目的(了解对应软件,对应工程)")
|
||||
operation_purpose: Optional[str] = Field(default="", description="操作目的(了解对应软件,对应工程)")
|
||||
|
||||
def check_required_slots(self) -> Tuple[bool, Dict[str, str]]:
|
||||
"""检查必填槽位是否都存在"""
|
||||
missing_slots = {}
|
||||
if not self.file_extension:
|
||||
missing_slots["file_extension"] = FileExtensionConsultingSlots.model_fields["file_extension"].description
|
||||
if not self.operation_purpose:
|
||||
missing_slots["operation_purpose"] = FileExtensionConsultingSlots.model_fields["operation_purpose"].description
|
||||
return len(missing_slots) == 0, missing_slots
|
||||
|
||||
# 3.2 软件锁类
|
||||
@@ -311,7 +309,8 @@ class IntentAndSlotResult(BaseModel):
|
||||
class StepBackPrompt(BaseModel):
|
||||
"""后退提示数据模型"""
|
||||
original_query: str = Field(description="原始查询")
|
||||
step_back_query: str = Field(description="后退提示生成的抽象查询")
|
||||
can_use_back_prompt: bool = Field(description="原始查询是否可以进行后退提示(True/False),如果原始查询没有限定词或其他限定词语,则不能进行后退提示")
|
||||
step_back_query: List[str] = Field(description="后退提示生成的抽象查询(多个)")
|
||||
|
||||
class FollowUpQuestions(BaseModel):
|
||||
"""后续问题数据模型"""
|
||||
|
||||
@@ -211,13 +211,12 @@ step_back_prompt = """
|
||||
## 任务说明
|
||||
1. 分析用户的原始问题,理解其核心意图和需求
|
||||
2. 考虑历史对话和会话背景,理解用户当前问题的上下文
|
||||
3. 生成一个更抽象、更高层次的问题,称为"后退问题"
|
||||
3. 生成更抽象、更高层次的问题,称为"后退问题",后退问题可以生成多个,依次后退到更抽象、更高层次的问题
|
||||
4. 后退问题应该:
|
||||
- 更加通用和抽象
|
||||
- 更加通用和抽象,不应包含原始问题的具体细节(包括场景限定、界面限定等其他限定词语)
|
||||
- 涵盖原始问题的核心主题
|
||||
- 去除过于具体的限制条件(如时间、地点、特定版本等)
|
||||
- 去除过于具体的限制条件(如时间、地点、特定版本、特定工程等)
|
||||
- 保持在同一领域和主题范围内
|
||||
- 考虑历史对话中的相关信息
|
||||
|
||||
## 输入
|
||||
用户原始问题: {query}
|
||||
@@ -229,10 +228,10 @@ step_back_prompt = """
|
||||
|
||||
## 示例
|
||||
原始问题: "配网D3软件2023版本如何在Windows 11系统上导入单位工程量清单?"
|
||||
后退问题: "配网D3软件如何导入工程量清单?"
|
||||
后退问题: ["配网D3软件如何导入工程量清单?", "如何导入单位工程量清单?"]
|
||||
|
||||
原始问题: "技改T1软件中的某个设备更换后,如何在系统中更新对应的定额?"
|
||||
后退问题: "技改T1软件中如何更新设备对应的定额?"
|
||||
后退问题: ["技改T1软件中如何更新设备对应的定额?", "如何更新设备对应的定额?"]
|
||||
"""
|
||||
|
||||
follow_up_questions_prompt = """
|
||||
|
||||
Reference in New Issue
Block a user