Compare commits

..

9 Commits

Author SHA1 Message Date
ouyangyouzhang 9200df7842 新增客服重定向接口 2025-11-28 10:12:41 +08:00
ouyangyouzhang eb361fe77f feat: 添加启动意图识别API服务的脚本
添加专用脚本用于启动rag2_0.api.intent_recognition_api服务
脚本功能包括检测并结束现有screen会话,清理占用端口,最后启动新服务
2025-11-26 11:12:39 +08:00
ouyangyouzhang 4627a2268f Merge branch 'master' of https://git.97id.com/ouyangyouzhang/QueryRewrite 2025-11-26 10:52:28 +08:00
ouyangyouzhang 46f756428e 上线前相关环境变量的修改 2025-11-26 10:49:54 +08:00
ouyangyouzhang a22b001680 先跳过 定额信息的提取 2025-11-26 10:28:11 +08:00
ouyangyouzhang c97e96c620 删除欠费的api 2025-10-21 18:02:16 +08:00
ouyangyouzhang 94a8656c7f 微调提示词,基本不影响效果 2025-09-26 15:46:00 +08:00
ouyangyouzhang bc6c907872 删除提示词中多余空格、换行等。减少不必要的token 2025-09-26 11:31:18 +08:00
ouyangyouzhang 2b13fdab99 调整提示词、简化代码 2025-09-25 16:23:13 +08:00
25 changed files with 546 additions and 578 deletions
+18 -18
View File
@@ -1,27 +1,27 @@
OPENAI_API_BASE=https://api.siliconflow.cn/v1/
MODEL_NAME=deepseek-ai/DeepSeek-V3
RERANKER_BASE_URL=http://10.1.16.39:9995
RERANKER_MODEL_NAME=bge-reranker-v2-m3
RERANKER_API_KEY=test
# RERANKER_BASE_URL=http://10.1.16.39:9995
# RERANKER_MODEL_NAME=bge-reranker-v2-m3
# RERANKER_API_KEY=test
EMBEDDING_BASE_URL=http://10.1.16.39:9995
EMBEDDING_MODEL_NAME=bge-m3
EMBEDDING_API_KEY=test
# EMBEDDING_BASE_URL=http://10.1.16.39:9995
# EMBEDDING_MODEL_NAME=bge-m3
# EMBEDDING_API_KEY=test
DIFY_BSAE_URL=http://10.1.16.39/v1
DIFY_APP_KEY=app-CPoOMaGDsLRPAe9TW7Xjhszy
DIFY_DATASET_KEY=dataset-skLjmPVonjHo119OWNf3kAmY
# DIFY_BSAE_URL=http://10.1.16.39/v1
# DIFY_APP_KEY=app-CPoOMaGDsLRPAe9TW7Xjhszy
# DIFY_DATASET_KEY=dataset-skLjmPVonjHo119OWNf3kAmY
DIFY_PG_HOST = 10.1.16.39
DIFY_PG_PORT = 5432
DIFY_PG_USER = postgres
DIFY_PG_PASSWORD = difyai123456
DIFY_PG_DATABASE = dify
# DIFY_PG_HOST = 10.1.16.39
# DIFY_PG_PORT = 5432
# DIFY_PG_USER = postgres
# DIFY_PG_PASSWORD = difyai123456
# DIFY_PG_DATABASE = dify
ENABLE_LANGFUSE=true
LANGFUSE_PUBLIC_KEY=pk-lf-4e9b7cbe-528c-4697-b73c-33257a60072c
LANGFUSE_SECRET_KEY=sk-lf-cd8a78c5-2538-455e-a85a-87b6e1aa69d0
LANGFUSE_HOST=http://10.1.6.34:3000
# ENABLE_LANGFUSE=true
# LANGFUSE_PUBLIC_KEY=pk-lf-4e9b7cbe-528c-4697-b73c-33257a60072c
# LANGFUSE_SECRET_KEY=sk-lf-cd8a78c5-2538-455e-a85a-87b6e1aa69d0
# LANGFUSE_HOST=http://10.1.6.34:3000
+1 -3
View File
@@ -11,9 +11,7 @@ data/excel/*.xlsx
!data/excel/Excel版 清单定额库/
!data/excel/Excel版 清单定额库/**
data/logs/*
rag2_0/dify/Test.py
data/query_logs/*
data/conversations/*
data/test*
data/temp*
data/db/answer_logs.db
data/db/qingdan_ding_e_ku.db
+8 -2
View File
@@ -10,7 +10,10 @@
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"justMyCode": true
"justMyCode": true,
"env": {
"PYTHONPATH": "${workspaceFolder}"
}
},
{
"name": "IntentRecognition",
@@ -18,7 +21,10 @@
"request": "launch",
"program": "${workspaceFolder}/rag2_0/demo/intent_recognition_example.py",
"console": "integratedTerminal",
"justMyCode": true
"justMyCode": true,
"env": {
"PYTHONPATH": "${workspaceFolder}"
}
}
]
}
-1
View File
@@ -57,7 +57,6 @@ sk-benuasjbhbxvdmgxishibmtpfyieamlfclmdclfbqloqsmaf
sk-ufmqbuplpjvzzlzohvsxertwgnguhipsbajxnxecvvccozly
sk-rypfoscrczeelowmrsixiuyunyqmqvknaprsnzmdguwzrkzx
sk-lucemnosmcxuwedvzilpefuxjnyvaxldpbgaqwnwalxmntul
sk-niymkyuzpyovndvvqvpaniiqfgoofnxczhdmjjessiocbeul
sk-cxlvgeuxavxfcajprxietuqyqjngtbrwrmrmrioxmgtbkpci
sk-vjjsuzntqbhcmelfsuquqyoxjivxcfwyxnrhpwzobgxlpmrv
sk-hbgctnpvntsnelveaudpekyncfgstdfazezboxmcgjvudzyg
-1
View File
@@ -29,7 +29,6 @@ def main(query: str) -> dict:
import sys
sys.path.append(os.getcwd())
from rag2_0.dify.DifyQueryRetrieval import DifyQueryRetrieval
# 定义数据库路径
-1
View File
@@ -18,7 +18,6 @@ import logging
load_dotenv()
import sys
sys.path.append(os.getcwd())
from rag2_0.dify.DifyQueryRetrieval import DifyQueryRetrieval
# 确保日志目录存在
@@ -5,7 +5,6 @@ import pandas as pd
from openpyxl import load_workbook
import logging
import numpy as np
sys.path.append(os.getcwd())
from rag2_0.tool.ModelTool import XinferenceEmbeddings
from langchain_community.vectorstores import SQLiteVSS
+4 -1
View File
@@ -15,8 +15,8 @@ import logging
load_dotenv()
import sys
sys.path.append(os.getcwd())
from rag2_0.intent_recognition import AsyncIntentRecognizer
from rag2_0.api.kefu_redirect_url import router as kefu_router
# 确保日志目录存在
os.makedirs('data/logs', exist_ok=True)
@@ -85,6 +85,9 @@ app.add_middleware(
allow_headers=["*"],
)
# 注册外部路由
app.include_router(kefu_router)
# 全局变量存储AsyncIntentRecognizer实例
_instance = None
+92
View File
@@ -0,0 +1,92 @@
from fastapi import APIRouter
from fastapi.responses import RedirectResponse
import os
import sqlite3
import threading
import time
from queue import Queue, Full
router = APIRouter()
# 以当前文件为基准的相对路径:../../data/db
PROJECT_ROOT = os.getcwd()
DB_DIR = os.path.join(PROJECT_ROOT, "data", "db")
DB_FILE = os.path.join(DB_DIR, "redirects.sqlite3")
TABLE_SQL = (
"CREATE TABLE IF NOT EXISTS redirects ("
" msg_id TEXT PRIMARY KEY,"
" url TEXT NOT NULL"
")"
)
def _ensure_db():
"""确保数据库与表存在。"""
os.makedirs(DB_DIR, exist_ok=True)
with sqlite3.connect(DB_FILE) as conn:
cur = conn.cursor()
cur.execute(TABLE_SQL)
conn.commit()
def save_redirect(msg_id: str, url: str) -> None:
"""将 msg_id 与 url 写入 SQLite,若已存在则忽略。
使用 INSERT OR IGNORE 结合 PRIMARY KEY(msg_id) 来避免重复写入。
"""
_ensure_db()
with sqlite3.connect(DB_FILE) as conn:
cur = conn.cursor()
cur.execute(
"INSERT OR IGNORE INTO redirects (msg_id, url) VALUES (?, ?)",
(msg_id, url),
)
conn.commit()
# ========= 异步写库队列与后台线程 =========
_write_queue: "Queue[tuple[str, str]]" = Queue(maxsize=10000)
def _write_worker():
_ensure_db()
while True:
try:
msg_id, url = _write_queue.get()
try:
with sqlite3.connect(DB_FILE) as conn:
cur = conn.cursor()
cur.execute(
"INSERT OR IGNORE INTO redirects (msg_id, url) VALUES (?, ?)",
(msg_id, url),
)
conn.commit()
except Exception:
# 失败忽略,避免阻断工作线程
pass
finally:
_write_queue.task_done()
except Exception:
# 防御性 sleep,避免异常导致CPU空转
time.sleep(0.1)
_worker_thread = threading.Thread(target=_write_worker, daemon=True)
_worker_thread.start()
@router.get("/kefu_login", summary="客服登录页重定向")
async def kefu_redirect(msg_id:str):
"""重定向到客服登录页。"""
target_url = "https://www.booway.com.cn/kefu/toLoginPage"
# 写入 SQLite:若 msg_id 已存在将不会重复写入
try:
if msg_id:
# 走异步队列
_write_queue.put_nowait((msg_id, target_url))
except Exception:
# 出于稳健性考虑,即使写库失败也不影响重定向
pass
return RedirectResponse(target_url, status_code=302)
-1
View File
@@ -10,7 +10,6 @@ import sys
import os
# 导入ExcelToSQLiteProcessor类
sys.path.append(os.getcwd())
from rag2_0.api.create_qingdan_dinge_database import ExcelToSQLiteProcessor, create_db
# 导入向量检索相关类
from rag2_0.tool.ModelTool import XinferenceEmbeddings
-10
View File
@@ -18,8 +18,6 @@ from tqdm import tqdm
import glob
import shutil
# 将项目根目录添加到Python路径
sys.path.append(os.getcwd())
from rag2_0.tool.ModelTool import OpenAiLLM
load_dotenv()
@@ -142,14 +140,6 @@ class DialogueToWorkorder:
"base_url": os.getenv("OPENAI_API_BASE"),
"timeout": httpx.Timeout(600.0)
}
# self.llm_params = llm_params or {
# "temperature": 0.2,
# "top_p":0.95,
# "model": "deepseek-r1",
# "api_key": "25t%Syu6I9yxX2IuTN",
# "base_url": "http://10.1.0.154:8000/v1",
# "timeout": httpx.Timeout(600.0)
# }
self.llm = self._get_llm_instance()
# 创建工单JSON文件目录
+155 -273
View File
@@ -5,23 +5,18 @@ from __future__ import annotations
import json
import os
import re
import configparser
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Union
from dataclasses import dataclass
from contextlib import contextmanager
import threading
import time
from queue import Queue, Empty, Full
import pandas as pd
import pymysql
from pymysql.connections import Connection
from pymysql.cursors import Cursor
from tqdm import tqdm
import concurrent.futures
import sys
from queue import Queue, Empty, Full
os.makedirs('./data/logs', exist_ok=True)
# 配置日志
@@ -35,6 +30,18 @@ logging.basicConfig(
)
logger = logging.getLogger(__name__)
# =====================
# 硬编码数据库配置(简化)
# =====================
DB_HOST = '192.168.0.123'
DB_PORT = 3307
DB_USER = 'fuzhimei'
DB_PASSWORD = 'fuzhimei@135'
DB_CHARSET = 'utf8mb4'
DB_CONNECT_TIMEOUT = 10
DB_READ_TIMEOUT = 300
DB_WRITE_TIMEOUT = 300
def parse_session_tags(input_string):
"""
解析sessionTag格式的字符串,支持任意数量的sessionTag
@@ -76,171 +83,6 @@ def parse_session_tags(input_string):
return result
@dataclass
class DatabaseConfig:
"""数据库配置类"""
host: str = '192.168.0.123'
port: int = 3307
user: str = 'fuzhimei'
password: str = 'fuzhimei@135'
charset: str = 'utf8mb4'
connect_timeout: int = 10
read_timeout: int = 300
write_timeout: int = 300
@classmethod
def from_config_file(cls, config_file: str = 'config.ini') -> 'DatabaseConfig':
"""从配置文件加载配置"""
if not os.path.exists(config_file):
logger.warning(f"配置文件 {config_file} 不存在,使用默认配置")
return cls()
config = configparser.ConfigParser()
config.read(config_file, encoding='utf-8')
if 'database' not in config:
logger.warning("配置文件中没有 [database] 部分,使用默认配置")
return cls()
db_config = config['database']
return cls(
host=db_config.get('host', cls.host),
port=int(db_config.get('port', cls.port)),
user=db_config.get('user', cls.user),
password=db_config.get('password', cls.password),
charset=db_config.get('charset', cls.charset),
connect_timeout=int(db_config.get('connect_timeout', cls.connect_timeout)),
read_timeout=int(db_config.get('read_timeout', cls.read_timeout)),
write_timeout=int(db_config.get('write_timeout', cls.write_timeout))
)
class ConnectionPool:
"""数据库连接池"""
def __init__(self, config: DatabaseConfig, max_connections: int = 10):
self.config = config
self.max_connections = max_connections
self.pool = Queue(maxsize=max_connections)
self.active_connections = 0
self.lock = threading.Lock()
# 预创建一些连接
self._initialize_pool()
def _initialize_pool(self) -> None:
"""初始化连接池,预创建一些连接"""
initial_connections = min(3, self.max_connections)
for _ in range(initial_connections):
try:
conn = self._create_connection()
if conn:
self.pool.put_nowait(conn)
self.active_connections += 1
except Full:
break
except Exception as e:
logger.error(f"初始化连接池时创建连接失败: {e}")
def _create_connection(self) -> Optional[Connection]:
"""创建新的数据库连接"""
try:
conn = pymysql.connect(
host=self.config.host,
port=self.config.port,
user=self.config.user,
password=self.config.password,
charset=self.config.charset,
connect_timeout=self.config.connect_timeout,
read_timeout=self.config.read_timeout,
write_timeout=self.config.write_timeout,
autocommit=True
)
return conn
except Exception as e:
logger.error(f"创建数据库连接失败: {e}")
return None
@contextmanager
def get_connection(self):
"""获取连接的上下文管理器"""
conn = None
try:
# 尝试从池中获取连接
try:
conn = self.pool.get_nowait()
except Empty:
# 池中没有连接,尝试创建新连接
with self.lock:
if self.active_connections < self.max_connections:
conn = self._create_connection()
if conn:
self.active_connections += 1
else:
raise Exception("无法创建新的数据库连接")
else:
# 等待可用连接
logger.info("等待可用连接...")
conn = self.pool.get(timeout=30)
# 检查连接是否仍然有效
if conn and not self._is_connection_alive(conn):
logger.warning("连接已失效,重新创建")
try:
conn.close()
except:
pass
conn = self._create_connection()
if not conn:
raise Exception("重新创建连接失败")
yield conn
except Exception as e:
logger.error(f"获取数据库连接时出错: {e}")
if conn:
try:
conn.close()
except:
pass
with self.lock:
self.active_connections -= 1
raise
else:
# 归还连接到池中
if conn:
try:
self.pool.put_nowait(conn)
except Full:
# 池已满,关闭连接
try:
conn.close()
except:
pass
with self.lock:
self.active_connections -= 1
def _is_connection_alive(self, conn: Connection) -> bool:
"""检查连接是否仍然有效"""
try:
conn.ping(reconnect=False)
return True
except:
return False
def close_all(self) -> None:
"""关闭所有连接"""
logger.info("正在关闭连接池中的所有连接...")
while not self.pool.empty():
try:
conn = self.pool.get_nowait()
conn.close()
except (Empty, Exception):
break
self.active_connections = 0
logger.info("连接池已关闭")
class DataProcessor:
"""数据处理器"""
@@ -357,12 +199,76 @@ class DataProcessor:
class MariaDBClient:
"""优化后的MariaDB数据库客户端"""
"""简化版 MariaDB 客户端(内置轻量连接池以复用连接)"""
def __init__(self, config: DatabaseConfig, max_connections: int = 10):
self.config = config
self.connection_pool = ConnectionPool(config, max_connections)
def __init__(self, max_connections: int = 10):
self.data_processor = DataProcessor()
self._max_connections = max_connections
self._pool: Queue = Queue(maxsize=max_connections)
self._active = 0
self._lock = threading.Lock()
# 预创建少量连接,降低首次延迟
initial = min(3, max_connections)
for _ in range(initial):
conn = self._create_connection()
if conn:
try:
self._pool.put_nowait(conn)
self._active += 1
except Full:
try:
conn.close()
except Exception:
pass
def _create_connection(self):
try:
return pymysql.connect(
host=DB_HOST,
port=DB_PORT,
user=DB_USER,
password=DB_PASSWORD,
charset=DB_CHARSET,
connect_timeout=DB_CONNECT_TIMEOUT,
read_timeout=DB_READ_TIMEOUT,
write_timeout=DB_WRITE_TIMEOUT,
autocommit=True
)
except Exception as e:
logger.error(f"创建数据库连接失败: {e}")
return None
def _acquire_connection(self):
# 先尝试不阻塞获取
try:
return self._pool.get_nowait()
except Empty:
# 池为空,若可创建新连接则创建,否则阻塞等待
with self._lock:
if self._active < self._max_connections:
conn = self._create_connection()
if conn:
self._active += 1
return conn
# 达到上限,阻塞等待可用连接
try:
return self._pool.get(timeout=30)
except Empty:
raise RuntimeError("获取数据库连接超时")
def _release_connection(self, conn):
if not conn:
return
try:
self._pool.put_nowait(conn)
except Full:
# 池已满,关闭多余连接
try:
conn.close()
except Exception:
pass
with self._lock:
self._active = max(0, self._active - 1)
def __enter__(self) -> 'MariaDBClient':
return self
@@ -371,30 +277,52 @@ class MariaDBClient:
self.close()
def close(self) -> None:
"""关闭客户端"""
self.connection_pool.close_all()
"""关闭连接池中的连接"""
try:
while True:
try:
conn = self._pool.get_nowait()
except Empty:
break
try:
conn.close()
except Exception:
pass
finally:
with self._lock:
self._active = 0
def execute_query(self, sql: str, params: Optional[Tuple] = None) -> Tuple[Optional[pd.DataFrame], List[str]]:
"""执行SQL查询"""
"""执行SQL查询(复用连接池连接)"""
conn = None
try:
with self.connection_pool.get_connection() as conn:
conn = self._acquire_connection()
with conn.cursor() as cursor:
cursor.execute(sql, params)
results = cursor.fetchall()
# 获取列名
column_names = [desc[0] for desc in cursor.description] if cursor.description else []
if results:
df = pd.DataFrame(results, columns=column_names)
return df, column_names
else:
return pd.DataFrame(), column_names
except Exception as e:
logger.error(f"执行查询时出错: {e}")
logger.error(f"SQL: {sql}")
return None, []
finally:
if conn:
# 若连接异常,尝试关闭并减少活跃计数;否则归还
try:
conn.ping(reconnect=False)
self._release_connection(conn)
except Exception:
try:
conn.close()
except Exception:
pass
with self._lock:
self._active = max(0, self._active - 1)
def query_sessions(self, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
"""查询指定日期范围内的会话数据"""
@@ -483,101 +411,18 @@ class MariaDBClient:
logger.error(f"导出到Excel时出错: {e}")
return None
class SessionProcessor:
"""会话处理器,负责并发处理"""
def __init__(self, db_client: MariaDBClient, max_workers: int = None):
self.db_client = db_client
self.max_workers = max_workers if max_workers is not None else os.cpu_count()
self.temp_save_lock = threading.Lock() # 添加锁用于保护临时保存操作
logger.info(f"初始化会话处理器: max_workers={self.max_workers}")
def process_sessions(self, sessions_df: pd.DataFrame) -> List[List[Dict[str, Any]]]:
"""处理所有会话数据"""
if sessions_df.empty:
logger.warning("没有会话数据需要处理")
return []
total_sessions = len(sessions_df)
logger.info(f"开始处理 {total_sessions} 个会话...")
all_conversations = []
# 直接并发处理每个会话
def process_single_session(session_row):
try:
session_id = session_row['SESSION_ID']
messages_df = self.db_client.query_messages_by_session_id(session_id)
if messages_df is not None and not messages_df.empty:
conversation = self.db_client.data_processor.messages_df_to_list(messages_df, session_row)
if conversation:
return conversation
except Exception as e:
logger.error(f"处理会话 {session_row.get('SESSION_ID', 'unknown')} 时出错: {e}")
return None
# 使用线程池处理所有会话
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# 提交所有会话处理任务
future_to_session = {
executor.submit(process_single_session, row): i
for i, row in sessions_df.iterrows()
}
# 收集结果
with tqdm(total=total_sessions, desc="处理会话进度") as pbar:
for future in concurrent.futures.as_completed(future_to_session):
try:
conversation = future.result()
if conversation:
all_conversations.append(conversation)
# 每处理100个对话临时保存一次
if len(all_conversations) % 100 == 0:
with self.temp_save_lock:
logger.info(f"临时保存 {len(all_conversations)} 个对话")
temp_output_file = self.db_client.export_to_excel(
all_conversations,
f"客服对话记录_临时保存",
output_dir="/data/QueryRewrite/data/excel"
)
if temp_output_file:
logger.info(f"临时保存完成: {temp_output_file}")
except Exception as e:
session_idx = future_to_session[future]
logger.error(f"处理会话索引 {session_idx} 时出错: {e}")
pbar.update(1)
logger.info(f"处理完成,共获得 {len(all_conversations)} 个有效对话")
return all_conversations
def main() -> None:
"""主函数"""
"""主函数(精简版)"""
try:
# 加载配置
config = DatabaseConfig.from_config_file()
logger.info(f"使用数据库配置: {config.host}:{config.port}")
logger.info(f"使用数据库配置: {DB_HOST}:{DB_PORT}")
# 创建数据库客户端
with MariaDBClient(config, max_connections=12) as db_client:
# 创建数据库客户端(简化)
with MariaDBClient() as db_client:
# 查询会话数据
start_date = '2025-08-01 00:00:00'
end_date = '2025-08-01 23:00:00'
logger.info(f"查询时间范围: {start_date}{end_date}")
# 创建会话处理器
processor = SessionProcessor(db_client)
# is_debug = hasattr(sys, 'gettrace') and sys.gettrace() is not None
# if is_debug:
# messages_df = db_client.query_messages_by_session_id("86c919e0-09f1-11f0-84ae-2daf59566989")
# print(db_client.data_processor.messages_df_to_list(messages_df))
# return []
sessions_df = db_client.query_sessions(start_date, end_date)
@@ -585,8 +430,46 @@ def main() -> None:
logger.warning("没有找到符合条件的会话数据")
return
# 处理会话数据
all_conversations = processor.process_sessions(sessions_df)
# 直接并发处理每个会话(替代 SessionProcessor
total_sessions = len(sessions_df)
all_conversations: List[List[Dict[str, Any]]] = []
temp_save_lock = threading.Lock()
def process_single_session(session_row):
try:
session_id = session_row['SESSION_ID']
messages_df = db_client.query_messages_by_session_id(session_id)
if messages_df is not None and not messages_df.empty:
conversation = db_client.data_processor.messages_df_to_list(messages_df, session_row)
if conversation:
return conversation
except Exception as e:
logger.error(f"处理会话 {session_row.get('SESSION_ID', 'unknown')} 时出错: {e}")
return None
with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
future_to_session = {executor.submit(process_single_session, row): i for i, row in sessions_df.iterrows()}
with tqdm(total=total_sessions, desc="处理会话进度") as pbar:
for future in concurrent.futures.as_completed(future_to_session):
try:
conversation = future.result()
if conversation:
all_conversations.append(conversation)
if len(all_conversations) % 100 == 0:
with temp_save_lock:
logger.info(f"临时保存 {len(all_conversations)} 个对话")
temp_output_file = db_client.export_to_excel(
all_conversations,
f"客服对话记录_临时保存",
output_dir="/data/QueryRewrite/data/excel"
)
if temp_output_file:
logger.info(f"临时保存完成: {temp_output_file}")
except Exception as e:
session_idx = future_to_session[future]
logger.error(f"处理会话索引 {session_idx} 时出错: {e}")
pbar.update(1)
# 导出结果
if all_conversations:
output_file = db_client.export_to_excel(
@@ -594,7 +477,6 @@ def main() -> None:
"客服对话记录",
output_dir="/data/QueryRewrite/data/excel"
)
if output_file:
logger.info(f"处理完成!共导出 {len(all_conversations)} 个对话到文件: {output_file}")
else:
@@ -20,7 +20,6 @@ import argparse
from typing import List, Dict, Any
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
sys.path.append(os.getcwd())
from rag2_0.intent_recognition import AsyncIntentRecognizer
from rag2_0.dify.DifyQueryRetrieval import DifyQueryRetrieval
from rag2_0.intent_recognition.DataModels import Classification
-1
View File
@@ -10,7 +10,6 @@ import os
import json
from dotenv import load_dotenv
import sys
sys.path.append(os.getcwd())
from rag2_0.intent_recognition import ProfessionalNounVectorizer
import logging
-1
View File
@@ -15,7 +15,6 @@ from datetime import datetime
import os
from langchain_core.output_parsers import JsonOutputParser
sys.path.append(os.getcwd())
from rag2_0.dify.dify_client import ChatClient
from rag2_0.tool.ModelTool import OpenAiLLM
from rag2_0.dify.dify_tool import DifyTool
-1
View File
@@ -6,7 +6,6 @@ import logging
import time
import asyncio
import httpx
sys.path.append(os.getcwd())
from rag2_0.dify.dify_client.client import DifyClient, KnowledgeBaseClient
from rag2_0.tool.ModelTool import XinferenceReRankerModel
@@ -5,7 +5,6 @@ import sys
from dotenv import load_dotenv
load_dotenv()
sys.path.append(os.getcwd())
from rag2_0.dify.dify_client import DifyApi
-1
View File
@@ -17,7 +17,6 @@ logging.basicConfig(
]
)
sys.path.append(os.getcwd())
import rag2_0.dify.dify_client.dify_api as DifyApi
import pandas as pd
+45 -1
View File
@@ -6,7 +6,6 @@ import json
from concurrent.futures import ThreadPoolExecutor, as_completed
import sys
sys.path.append(os.getcwd())
from rag2_0.dify.dify_client import ChatClient
from pydantic import BaseModel, Field
from langchain.output_parsers import PydanticOutputParser
@@ -271,6 +270,51 @@ class DifyTool:
raise Exception(f"Error while getting conversation_messages: {error}")
return None
def execute_custom_sql(self, sql, params=None, fetch_type='all'):
"""
执行自定义的SQL查询或命令。
Args:
sql: 要执行的SQL语句
params: SQL参数(可选),用于参数化查询
fetch_type: 结果获取类型,可选值:'all'(返回所有行), 'one'(返回单行), 'none'(不返回结果)
Returns:
根据fetch_type返回查询结果:
- fetch_type='all': 返回包含所有行的列表,每行是一个字典
- fetch_type='one': 返回单行字典,如果没有结果则返回None
- fetch_type='none': 返回受影响的行数
Raises:
Exception: 如果执行SQL时发生错误
"""
with self.pg_sql_lock:
try:
with self.connection.cursor() as cursor:
# 执行SQL语句
cursor.execute(sql, params or ())
# 根据fetch_type处理结果
if fetch_type == 'all':
result = cursor.fetchall()
if result:
colnames = [desc[0] for desc in cursor.description]
return [dict(zip(colnames, row)) for row in result]
return []
elif fetch_type == 'one':
result = cursor.fetchone()
if result:
colnames = [desc[0] for desc in cursor.description]
return dict(zip(colnames, result))
return None
elif fetch_type == 'none':
# 对于UPDATE, INSERT, DELETE等操作,返回受影响的行数
return cursor.rowcount
else:
raise ValueError(f"不支持的fetch_type: {fetch_type}")
except (Exception, psycopg2.Error) as error:
raise Exception(f"Error executing custom SQL: {error}")
"""
提供用于获取 Dify 应用调试信息的工具类。
-1
View File
@@ -6,7 +6,6 @@ import pandas as pd
import sys
sys.path.append(os.getcwd())
from rag2_0.dify.dify_tool import DifyTool
import requests
+14 -31
View File
@@ -109,31 +109,20 @@ class Classification(BaseModel):
@classmethod
def get_format_instructions(cls):
return """
格式如下,必须严格以纯JSON格式输出
{
"vertical_classification": "垂直领域一级分类",
"sub_classification": "一级分类下的二级分类"
}
字段说明:
vertical_classification 类型:str 描述:垂直领域一级分类
sub_classification 类型:str 描述:一级分类下的二级分类
"""
return """格式如下,必须严格以纯JSON格式输出
{"vertical_classification": "垂直领域一级分类","sub_classification": "一级分类下的二级分类"}
字段说明:
vertical_classification 类型:str 描述:垂直领域一级分类
sub_classification 类型:str 描述:一级分类下的二级分类"""
class QueryRewrite(BaseModel):
rewrite:str = Field(description="问题改写")
@classmethod
def get_format_instructions(cls):
return """
格式如下:必须严格以纯JSON格式输出
{
"rewrite": "问题改写"
}
字段说明:
"rewrite" 类型:str 描述:问题改写之后的内容
"""
return """格式如下:必须严格以纯JSON格式输出{"rewrite": "问题改写"}
字段说明:
rewrite 类型:str 描述:问题改写之后的内容"""
# 意图优化环节数据模型
@@ -145,18 +134,12 @@ class StepBackPrompt(BaseModel):
@classmethod
def get_format_instructions(cls):
return """
格式如下,必须严格以纯JSON格式输出
{
"original_query": "原始查询",
"can_use_back_prompt": "原始查询是否可以进行后退提示(true/false),如果原始查询没有限定词或其他限定词语,则不能进行后退提示",
"step_back_query": "后退提示生成的抽象查询(多个)"
}
字段说明:
"original_query" 类型:str 描述:用户输入的原始查询
"can_use_back_prompt" 类型:bool 描述:原始查询是否可以进行后退提示(true/false),如果原始查询没有限定词或其他限定词语,则不能进行后退提示
"step_back_query" 类型:list[str] 描述:后退提示生成的抽象查询(多个)
"""
return """格式如下,必须严格以纯JSON格式输出
{"original_query": "原始查询","can_use_back_prompt": "原始查询是否可以进行后退提示(true/false),如果原始查询没有限定词或其他限定词语,则不能进行后退提示","step_back_query": "后退提示生成的抽象查询(多个)"}
字段说明:
original_query 类型:str 描述:用户输入的原始查询
can_use_back_prompt 类型:bool 描述:原始查询是否可以进行后退提示(true/false),如果原始查询没有限定词或其他限定词语,则不能进行后退提示
step_back_query 类型:list[str] 描述:后退提示生成的抽象查询(多个)"""
class FollowUpQuestions(BaseModel):
+20 -20
View File
@@ -17,21 +17,18 @@ from typing import List, Tuple, Dict, Any, Optional
import re
import jieba
import time
import threading
from .PromptTemplates import (classification_prompt, query_rewrite_prompt_pro,
extract_nouns_prompt, classification_info,
slot_filling_prompt, step_back_prompt,
hyde_prompt)
slot_filling_prompt, step_back_prompt)
from .DataModels import (
Classification, QueryRewrite, Term, TermList,
SoftwareFunctionSlots, SoftwareTroubleShootingSlots, ProfessionalConsultingSlots,
DataProblemSlots, FileExtensionConsultingSlots, SoftwareLockSlots,
InstallationDownloadSlots, ProblemDiagnosisSlots, OtherSlots, IntentAndSlotResult,
StepBackPrompt, HypotheticalDocument
InstallationDownloadSlots, ProblemDiagnosisSlots, OtherSlots,
StepBackPrompt
)
from .ProfessionalNounVector import ProfessionalNounRetriever, AsyncProfessionalNounRetriever
from rag2_0.tool.ModelTool import OpenAiLLM
class AsyncIntentRecognizer:
@@ -344,23 +341,25 @@ class AsyncIntentRecognizer:
"""
start_time = time.time() # 记录开始时间
prompt=f"""
当前提问内容:
<query>{query}</query>
对话上下文:
<chat_history>
{json.dumps(chat_history, ensure_ascii=False)}
</chat_history>
prompt=f"""当前提问内容:
<query>{query}</query>
对话上下文:
<chat_history>
{json.dumps(chat_history, ensure_ascii=False)}
</chat_history>
1、请从当前提问内容中提取电力造价行中定额编码、定额名称、清单编码、清单名称
2、请勿随机编造,如果没有提取到内容返回空的JSON
3、返回结果为json格式,必须严格以纯JSON格式输出
{{
1、请从当前提问内容中提取电力造价行中定额编码、定额名称、清单编码、清单名称
2、请勿随机编造,如果没有提取到内容返回空的JSON
3、返回结果为json格式,必须严格以纯JSON格式输出
{{
"dinge_info_list":{{"dinge_code_list":["xxxx","xxxx"], "dinge_name_list":["xxxx","xxxx"]}},
"qingdan_info":{{"qingdan_code_list":["xxxx","xxxx"], "qingdan_name_list":["xxxx","xxxx"]}}
}}
"""
}}"""
# 暂时跳过提取定额清单信息,本环节还未梳理清楚套用规则。
return {
"dinge_info_list":{"dinge_code_list":[], "dinge_name_list":[]},
"qingdan_info":{"qingdan_code_list":[], "qingdan_name_list":[]}
}
try:
# response = await self._llm.ainvoke(prompt, response_format={"type": "json_object"}, extra_body={"enable_thinking": False})
response = await self._llm.ainvoke(prompt, response_format={"type": "json_object"})
@@ -489,6 +488,7 @@ class AsyncIntentRecognizer:
# 特殊处理 锁相关咨询
if classification.vertical_classification == "安装下载注册" and classification.sub_classification == "软件锁类":
process_lock_start_time = time.time()
# 特殊处理提问只有锁号的问题,手动将问题改写为特定格式
rewrite.rewrite = self._process_lock_related_query(rewrite.rewrite)
process_lock_end_time = time.time()
process_lock_time = process_lock_end_time - process_lock_start_time
+48 -159
View File
@@ -28,14 +28,14 @@ extract_nouns_prompt="""
"""
classification_info="""【垂直领域分类】:
1. 软件问题 -- 涉及软件使用功能询问、软件故障排查等方面的提问或请求
2. 业务问题 -- 涉及电力造价领域专业知识、造价费用计算等电力造价业务知识
3. 安装下载注册 -- 指涉及软件(或插件)安装下载、注册、激活等操作类问题
4. 固定话术类 -- 指涉及需要固定话术回的问题,如:规费咨询、调差下载更新。
5. 其他 -- 与软件或电力造价专业无关的日常对话问候、感慨、情绪表达等
1. 软件问题 -- 涉及软件使用/功能/操作或故障排查
2. 业务问题 -- 涉及电力造价专业知识、计价规则或造价数据计算等。
3. 安装下载注册 -- 及软件或插件安装下载、注册、激活、文件扩展名、软件锁等。
4. 固定话术类 -- 需用固定话术回的问题,如:规费咨询、调差下载更新。
5. 其他 -- 与软件或电力造价无关的一般对话问候、情绪等)
【软件问题包括以下两类】:
1. 软件功能:询问软件功能的使用、功能操作(调整)、功能位置、如何设置、如何转换、如何导入到软件、如何安装到软件等侧重软件主体
1. 软件功能:询问软件如何使用、设置、导入、在软件中安装/转换等(以软件主体)。
示例:ywlk怎么安装到软件中? ywlk是文件后缀名 ---> 将文件导入到软件中
2. 故障排查:软件运行异常、软件报错、软件显示错误等
@@ -56,188 +56,90 @@ classification_info="""【垂直领域分类】:
4. 问题排查类:软件安装下载失败、报错,系统兼容性问题等
【固定话术类包括以下类】:
1. 规费咨询
**以下两种情况才属于该类**
1、当询问规费(如社会保障费和住房公积金)费率是/填多少
2、去哪里获取规费费率
**其余涉及规费的属于其他垂直领域分类**
2. 调差下载更新
**以下两种情况才属于该类**
1、询问如何下载导入调差文件、调差插件
2、询问如何更新导入调差文件、调差插件
调差价格水平差异调整
调差 = 价格水平差异调整
**其余涉及调差的属于其他垂直领域分类**
【其他】:
1. 其他
分类优先级:
固定话术类 > 软件问题 、 业务问题 、 安装下载注册 > 其他
"""
分类优先级:固定话术类 > 软件问题 、 业务问题 、 安装下载注册 > 其他"""
classification_prompt="""
用户正在使用电力造价软件或想询问电力造价领域相关知识,你需要根据用户的输入内容集合历史对话(如果存在),将其归类为以下垂直领域之一:
{classification_info}
classification_prompt="""用户在使用电力造价软件或咨询电力造价相关问题,请将用户输入(结合历史对话,如有)归为以下垂直领域之一:
{classification_info}
## 【历史对话记录】
{chat_history}
## 【历史对话记录】
{chat_history}
【用户输入】:
{user_input}
【用户输入】:
{user_input}
【输出格式要求】:
{output_format}
【示例】
用户输入1: 技改T1怎样新建工程
输出1:
{{
"vertical_classification":"软件问题",
"sub_classification":"软件功能"
}}
"""
query_rewrite_prompt = """
# 电力造价专业问答优化工程师
你是一名电力造价专业问答优化工程师,负责通过专业关键词集合替换原始问题中的非专业表述以提升知识库检索准确率。
## 核心任务
将用户的原始问题结合专业术语库进行规范化重构,提高知识库检索的准确性和专业性。
## 处理流程
### 第一阶段:输入解析
1. 解析基础信息
- 原始问题(需保留核心语义){query}
- 关键词集合:{keywords}
### 第二阶段:匹配分析
**匹配规则:**
1. 检查原始问题中是否包含关键词集合中的`name`字段或`synonymous`字段中的任何词汇
2. 统计匹配的术语数量
3. 判断执行路径:
- 匹配术语 ≥ 1个 → 执行重构流程
- 匹配术语 = 0个 → 直接输出原始问题
### 第三阶段:问题重构
**重构原则(按优先级排序):**
1. **语义保真**:严格保持原问题的核心意图和诉求
2. **术语规范**
- 将匹配到的同义词替换为对应的标准术语(name字段)
- 对在关键词中的标准术语使用【】进行标记
- 保留在原问题中未在关键词库中的专业术语、限定词和修饰词
3. **结构优化**
- 保持原问题的语态特征5W2H
- 保持主谓宾结构清晰
- 保留时间、版本等限定条件
**术语处理规则:**
- 优先级1:保留原问题中的专业术语、限定词和修饰词(即使不在关键词库中)
- 优先级2:将同义词替换为标准术语并用【】标记
- 优先级3:对原问题中已存在的标准术语添加【】标记
# 输出规范
【输出格式要求】:
{output_format}
# 示范案例库
▶ 案例1(有效匹配)
入:
原始问题:怎么把旧版西藏定额工程转到Z1新版
关键词:【'老版本定额升级', '批量设置定额', '西藏造价软件Z1'
输出:
{{"rewrite":"【西藏造价软件Z1】如何执行【老版本定额升级】操作?"}}
【示例】
用户输入1: 技改T1怎样新建工程
出1:
{{
"vertical_classification":"软件问题",
"sub_classification":"软件功能"
}}"""
▶ 案例2(无效匹配)
输入:
原始问题:程序界面文字显示过小如何处理?
关键词:【'定额升级', '工程批量导入'
输出:
{{"rewrite":"程序界面文字显示过小如何处理?"}}
▶ 案例3(部分匹配,但保留修饰限定词)
输入:
原始问题:"配网软件D3能导出清单的计算公式吗?
关键词:【'配网工程计价通D3软件', '计算式'
输出(保留限定修饰词"清单")
{{"rewrite":"【配网工程计价通D3软件】能导出清单的【计算式】吗?"}}
## 质量检查清单
执行前请确认:
- [ ] 是否保持了原问题的核心诉求?
- [ ] 是否正确执行了同义词替换?
- [ ] 是否保留了原问题中的专业术语和限定条件?
- [ ] 是否正确使用了【】标记?
- [ ] 重构后的问题是否自然流畅?
"""
query_rewrite_prompt_pro="""
# 问答优化工程师
query_rewrite_prompt_pro="""# 问答优化工程师
**角色**:基于历史对话和术语库重构问题,提升知识库检索准确率。
**最高准则**
1、保持问题核心意图,允许指代消除
2. 所有新增内容必须源于历史对话或聊天背景,禁止捏造。
3. 归一化替换需严格全词匹配:查询中的词必须与术语库同义词完全一致(不区分大小写)。部分匹配(如子字符串)或不匹配,保留原词
## 核心原则
1. **指代消除 → 当指示代词(""/"")出现时,强制继承历史对话的最新核心主题(如功能或任务),并应用到当前主体。**
1. **指代消除 → 当指示代词(""/"")出现时,继承历史对话的最新核心主题(如功能或任务),并应用到当前主体。**
2. 术语规范 → 提问中出现的同义词(synonymous)替换为标准词(name)并【】标记
3. 语义保真 → 保持问题核心意图,允许指代消除
## 归一化替换规则
1. 只有当问题中的词与术语库中某一项的同义词列表中的某个词完全相同时,才替换为对应的标准词
## 处理流程
### 一、输入解析
- 原始问题(需保留核心语义):
<query> {query} </query>
- 原始问题(需保留核心语义):
<query> {query} </query>
- 术语库集合(用于同义词转标准词环节):
<keywords>
{keywords}
</keywords>
- 术语库集合(用于同义词转标准词环节):
<keywords>
{keywords}
</keywords>
- 历史对话记录:
<history>
{chat_history}
</history>
- 历史对话记录:
<history>
{chat_history}
</history>
### 、重构流程
### 、重构流程
1、问题是否指代不明,指代不明时根据历史对话补充上下文
2、问题是否包含同义词,包含同义词时进行同义词转标准词
### 三、重构优先级
1. **指代消除 → 当指示代词出现时,结合历史对话补充上下文**
2. 同义词转标准词 → 将提问中出现的同义词(synonymous)替换为对应标准词(name) 并使用【】标记
3. 结构优化 → 保持原问题的5W2H特征,指代消除、背景继承下允许微调意图
3. 结构优化 → 指代消除、背景继承下允许微调提问
## 输出规范
{output_format}
## 示例模仿
示例1
输入:
<history>
'user': '811623110668是哪款软件的锁?
'assistant': 可通过查询软件锁的许可证信息,通过许可证名称可以判断对应软件
</history>
<query> ”锂离子电池储能安装“ </query>
输出:
{{"rewrite": "许可证名称为‘锂离子电池储能安装’对应什么软件?"}}
## 质量自检
- [] **主题是否合理继承?**
- [] 核心诉求是否保留?
- [] 语句是否自然流畅?
- [] 避免补充无关信息
"""
- [] 避免补充无关信息"""
slot_filling_prompt = """
你是一个专业的电力造价领域问题槽位填充助手。你需要从用户问题中提取关键信息,并填充到对应的数据结构中。
@@ -282,21 +184,18 @@ slot_filling_prompt = """
"""
# 意图优化环节提示词模板
step_back_prompt = """
# 后退提示生成器
你是一个专业的电力造价领域问题抽象专家。你的任务是根据用户的具体问题,提出一个更抽象、更高层次的问题,帮助系统更好地理解用户的意图。
step_back_prompt = """# 后退提示生成器
你是电力造价领域的问题抽象专家。任务是把用户的具体问题抽象成更高层次的问题,帮助理解其核心意图。
## 任务说明
1. 分析用户原始问题,理解其核心意图和需求
2. 考虑历史对话和会话背景,理解用户当前问题的上下文
3. 生成更抽象、更高层次的问题,称为"后退问题",后退问题可以生成多个,依次后退到更抽象、更高层次的问题
1. 理解用户原始问题的核心
2. 考虑上下文
3. 生成逐级抽象的“后退问题”
4. 后退问题应该:
- 更加通用和抽象,不应包含原始问题的具体细节(包括场景限定、界面限定等其他限定词语)
- 涵盖原始问题的核心主题
- 去除过于具体的限制条件(如时间、地点、特定版本、特定工程等)
- 保持在同一领域和主题范围内
- 依次移除问题中的限定词或者修饰词
- 更加通用和抽象,不应包含原始问题的具体细节(包括场景限定、界面限定等其他限定词语)
- 涵盖原始问题的核心主题
- 去除过于具体的限制条件(如时间、地点、特定版本、特定工程等)
- 保持在同一领域和主题范围内
- 依次移除问题中的限定词或者修饰词
## 输入
用户原始问题: {query}
@@ -306,23 +205,13 @@ step_back_prompt = """
{output_format}
## 示例
原始问题: "2023版本如何在Windows 11系统上导入单位工程量清单?"
后退问题:
{{
"original_query": "2023版本如何在Windows 11系统上导入单位工程量清单?",
"can_use_back_prompt": true,
"step_back_query": ["如何在Windows 11系统上导入单位工程量清单?", "如何导入单位工程量清单?"]
}}
原始问题: "某个设备更换后,如何在系统中更新对应的定额?"
后退问题:
{{
"original_query": "某个设备更换后,如何在系统中更新对应的定额?",
"can_use_back_prompt": true,
"step_back_query": ["如何更新设备对应的定额?", "如何更新定额?"]
}}
"""
}}"""
follow_up_questions_prompt = """
# 后续问题生成器
+15 -33
View File
@@ -193,7 +193,9 @@ class OpenAiLLM:
messages=[{'role': 'user', 'content': user_prompt}],
**kwargs
)
return completion.choices[0].message
message = completion.choices[0].message
message.usage = completion.usage
return message
except Exception as e:
raise RuntimeError(f"OpenAiLLM:invoke:error:{str(e)}") from e
@@ -225,36 +227,16 @@ class OpenAiLLM:
if __name__ == "__main__":
# 测试重排模型
reranker = SiliconFlowReRankerModel()
# 测试用例1:简单问题
query = "如何通过【电力经济评价软件】的【打开】功能加载工程文件?"
documents = []
results = reranker.rerank(query, documents)
print(f"测试用例1 - 查询:{query}")
for idx, item in enumerate(results):
print(f"{idx+1}. 文档: {item['document']}, 分数: {item['score']}")
print("-" * 50)
# 异步测试示例
async def test_async():
# 测试异步嵌入
api_key = APIKeyManager.get_api_key()
embeddings = XinferenceEmbeddings(api_key=api_key)
query_embedding = await embeddings.embed_query_async("测试查询")
print(f"异步嵌入向量维度: {len(query_embedding)}")
# 测试异步重排序
results = await SiliconFlowReRankerModel.rerank_async(query, documents)
print(f"异步重排序结果数量: {len(results)}")
# 测试异步LLM调用
llm = OpenAiLLM()
response = await llm.ainvoke("你好,请简单介绍一下自己")
print(f"异步LLM响应: {response.content}")
# 如果需要运行异步测试,取消下面的注释
# import asyncio
# asyncio.run(test_async())
base_url = os.getenv("OPENAI_API_BASE")
model_name = os.getenv("MODEL_NAME", "gpt-3.5-turbo")
# 初始化LLM
llm_params = {
"temperature": 0.4, # 降低随机性,使结果更确定
"top_p": 0.7,
"model": model_name,
"base_url": base_url
}
_llm = OpenAiLLM(**llm_params)
promt="""你好,请简单介绍一下自己"""
print(_llm.invoke(promt))
+111
View File
@@ -0,0 +1,111 @@
#!/usr/bin/env bash
# 专用脚本:启动 rag2_0.api.intent_recognition_api 服务
# 功能:启动前检测screen是否存在,存在则结束,最后启动服务
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
SESSION_NAME="intent_recognition_api"
SERVICE_PORT="8001"
START_COMMAND="cd \"$SCRIPT_DIR\" && uv run uvicorn rag2_0.api.intent_recognition_api:app --host 0.0.0.0 --port 8001 --workers 4"
echo "[脚本] 启动 intent_recognition_api 服务..."
# 检查screen会话是否存在
exists_session() {
# 使用严格匹配,避免误判
if screen -ls 2>/dev/null | grep -q "\\.${SESSION_NAME}\\s"; then
return 0
fi
return 1
}
# 按端口获取监听该端口的任意一个PID,优先用 ss,其次 lsof
pids_on_port() {
# 从 ss 提取 pid 列表
local ss_pids
ss_pids=$(ss -lptn 2>/dev/null \
| grep -E ":${SERVICE_PORT}\\b" \
| awk '{print $NF}' \
| sed -n 's/.*pid=\([0-9]\+\),.*/\1/p' \
| sort -u)
if [[ -n "$ss_pids" ]]; then
echo "$ss_pids"
return 0
fi
# 从 lsof 提取 pid 列表
if command -v lsof >/dev/null 2>&1; then
local lsof_pids
lsof_pids=$(lsof -nP -i :"$SERVICE_PORT" -sTCP:LISTEN -t 2>/dev/null | sort -u)
if [[ -n "$lsof_pids" ]]; then
echo "$lsof_pids"
return 0
fi
fi
return 1
}
# 根据端口优雅终止(TERM)并在必要时强制(KILL)清理进程
kill_by_port() {
local pids
pids=$(pids_on_port || true)
if [[ -z "$pids" ]]; then
return 0
fi
echo "[清理] 端口 $SERVICE_PORT 仍被占用,发送 SIGTERM 到: $pids"
kill -TERM $pids 2>/dev/null || true
sleep 2
# 再次检查
local left
left=$(pids_on_port || true)
if [[ -n "$left" ]]; then
echo "[强制] 端口 $SERVICE_PORT 仍占用,发送 SIGKILL 到: $left"
kill -KILL $left 2>/dev/null || true
fi
}
# 停止已存在的服务
stop_existing_service() {
# 1) 先尝试关闭 screen 会话
if exists_session "$SESSION_NAME"; then
echo "[停止] 发现已存在的 screen 会话 '$SESSION_NAME',正在结束..."
screen -S "$SESSION_NAME" -X quit || true
echo "[停止] screen 会话 '$SESSION_NAME' 已结束"
else
echo "[提示] 未发现 screen 会话: $SESSION_NAME"
fi
# 2) 等待释放端口
sleep 2
# 3) 如果仍占用,按端口清理
if ss -lptn 2>/dev/null | grep -E -q ":${SERVICE_PORT}\\b" || (command -v lsof >/dev/null 2>&1 && lsof -i :"$SERVICE_PORT" -sTCP:LISTEN >/dev/null 2>&1); then
echo "[清理] 端口 $SERVICE_PORT 仍被占用,正在清理..."
kill_by_port
echo "[清理] 端口 $SERVICE_PORT 清理完成"
fi
}
# 启动服务
start_new_service() {
echo "[启动] 准备启动 intent_recognition_api 服务..."
echo "[启动] 启动命令: $START_COMMAND"
screen -dmS "$SESSION_NAME" bash -c "$START_COMMAND"
echo "[启动] intent_recognition_api 服务已启动,screen 会话名: '$SESSION_NAME'"
echo "[启动] 服务运行在端口: $SERVICE_PORT"
echo "[提示] 可使用 'screen -r $SESSION_NAME' 查看服务输出"
}
# 主流程
main() {
# 1. 停止已存在的服务
stop_existing_service
# 2. 启动新服务
start_new_service
echo "[完成] intent_recognition_api 服务启动脚本执行完成"
}
main