818 lines
27 KiB
Python
818 lines
27 KiB
Python
def classify_files(folder_path):
    """Copy the files of a folder into four sibling folders chosen by filename.

    The destination folders live in the parent directory of ``folder_path``.
    Each one is created if missing, otherwise its current content is deleted
    before copying starts:

    - ``xml_data``: names starting with ``费用表``, ``费用集``, ``取费表``,
      ``线路特征`` or ``项目划分`` (except ``项目划分_取费_*``)
    - ``xml_data1.5``: names starting with ``项目划分_取费_``
    - ``xml_data2``: names starting with ``安装_工程量`` or containing
      ``工程量精度`` / ``工程属性``
    - ``xml_data3``: every other file

    Only regular files directly inside ``folder_path`` are copied (with
    ``shutil.copy2``); the source files are left in place.

    Args:
        folder_path: Path of the input folder.

    Returns:
        True once the copy pass has run (individual copy failures are printed,
        not raised).  False when ``folder_path`` is not an existing directory
        or is itself one of the four destination folders; nothing is deleted
        in that case.
    """
    import shutil
    from pathlib import Path

    source_dir = Path(folder_path)

    # Validate before touching any destination folder: a plain file or a
    # missing path must not cause the outputs to be wiped first.
    if not source_dir.is_dir():
        print(f"错误:文件夹 '{folder_path}' 不存在")
        return False

    parent_dir = source_dir.parent
    folders = {
        name: parent_dir / name
        for name in ('xml_data', 'xml_data1.5', 'xml_data2', 'xml_data3')
    }

    # Clearing a destination that is also the input would destroy the data
    # we are about to classify.
    resolved_source = source_dir.resolve()
    if any(target.resolve() == resolved_source for target in folders.values()):
        print(f"错误:输入文件夹 '{folder_path}' 不能同时作为输出文件夹")
        return False

    print("开始清空输出文件夹...")
    for target_dir in folders.values():
        if target_dir.exists():
            print(f"清空文件夹: {target_dir}")
            for item in target_dir.iterdir():
                # Symlinks are unlinked, never followed: shutil.rmtree
                # refuses to operate on a symbolic link.
                if item.is_symlink() or item.is_file():
                    item.unlink()
                elif item.is_dir():
                    shutil.rmtree(item)
        else:
            target_dir.mkdir(parents=True, exist_ok=True)
            print(f"创建文件夹: {target_dir}")

    print("输出文件夹清空完成,开始复制分类文件...")

    xml_data_prefixes = ('费用表', '费用集', '取费表', '线路特征', '项目划分')

    copied_files = 0
    for file_path in source_dir.iterdir():
        if not file_path.is_file():
            continue

        filename = file_path.name

        # Order matters: the more specific '项目划分_取费_' prefix must be
        # tested before the generic '项目划分' prefix.
        if filename.startswith('项目划分_取费_'):
            dest_folder = folders['xml_data1.5']
        elif filename.startswith(xml_data_prefixes):
            dest_folder = folders['xml_data']
        elif (filename.startswith('安装_工程量')
              or '工程量精度' in filename
              or '工程属性' in filename):
            dest_folder = folders['xml_data2']
        else:
            dest_folder = folders['xml_data3']

        try:
            shutil.copy2(str(file_path), str(dest_folder / filename))
            copied_files += 1
        except OSError as e:
            # Best effort: report the failure and continue with the rest.
            print(f"复制文件 {filename} 时出错: {e}")

    print(f"\n文件复制分类完成!共处理 {copied_files} 个文件")
    print("输出文件夹位置:")
    for name, path in folders.items():
        file_count = sum(1 for f in path.iterdir() if f.is_file())
        print(f" {name}: {path} ({file_count} 个文件)")

    return True
|
||
|
||
def list_target_xmls(
        dir_path: str,
        keyword: str,
        recursive: bool = False,
        fullpath: bool = False) -> list:
    """List the files under ``dir_path`` whose name matches ``{keyword}*.xml``.

    The match is case-insensitive and ``keyword`` is taken literally (regex
    metacharacters in it are escaped).

    Args:
        dir_path: Directory to search.
        keyword: Required filename prefix, e.g. ``"安装_工程量"``.
        recursive: Also search sub-directories when True.
        fullpath: Return resolved absolute paths when True, bare file names
            otherwise.

    Returns:
        The matching names/paths as strings, sorted case-insensitively.

    Raises:
        NotADirectoryError: ``dir_path`` is not an existing directory.
    """
    import re
    from pathlib import Path

    pattern = re.compile(rf'^{re.escape(keyword)}.*\.xml$', re.IGNORECASE)

    base = Path(dir_path)
    if not base.is_dir():
        raise NotADirectoryError(f"目录不存在或不可用: {dir_path}")

    candidates = base.rglob('*') if recursive else base.iterdir()
    results = [
        str(p.resolve()) if fullpath else p.name
        for p in candidates
        if p.is_file() and pattern.match(p.name)
    ]
    results.sort(key=str.lower)
    return results
|
||
|
||
def filter_files_by_display_names(xml_file_path, files_list, prefix="费用表_"):
    """Keep the files whose name matches a display name listed in an XML file.

    Every element of ``xml_file_path`` that carries a non-empty ``显示名称``
    attribute contributes one display name.  A file from ``files_list`` is
    kept when its base name ends with ``{prefix}<name>.xml`` (extension and
    prefix compared case-insensitively) and ``<name>`` is one of those
    display names.

    Args:
        xml_file_path: Path of the XML file holding the display names.
        files_list: File paths to filter.
        prefix: Literal text expected right before the display name.

    Returns:
        A tuple ``(matched_files, matched_display_names)``.  ``matched_files``
        keeps the order of ``files_list``; ``matched_display_names`` keeps the
        order in which the names appear in the XML file.  Both lists are empty
        when the XML file cannot be read or parsed (the error is printed), so
        callers can always unpack the result.
    """
    import os
    import re
    import xml.etree.ElementTree as ET

    try:
        root = ET.parse(xml_file_path).getroot()
    except (OSError, ET.ParseError) as e:
        print(f"解析XML文件时出错: {e}")
        return [], []

    display_names = [
        name for name in (elem.get('显示名称') for elem in root.iter()) if name
    ]
    known_names = set(display_names)  # O(1) membership inside the loop

    pattern = re.compile(rf'{re.escape(prefix)}(.*?)\.xml$', re.IGNORECASE)

    matched_files = []
    matched_names = set()
    for file_path in files_list:
        match = pattern.search(os.path.basename(file_path))
        if match and match.group(1) in known_names:
            matched_files.append(file_path)
            matched_names.add(match.group(1))

    matched_display_names = [name for name in display_names if name in matched_names]
    return matched_files, matched_display_names
|
||
|
||
def extract_table_name_from_xml_file(file_path):
    """Return the ``名称`` attribute of the first ``DataDefs/dataset/table`` element.

    The element is searched with the path ``.//DataDefs/dataset/table``
    relative to the document root.

    Args:
        file_path (str): Path of the XML file.

    Returns:
        The value of the table's ``名称`` attribute, or None when the table
        element is missing, the attribute is absent, or the file cannot be
        read/parsed (a message is printed in the failure cases).
    """
    import xml.etree.ElementTree as ET

    try:
        root = ET.parse(file_path).getroot()
    except ET.ParseError as e:
        print(f"XML解析错误: {e}")
        return None
    except FileNotFoundError:
        print(f"文件未找到: {file_path}")
        return None
    except Exception as e:
        print(f"处理XML时发生错误: {e}")
        return None

    table_element = root.find('.//DataDefs/dataset/table')
    if table_element is None:
        print("未找到table元素")
        return None
    return table_element.get('名称')
|
||
|
||
###########################费用表#################################
|
||
# 根据分类信息,没有:变量费用、常驻费用、其他
|
||
# projectData => projectCost
|
||
def xml_to_json(xml_content):
    """Convert an XML string into a list of nested dicts built from ``<tr>`` rows.

    For every ``<table>`` found below the root, the direct ``<tr>`` children
    of its direct ``<trs>`` child are converted with ``process_tr_element``;
    rows that convert to a falsy value (e.g. an empty dict) are skipped.

    Args:
        xml_content: XML document as a string.

    Returns:
        The list of converted rows, or an empty list when parsing or
        conversion fails (the error is printed).
    """
    import xml.etree.ElementTree as ET

    try:
        root = ET.fromstring(xml_content)

        result = []
        for table in root.findall('.//table'):
            trs = table.find('./trs')
            if trs is None:
                continue
            for tr in trs.findall('./tr'):
                tr_data = process_tr_element(tr)
                if tr_data:
                    result.append(tr_data)
        return result

    except ET.ParseError as e:
        print(f"XML解析错误: {e}")
        return []
    except Exception as e:
        print(f"处理错误: {e}")
        return []
|
||
|
||
def process_tr_element(tr_element):
    """Turn a ``<tr>`` element and its nested ``<tr>`` children into a dict.

    The element's attributes become the dict entries.  Direct ``<tr>``
    children are converted recursively and, when at least one of them yields
    a non-empty result, stored under the ``children`` key.

    Returns None when ``tr_element`` is None.
    """
    if tr_element is None:
        return None

    node = dict(tr_element.attrib)

    # Children that convert to an empty dict are dropped.
    nested = [
        converted
        for converted in map(process_tr_element, tr_element.findall('./tr'))
        if converted
    ]
    if nested:
        node['children'] = nested

    return node
|
||
|
||
def xml_file_to_json(file_path):
    """Read a UTF-8 text file and convert its XML content with ``xml_to_json``.

    Returns an empty list (after printing the error) when the file cannot be
    opened or read.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            raw_xml = handle.read()
        return xml_to_json(raw_xml)
    except Exception as err:
        print(f"文件读取错误: {err}")
        return []
|
||
|
||
# def save_json_to_file(json_data, output_file):
|
||
# """
|
||
# 将JSON数据保存到文件
|
||
# """
|
||
# try:
|
||
# with open(output_file, 'w', encoding='utf-8') as file:
|
||
# json.dump(json_data, file, ensure_ascii=False, indent=4)
|
||
# print(f"JSON数据已保存到: {output_file}")
|
||
# except Exception as e:
|
||
# print(f"保存文件错误: {e}")
|
||
|
||
# 保存结果
|
||
# save_json_to_file(result, "output.json")
|
||
|
||
|
||
###########################取费表#################################
|
||
# projectData => costSetting
|
||
def name_TypeList_info(xml_path):
    """Return the ``值`` attribute of the first ``<tr 名称='名称'>`` in an XML file.

    Failures are reported through the return value rather than by raising:
    a fixed message string is returned when no such ``<tr>`` exists, when the
    file is missing, or when the XML cannot be parsed.  If the element exists
    but has no ``值`` attribute the result is None.
    """
    import xml.etree.ElementTree as ET

    try:
        document_root = ET.parse(xml_path).getroot()
        name_row = document_root.find(".//tr[@名称='名称']")
        if name_row is None:
            return "未找到指定的tr元素"
        return name_row.get('值')
    except FileNotFoundError:
        return "XML文件未找到"
    except ET.ParseError:
        return "XML解析错误"
|
||
|
||
###########################项目划分#################################
|
||
# projectData => projectDivision
|
||
def pd_get_table_names_from_xml(file_path):
    """Collect the ``名称`` attribute of every ``<table>`` element in an XML file.

    Tables are searched with the path ``.//table``; tables whose ``名称``
    attribute is missing or empty are skipped.

    Args:
        file_path (str): Path of the XML file.

    Returns:
        list: Table names in document order, or an empty list when the file
        cannot be read or parsed (the error is printed).
    """
    import xml.etree.ElementTree as ET

    try:
        tree = ET.parse(file_path)
        names = (table.get('名称') for table in tree.findall('.//table'))
        return [name for name in names if name]
    except Exception as e:
        print(f"错误: {e}")
        return []
|
||
|
||
def build_project_division(names):
    """Map ``项目划分_<name>`` to ``{name: None}`` for every entry of ``names``."""
    return {f"项目划分_{name}": {name: None} for name in names}
|
||
|
||
def create_mapping(table_names, files_list):
    """Map each table name to the first path ending with ``项目划分_<name>.xml``.

    Table names without a matching entry in ``files_list`` are left out of
    the returned dict.
    """
    mapping = {}
    for table_name in table_names:
        wanted_suffix = f"项目划分_{table_name}.xml"
        first_hit = next(
            (candidate for candidate in files_list if candidate.endswith(wanted_suffix)),
            None,
        )
        if first_hit is not None:
            mapping[table_name] = first_hit
    return mapping
|
||
|
||
###########################费用预览(项目划分_取费)#################################
|
||
# projectData => expensePreview
|
||
|
||
def parse_costs_from_xml_file(xml_path):
    """Build a ``{"GUID", "children"}`` cost structure from the first ``<table>``.

    The function reads, from the first ``<table>`` found below the root:

    - the table's ``name`` attribute, emitted as ``"{<name>}"`` under ``GUID``;
    - the ``<tr id=... name=...>`` rows of its ``<props>`` child, which map a
      column id to a column name;
    - the ``cost`` attribute of the first ``<trs>/<tr>`` row, a comma-separated
      list of ``id:value`` items (items without ``:`` are ignored, an empty
      value becomes None).

    Args:
        xml_path: Path of the XML file.

    Returns:
        ``{"GUID": "{name}", "children": [{"id": <column name>, "cost": <value
        or None>}, ...]}`` with one child per ``<props>`` row that has both
        ``id`` and ``name``, in document order.

    Raises:
        ValueError: ``<table>``, ``<props>``, ``<trs>/<tr>`` or the ``cost``
            attribute is missing.
        xml.etree.ElementTree.ParseError, OSError: propagated from parsing.
    """
    import xml.etree.ElementTree as ET

    root = ET.parse(xml_path).getroot()

    table_elem = root.find('.//table')
    if table_elem is None:
        raise ValueError("未找到 <table> 元素")

    guid = table_elem.get('name', '')

    # A missing <props> is reported like the other missing nodes instead of
    # surfacing as an AttributeError on None.
    props_elem = table_elem.find('props')
    if props_elem is None:
        raise ValueError("未找到 <props> 元素")

    id_to_name = {}
    for tr in props_elem.findall('tr'):
        tr_id = tr.get('id')
        tr_name = tr.get('name')
        if tr_id is not None and tr_name is not None:
            id_to_name[tr_id] = tr_name

    tr_elem = table_elem.find('trs/tr')
    if tr_elem is None:
        raise ValueError("未找到 <trs>/<tr> 元素")

    cost_str = tr_elem.get('cost')
    if cost_str is None:
        raise ValueError("在 <tr> 元素中未找到 'cost' 属性")

    cost_map = {}
    for item in cost_str.split(','):
        if ':' not in item:
            continue
        key, value = item.split(':', 1)
        cost_map[key] = value if value else None

    return {
        "GUID": f"{{{guid}}}",
        "children": [
            {"id": tr_name, "cost": cost_map.get(tr_id)}
            for tr_id, tr_name in id_to_name.items()
        ],
    }
|
||
|
||
###########################工程量#################################
|
||
# projectData => projectDivision
|
||
|
||
# 更安全的版本,不依赖eval验证
|
||
def extract_file_paths_with_access_path_safe(data, target_key="_文件名称", root_var="data"):
    """Find every ``target_key`` entry in nested dicts/lists and record how to reach it.

    The structure is walked depth-first.  For each dict containing
    ``target_key`` one record is produced; the walk then continues into the
    dict's other values.  Nothing is evaluated - paths are only assembled as
    text.

    Dict keys are rendered with ``repr`` so that keys containing quotes or
    backslashes still yield a well-formed subscript (``data["it's"]``); for
    ordinary string keys this is the familiar ``['key']`` form.

    Args:
        data: Nested structure of dicts and lists (other values are leaves).
        target_key: Key whose values are collected.
        root_var: Text placed in front of the subscripts in ``access_path``.

    Returns:
        A list of dicts with the keys ``file_path`` (the value stored under
        ``target_key``), ``access_path`` (e.g. ``data['a'][0]['_文件名称']``)
        and ``path_parts`` (the individual subscripts as a list of strings).
    """
    results = []

    def _walk(obj, path_parts):
        if isinstance(obj, dict):
            if target_key in obj:
                full_path_parts = path_parts + [f"[{target_key!r}]"]
                results.append({
                    "file_path": obj[target_key],
                    "access_path": root_var + ''.join(full_path_parts),
                    "path_parts": full_path_parts,
                })

            for key, value in obj.items():
                if key == target_key:
                    continue
                _walk(value, path_parts + [f"[{key!r}]"])

        elif isinstance(obj, list):
            for i, item in enumerate(obj):
                _walk(item, path_parts + [f"[{i}]"])

    _walk(data, [])
    return results
|
||
|
||
def parse_xml_to_json(xml_content: str, from_file: bool = False):
    """Convert the ``安装`` table of an XML document into a list of nested dicts.

    Two tables are read from the document:

    - ``<table 名称='人材机'>``: every ``<tr>`` with a non-empty ``id`` is
      indexed by that id; its attributes are copied, with ``_s`` renamed
      to ``s``.
    - ``<table 名称='安装'>``: every ``<tr>`` below it becomes one result item
      holding the row's attributes plus a ``children`` list.  Each ``<props>``
      below the row that has a non-empty ``type`` contributes
      ``{"type": ..., "children": [...]}``.  For type ``"40000016"`` the
      children are the indexed 人材机 rows referenced by each ``<tr s=...>``
      (unknown references are skipped); for any other type they are the
      attributes of the ``<tr>`` elements themselves.

    Args:
        xml_content: XML text, or a file path when ``from_file`` is True.
        from_file: Treat ``xml_content`` as a path and parse that file.

    Returns:
        The list of converted rows (empty when there is no ``安装`` table).
        Every dict in the result is a fresh object, so mutating one entry
        never affects another.

    Raises:
        xml.etree.ElementTree.ParseError, OSError: propagated from parsing.
    """
    import xml.etree.ElementTree as ET

    if from_file:
        root = ET.parse(xml_content).getroot()
    else:
        root = ET.fromstring(xml_content)

    # Index the 人材机 rows by id for quick lookup.
    material_dict = {}
    material_table = root.find(".//table[@名称='人材机']")
    if material_table is not None:
        for tr in material_table.findall(".//tr"):
            material_id = tr.get("id")
            if material_id:
                material_dict[material_id] = {
                    ("s" if attr == "_s" else attr): value
                    for attr, value in tr.attrib.items()
                }

    result = []
    install_table = root.find(".//table[@名称='安装']")
    if install_table is None:
        return result

    # NOTE(review): ".//tr" / ".//props" match at any depth, so <tr> elements
    # nested inside <props> are also emitted as top-level items - confirm
    # this is intended for the real document layout.
    for tr in install_table.findall(".//tr"):
        main_item = dict(tr.attrib)
        children = []

        for props in tr.findall(".//props"):
            props_type = props.get("type")
            if not props_type:
                continue

            props_children = []
            for child_tr in props.findall(".//tr"):
                if props_type == "40000016":
                    s_value = child_tr.get("s")
                    if s_value and s_value in material_dict:
                        # Copy: the same material may be referenced many times.
                        props_children.append(dict(material_dict[s_value]))
                else:
                    props_children.append(dict(child_tr.attrib))

            children.append({"type": props_type, "children": props_children})

        main_item["children"] = children
        result.append(main_item)

    return result
|
||
|
||
def file_exists_simple(file_path):
    """Return 1 when ``os.path.isfile(file_path)`` is true, otherwise 0."""
    import os

    return 1 if os.path.isfile(file_path) else 0
|
||
|
||
|
||
|
||
|
||
def get_by_strpath(data, path: str, replace_with=None):
    """Read, and optionally replace, the value addressed by a subscript path.

    ``path`` is text such as ``"data['a'][0]['b']"``.  A leading ``data`` is
    dropped; the rest must be a sequence of ``['key']`` / ``["key"]`` /
    ``[123]`` subscripts (whitespace allowed, negative indices supported).
    No code is evaluated: quoted keys are decoded with ``ast.literal_eval``
    and each subscript is applied with plain indexing.

    Args:
        data: The structure to navigate.
        path: Subscript path string.
        replace_with: When not None, the addressed value is replaced by a deep
            copy of this list.  The value being replaced must be a ``str`` and
            ``replace_with`` must be a ``list``.

    Returns:
        Without replacement: the value found at ``path``.  With replacement:
        the new list object that was written into the structure.

    Raises:
        TypeError: ``path`` is not a str; or, when replacing, the current
            value is not a str, ``replace_with`` is not a list, or the parent
            container does not match the last subscript (list for an index,
            dict for a key).
        ValueError: ``path`` contains text that is not a valid subscript, or
            a replacement is requested with an empty path.
        IndexError: an integer subscript could not be applied.
        KeyError: a string subscript could not be applied.
    """
    import ast
    import copy
    import re

    if not isinstance(path, str):
        raise TypeError("path 必须是 str")

    s = path.strip()
    if s.startswith("data"):
        s = s[4:]  # drop the leading root variable name

    cur = data
    pos = 0
    parent = None
    last_selector = None  # ("idx", i) or ("key", k)

    # One subscript: a quoted string key or an integer index, with optional
    # whitespace inside the brackets.
    token_re = re.compile(r"""
        \[\s*
        (?:
            (?P<q>'(?:\\.|[^\\'])*'|"(?:\\.|[^\\"])*") # 带引号的字符串键
            | (?P<idx>-?\d+) # 数字索引(支持负数)
        )
        \s*\]
    """, re.VERBOSE)

    for m in token_re.finditer(s):
        # Only whitespace may separate two subscripts.
        junk = s[pos:m.start()].strip()
        if junk:
            raise ValueError(f"无法解析路径,存在非法片段: {junk!r}")
        pos = m.end()

        parent = cur
        if m.group("idx") is not None:
            idx = int(m.group("idx"))
            try:
                cur = cur[idx]
            except Exception as e:
                raise IndexError(f"索引 {idx} 访问失败;当前类型为 {type(parent).__name__}") from e
            last_selector = ("idx", idx)
        else:
            key = ast.literal_eval(m.group("q"))  # decode the quoted literal
            try:
                cur = cur[key]
            except Exception as e:
                raise KeyError(f"键 {key!r} 访问失败;当前类型为 {type(parent).__name__}") from e
            last_selector = ("key", key)

    # Anything left after the last subscript is unparseable.
    if s[pos:].strip():
        raise ValueError("路径末尾存在无法解析的内容")

    if replace_with is None:
        return cur

    if last_selector is None:
        raise ValueError("路径为空,无法替换根节点;请提供至少一个 [] 访问片段")
    if not isinstance(cur, str):
        raise TypeError(f"替换前的值必须是 str,当前为 {type(cur).__name__}")
    if not isinstance(replace_with, list):
        raise TypeError("replace_with 必须是 list")

    # Deep copy so the structure never shares state with the caller's list.
    new_list = copy.deepcopy(replace_with)
    kind, sel = last_selector
    expected_parent = list if kind == "idx" else dict
    if not isinstance(parent, expected_parent):
        raise TypeError(
            f"父节点应为 {expected_parent.__name__},但为 {type(parent).__name__}"
        )
    parent[sel] = new_list
    return new_list
|
||
|
||
###########################线路特征段#################################
|
||
# projectData => 线路特征段
|
||
|
||
def extract_title_from_path(path: str) -> str:
    """Extract the title segment from a file path.

    The title is the part of the file name after the first underscore, with
    the extension and an optional trailing ``{digits}`` group removed, e.g.
    ``'.../线路特征_土质比例{1272}.xml'`` -> ``'土质比例'``.

    Raises:
        ValueError: the file name (without extension) contains no underscore
            followed by at least one character.
    """
    import os
    import re

    name = os.path.basename(path)
    stem = os.path.splitext(name)[0]
    m = re.match(r"^[^_]*_(.+?)(?:\{\d+\})?$", stem)
    if not m:
        raise ValueError(f"无法从文件名提取:{name}")
    return m.group(1).strip()
|
||
|
||
# 修复转换后xml的格式问题
|
||
def process_xml_file(input_file_path, output_file_path=None):
    """Clean irregular text in an XML file and write the result to a new file.

    Every element text, tail text and attribute value is rewritten as follows:

    1. colons (``:`` and full-width ``：``) become ``_``;
    2. a parenthesised comparison such as ``(≥110)`` becomes ``_大于等于110``
       (``≥``, ``≤``, ``>`` and ``<`` are spelled out);
    3. a parenthesised suffix such as ``A(B)`` becomes ``A_B``.

    ASCII and full-width parentheses are both recognised.  When the file is
    not well-formed XML the same rewriting is applied to the raw file content
    as plain text instead (read and written as UTF-8).

    Args:
        input_file_path: Path of the XML file to clean.
        output_file_path: Where to write the result.  Defaults to
            ``<input name>_processed.xml`` next to the input file.

    Returns:
        The path of the written output file.

    Raises:
        Exception: any failure while reading, processing or writing; the
            original error is attached as ``__cause__``.
    """
    import os
    import re
    import xml.etree.ElementTree as ET

    symbol_mapping = {
        '≥': '大于等于',
        '≤': '小于等于',
        '>': '大于',
        '<': '小于',
    }
    # The parentheses are matched literally; left unescaped they would act as
    # regex groups and the patterns would fire on ordinary text.
    comparison_re = re.compile(r'[（(](≥|≤|>|<)(.*?)[）)]')
    bracket_suffix_re = re.compile(r'([^（）()\s]+)[（(]([^）)\s]+)[）)]')

    def replace_special_symbols(match):
        """Spell out the comparison symbol and drop the parentheses."""
        symbol = match.group(1)
        content = match.group(2)
        return f'_{symbol_mapping.get(symbol, symbol)}{content}'

    def process_text_content(text):
        """Apply the three rewriting rules to one piece of text."""
        if not text:
            return text
        processed = text.replace(":", "_").replace("：", "_")
        processed = comparison_re.sub(replace_special_symbols, processed)
        return bracket_suffix_re.sub(r'\1_\2', processed)

    def process_xml_element(element):
        """Rewrite text, attribute values and child tails, recursively."""
        if element.text:
            element.text = process_text_content(element.text)

        # Snapshot the items: the attribute dict is updated inside the loop.
        for attr_name, attr_value in list(element.attrib.items()):
            element.set(attr_name, process_text_content(attr_value))

        for child in element:
            process_xml_element(child)
            if child.tail:
                child.tail = process_text_content(child.tail)

    try:
        print("----------清洗文件----------")
        if output_file_path is None:
            file_dir = os.path.dirname(input_file_path)
            file_name = os.path.basename(input_file_path)
            name_without_ext = os.path.splitext(file_name)[0]
            output_file_path = os.path.join(file_dir, f"{name_without_ext}_processed.xml")

        try:
            tree = ET.parse(input_file_path)
            process_xml_element(tree.getroot())
            tree.write(output_file_path, encoding='utf-8', xml_declaration=True)

        except ET.ParseError as e:
            print(f"XML解析错误: {e}")
            print("尝试使用替代方法处理文件...")

            # Fallback: treat the whole file as plain text.
            with open(input_file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            processed_content = process_text_content(content)

            with open(output_file_path, 'w', encoding='utf-8') as f:
                f.write(processed_content)

            print("使用文本模式处理完成,但可能无法完全保持XML结构")

        print("XML文件处理完成!")
        print(f"输入文件: {input_file_path}")
        print(f"输出文件: {output_file_path}")

        print("----------清洗结束----------")

        return output_file_path

    except Exception as e:
        raise Exception(f"处理文件时发生错误: {e}") from e
|
||
|
||
###########################工程属性#################################
|
||
# projectData => projectInfo
|
||
|
||
def xml_to_dict(xml_path: str):
    """Parse an XML file into a ``{名称: 值}`` dict built from its ``<tr>`` rows.

    Every ``<tr>`` below the root that has a ``名称`` attribute contributes
    one entry; the value is the row's ``值`` attribute, or None when that
    attribute is absent.  When a name occurs more than once the last
    occurrence wins.

    Args:
        xml_path: Path of the XML file.

    Returns:
        The name-to-value dict.

    Raises:
        ValueError: the file is not well-formed XML.
        FileNotFoundError: the file does not exist.
        RuntimeError: any other failure while processing the file.
    """
    import xml.etree.ElementTree as ET

    try:
        root = ET.parse(xml_path).getroot()

        result_dict = {}
        for tr in root.findall('.//tr'):
            name = tr.get('名称')
            if name is not None:
                result_dict[name] = tr.get('值')
        return result_dict

    except ET.ParseError as e:
        raise ValueError(f"XML解析错误: {e}") from e
    except FileNotFoundError as e:
        raise FileNotFoundError(f"文件未找到: {xml_path}") from e
    except Exception as e:
        raise RuntimeError(f"处理XML文件时发生错误: {e}") from e