Files
KG_generation/extract_errors.py
chentianrui 0a4dedda1c 更新代码
2025-10-14 16:13:18 +08:00

129 lines
5.2 KiB
Python

import re
import codecs
# Timestamp that opens every log record, e.g. "2025-10-14 16:13:18,123".
_LOG_RECORD_START = re.compile(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})")

# Encodings tried in order when reading the log as text.
_CANDIDATE_ENCODINGS = ("utf-8", "gbk", "gb2312", "ascii")


def _read_log_lines(input_log_path):
    """Return the log's lines (line endings kept), or None if it cannot be read."""
    lines = None
    for encoding in _CANDIDATE_ENCODINGS:
        try:
            with open(input_log_path, "r", encoding=encoding) as f:
                lines = f.readlines()
        except UnicodeDecodeError:
            continue
        print(f"✅ 成功使用 {encoding} 编码读取文件")
        break

    # `None` (not an empty list) marks "every encoding failed", so an empty
    # but readable file is not mistaken for a decoding failure.
    if lines is None:
        try:
            with open(input_log_path, "rb") as f:
                content = f.read()
        except OSError as e:
            print(f"❌ 无法读取文件: {e}")
            return None
        # Last resort: drop undecodable bytes instead of giving up.
        lines = content.decode("utf-8", errors="ignore").splitlines(True)
        print("⚠️ 使用二进制模式读取文件,可能有字符丢失")

    # A UTF-8 byte-order mark would keep the first record from matching the
    # "^"-anchored timestamp pattern, so strip it.
    if lines and lines[0].startswith("\ufeff"):
        lines[0] = lines[0][1:]
    return lines


def _collect_error_lines(lines):
    """Return the right-stripped lines of every WARNING/ERROR/traceback record."""
    collected = []
    # True while the record currently being scanned is one we extract.  The
    # decision is made once per record header, so continuation lines of an
    # unrelated INFO/DEBUG record can never leak into the report.
    capturing = False
    for index, line in enumerate(lines):
        if _LOG_RECORD_START.match(line):
            capturing = (
                " - WARNING - " in line
                or " - ERROR - " in line
                # A DEBUG record is kept only when a traceback follows it.
                or (
                    " - DEBUG - " in line
                    and index + 1 < len(lines)
                    and "Traceback" in lines[index + 1]
                )
            )
            if capturing:
                collected.append(line.rstrip())
        elif capturing and line.strip():
            # Non-blank continuation line (traceback, multi-line message).
            collected.append(line.rstrip())
    return collected


def _distinct_warnings(error_lines):
    """Return the distinct WARNING messages in first-seen order."""
    return list(
        dict.fromkeys(
            line.split(" - WARNING - ", 1)[1]
            for line in error_lines
            # Only record headers count; traceback text that happens to
            # contain the marker is not a warning of its own.
            if " - WARNING - " in line and _LOG_RECORD_START.match(line)
        )
    )


def extract_errors_and_warnings(
    input_log_path: str,
    output_error_path: str,
    warning_stats_path: str = "warning_statistics.txt",
) -> None:
    """Extract WARNING/ERROR records (with tracebacks) from a log file.

    A record starts at a line beginning with a ``YYYY-MM-DD HH:MM:SS,mmm``
    timestamp and runs until the next such line.  A record is extracted when
    its header contains `` - WARNING - `` or `` - ERROR - ``, or when it is a
    `` - DEBUG - `` record whose next line contains ``Traceback``.  Blank
    continuation lines are dropped and trailing whitespace is stripped.

    The log is read as text trying UTF-8, GBK, GB2312 and ASCII in turn; if
    none decodes cleanly it is decoded as UTF-8 with undecodable bytes
    ignored.  Progress messages are printed to stdout.

    Args:
        input_log_path: Path of the log file to scan.
        output_error_path: Path of the UTF-8 report that receives the
            extracted lines, one per line.
        warning_stats_path: Path of the UTF-8 summary that lists every
            distinct WARNING message once, in first-seen order.

    Returns:
        None.  If the fallback binary read fails, an error is printed and
        neither output file is written.

    Raises:
        OSError: If the log cannot be opened for the text read (for example
            it does not exist) or an output file cannot be written.
    """
    lines = _read_log_lines(input_log_path)
    if lines is None:
        return

    error_lines = _collect_error_lines(lines)

    with open(output_error_path, "w", encoding="utf-8") as f:
        f.writelines(f"{err_line}\n" for err_line in error_lines)

    # The summary lists each distinct message once; occurrence counts are not
    # part of its format.
    warnings_seen = _distinct_warnings(error_lines)
    with open(warning_stats_path, "w", encoding="utf-8") as f:
        f.write("WARNING统计结果:\n")
        f.write(f"共找到 {len(warnings_seen)} 种不同的WARNING信息\n\n")
        f.writelines(f"{warning_content}\n" for warning_content in warnings_seen)

    print(f"✅ 提取完成!共找到 {len(error_lines)} 行错误/警告信息。")
    print(f"📁 已保存到: {output_error_path}")
    print(f"📊 WARNING统计已保存到: {warning_stats_path}")
# ============ Usage example ============
if __name__ == "__main__":
    # Replace with the path to your own log file.
    log_path = "data/input/bclresults/bcl_calculator.log"
    # Report that receives the extracted WARNING/ERROR lines.
    report_path = "error_report.txt"
    # Summary listing the distinct WARNING messages.
    stats_path = "warning_statistics.txt"

    extract_errors_and_warnings(
        input_log_path=log_path,
        output_error_path=report_path,
        warning_stats_path=stats_path,
    )