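# NOTE: The following header lines are residue from the repository web viewer
# this file was captured from; they are not part of the Python module.
#   Files
#   chentianrui 4459270336 上传文件 (commit message: "upload files")
#   2025-10-21 11:34:54 +08:00
#   818 lines / 27 KiB / Python
#   Raw Permalink Blame History
# Viewer warning: this file contains ambiguous Unicode characters, i.e.
# characters that might be confused with other characters. Some string
# literals below may therefore not display (or may not have been captured)
# exactly as written in the repository copy.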
def classify_files(folder_path):
    """Copy the files of ``folder_path`` into four sibling category folders.

    The category folders are created in the parent directory of
    ``folder_path``. Anything already inside them is deleted first. Files are
    copied (not moved); sub-directories of ``folder_path`` are skipped.

    The category is chosen from the file name, first match wins:
        - ``xml_data1.5``: name starts with ``项目划分_取费_``
        - ``xml_data``: name starts with ``费用表``, ``费用集``, ``取费表``,
          ``线路特征`` or ``项目划分``
        - ``xml_data2``: name starts with ``安装_工程量`` or contains
          ``工程量精度`` / ``工程属性``
        - ``xml_data3``: every other file

    Args:
        folder_path: Path of the input folder.

    Returns:
        ``True`` when the classification ran, ``False`` when ``folder_path``
        is not an existing directory or is itself one of the output folders.
    """
    import shutil
    from pathlib import Path

    source_dir = Path(folder_path)
    # is_dir() rather than exists(): a plain file would otherwise pass the
    # check, get the output folders wiped, and only then fail in iterdir().
    if not source_dir.is_dir():
        print(f"错误:文件夹 '{folder_path}' 不存在")
        return False

    parent_dir = source_dir.parent
    folders = {
        'xml_data': parent_dir / 'xml_data',
        'xml_data1.5': parent_dir / 'xml_data1.5',
        'xml_data2': parent_dir / 'xml_data2',
        'xml_data3': parent_dir / 'xml_data3',
    }

    # The output folders are emptied below; refuse to run when the input
    # folder is one of them, otherwise the source files would be deleted.
    source_resolved = source_dir.resolve()
    if any(source_resolved == target.resolve() for target in folders.values()):
        print(f"错误:输入文件夹 '{folder_path}' 与输出文件夹相同,已取消以避免清空源文件")
        return False

    # Empty existing output folders, create the missing ones.
    print("开始清空输出文件夹...")
    for target in folders.values():
        if target.exists():
            print(f"清空文件夹: {target}")
            for item in target.iterdir():
                if item.is_file():
                    item.unlink()
                elif item.is_dir():
                    shutil.rmtree(item)
        else:
            target.mkdir(parents=True, exist_ok=True)
            print(f"创建文件夹: {target}")
    print("输出文件夹清空完成,开始复制分类文件...")

    # '项目划分_取费_' is tested first, so the generic '项目划分' prefix below
    # only sees the remaining 项目划分 files.
    primary_prefixes = ('费用表', '费用集', '取费表', '线路特征', '项目划分')

    copied_files = 0
    for file_path in source_dir.iterdir():
        if not file_path.is_file():
            continue
        filename = file_path.name

        if filename.startswith('项目划分_取费_'):
            dest_folder = folders['xml_data1.5']
        elif filename.startswith(primary_prefixes):
            dest_folder = folders['xml_data']
        elif (filename.startswith('安装_工程量')
              or '工程量精度' in filename
              or '工程属性' in filename):
            dest_folder = folders['xml_data2']
        else:
            dest_folder = folders['xml_data3']

        # Best effort: a failed copy is reported and the loop continues.
        try:
            shutil.copy2(str(file_path), str(dest_folder / filename))
            copied_files += 1
        except OSError as e:
            print(f"复制文件 {filename} 时出错: {e}")

    print(f"\n文件复制分类完成!共处理 {copied_files} 个文件")
    print("输出文件夹位置:")
    for name, path in folders.items():
        file_count = sum(1 for f in path.iterdir() if f.is_file())
        print(f" {name}: {path} ({file_count} 个文件)")
    return True
def list_target_xmls(
        dir_path: str,
        keyword: str,
        recursive: bool = False,
        fullpath: bool = False):
    """List files under ``dir_path`` whose name matches ``{keyword}*.xml``.

    The match is case-insensitive and ``keyword`` is taken literally (regex
    metacharacters are escaped).

    Args:
        dir_path: Directory to search.
        keyword: Required file-name prefix, e.g. ``"安装_工程量"``.
        recursive: Also search sub-directories when ``True``.
        fullpath: Return resolved absolute paths when ``True``, bare file
            names otherwise.

    Returns:
        The matches as strings, sorted case-insensitively.

    Raises:
        NotADirectoryError: ``dir_path`` is not an existing directory.
    """
    import re
    from pathlib import Path

    pattern = re.compile(rf'^{re.escape(keyword)}.*\.xml$', re.IGNORECASE)
    base = Path(dir_path)
    if not base.is_dir():
        raise NotADirectoryError(f"目录不存在或不可用: {dir_path}")

    candidates = base.rglob('*') if recursive else base.iterdir()
    results = [
        str(p.resolve() if fullpath else p.name)
        for p in candidates
        if p.is_file() and pattern.match(p.name)
    ]
    results.sort(key=str.lower)
    return results
def filter_files_by_display_names(xml_file_path, files_list, prefix="费用表_"):
    """Keep the files whose name matches a ``显示名称`` listed in an XML file.

    Every element of ``xml_file_path`` carrying a non-empty ``显示名称``
    attribute contributes one display name. A file from ``files_list`` is kept
    when its base name contains ``{prefix}<display name>.xml``
    (case-insensitive search, ``prefix`` taken literally).

    Args:
        xml_file_path: Path of the XML file holding the display names.
        files_list: File paths to filter.
        prefix: File-name prefix that precedes the display name.

    Returns:
        A tuple ``(matched_files, matched_display_names)``: the kept paths in
        ``files_list`` order, and the display names that matched at least one
        file, in the order they appear in the XML. When the XML file cannot be
        read or parsed the error is printed and ``([], [])`` is returned, so
        the result can always be unpacked into two values.
    """
    import os
    import re
    import xml.etree.ElementTree as ET

    try:
        root = ET.parse(xml_file_path).getroot()
        display_names = [
            name
            for name in (elem.get('显示名称') for elem in root.iter())
            if name
        ]
    except Exception as e:
        print(f"解析XML文件时出错: {e}")
        # Same shape as the success path (previously a bare list).
        return [], []

    known_names = set(display_names)  # O(1) membership inside the loop
    pattern = re.compile(rf'{re.escape(prefix)}(.*?)\.xml$', re.IGNORECASE)

    matched_files = []
    matched_names = set()
    for file_path in files_list:
        match = pattern.search(os.path.basename(file_path))
        if match and match.group(1) in known_names:
            matched_files.append(file_path)
            matched_names.add(match.group(1))

    # Report matched names in XML order rather than file order.
    matched_display_names = [name for name in display_names if name in matched_names]
    return matched_files, matched_display_names
def extract_table_name_from_xml_file(file_path):
    """Return the ``名称`` attribute of the first ``DataDefs/dataset/table``.

    The element is searched below the document root with the path
    ``.//DataDefs/dataset/table``.

    Args:
        file_path (str): Path of the XML file.

    Returns:
        The attribute value, or ``None`` when the element (or the attribute)
        is missing or the file cannot be read or parsed. Failures are printed,
        not raised.
    """
    import xml.etree.ElementTree as ET

    try:
        root = ET.parse(file_path).getroot()
        table_element = root.find('.//DataDefs/dataset/table')
        if table_element is None:
            print("未找到table元素")
            return None
        return table_element.get('名称')
    except ET.ParseError as e:
        print(f"XML解析错误: {e}")
        return None
    except FileNotFoundError:
        print(f"文件未找到: {file_path}")
        return None
    except Exception as e:
        print(f"处理XML时发生错误: {e}")
        return None
###########################费用表#################################
# 根据分类信息,没有:变量费用、常驻费用、其他
# projectData => projectCost
def xml_to_json(xml_content):
    """Convert the ``<tr>`` rows of an XML document into nested dicts.

    For every ``<table>`` below the root, each ``<tr>`` that is a direct child
    of the table's ``<trs>`` element is converted with ``process_tr_element``.
    Rows that convert to an empty value are dropped.

    Args:
        xml_content: The XML document as a string.

    Returns:
        A list with one entry per kept row, in document order. On any error
        the problem is printed and an empty list is returned.
    """
    import xml.etree.ElementTree as ET

    try:
        root = ET.fromstring(xml_content)
        result = []
        for table in root.findall('.//table'):
            trs = table.find('./trs')
            if trs is None:
                continue
            for tr in trs.findall('./tr'):
                tr_data = process_tr_element(tr)
                if tr_data:
                    result.append(tr_data)
        return result
    except ET.ParseError as e:
        print(f"XML解析错误: {e}")
        return []
    except Exception as e:
        print(f"处理错误: {e}")
        return []
def process_tr_element(tr_element):
    """Recursively convert a ``<tr>`` element into a dict.

    The dict holds a copy of the element's attributes. Direct ``<tr>``
    children are converted the same way and stored as a list under the key
    ``'children'``; children that convert to an empty dict are dropped and
    the key is only added when at least one child remains. An XML attribute
    literally named ``children`` is overwritten in that case.

    Args:
        tr_element: The element to convert, or ``None``.

    Returns:
        The dict described above, or ``None`` when ``tr_element`` is ``None``.
    """
    if tr_element is None:
        return None

    node_data = dict(tr_element.attrib)
    children = [
        child_data
        for child_data in map(process_tr_element, tr_element.findall('./tr'))
        if child_data
    ]
    if children:
        node_data['children'] = children
    return node_data
def xml_file_to_json(file_path):
    """Read ``file_path`` as UTF-8 text and convert it with ``xml_to_json``.

    Any exception raised while opening, reading or converting the file is
    printed and an empty list is returned instead.
    """
    try:
        with open(file_path, mode='r', encoding='utf-8') as handle:
            raw_xml = handle.read()
        converted = xml_to_json(raw_xml)
    except Exception as err:
        print(f"文件读取错误: {err}")
        converted = []
    return converted
# def save_json_to_file(json_data, output_file):
# """
# 将JSON数据保存到文件
# """
# try:
# with open(output_file, 'w', encoding='utf-8') as file:
# json.dump(json_data, file, ensure_ascii=False, indent=4)
# print(f"JSON数据已保存到: {output_file}")
# except Exception as e:
# print(f"保存文件错误: {e}")
# 保存结果
# save_json_to_file(result, "output.json")
###########################取费表#################################
# projectData => costSetting
def name_TypeList_info(xml_path):
    """Return the ``值`` of the first ``<tr>`` whose ``名称`` attribute is ``名称``.

    Failures are reported in-band as message strings rather than raised, so a
    caller cannot distinguish them from a real value by type alone:
        - ``"未找到指定的tr元素"``: no matching ``<tr>`` below the root
        - ``"XML文件未找到"``: ``xml_path`` does not exist
        - ``"XML解析错误"``: the file is not well-formed XML

    Args:
        xml_path: Path of the XML file.

    Returns:
        The attribute value (``None`` when the matching ``<tr>`` has no ``值``
        attribute) or one of the message strings listed above.
    """
    import xml.etree.ElementTree as ET

    # Only parsing can raise the two handled exceptions; keep the try minimal.
    try:
        root = ET.parse(xml_path).getroot()
    except FileNotFoundError:
        return "XML文件未找到"
    except ET.ParseError:
        return "XML解析错误"

    target_tr = root.find(".//tr[@名称='名称']")
    if target_tr is None:
        return "未找到指定的tr元素"
    return target_tr.get('值')
###########################项目划分#################################
# projectData => projectDivision
def pd_get_table_names_from_xml(file_path):
    """Collect the ``名称`` attribute of every ``<table>`` below the root.

    Tables without the attribute, or with an empty value, are skipped.

    Args:
        file_path (str): Path of the XML file.

    Returns:
        list: The table names in document order. On any error the problem is
        printed and an empty list is returned.
    """
    import xml.etree.ElementTree as ET

    try:
        tree = ET.parse(file_path)
        names = (table.get('名称') for table in tree.findall('.//table'))
        return [name for name in names if name]
    except Exception as e:
        print(f"错误: {e}")
        return []
def build_project_division(names):
    """Build the placeholder mapping ``{"项目划分_<name>": {<name>: None}}``.

    Args:
        names: Iterable of names.

    Returns:
        A dict with one entry per distinct name; each value is a single-entry
        dict mapping the name to ``None``.
    """
    return {f"项目划分_{name}": {name: None} for name in names}
def create_mapping(table_names, files_list):
    """Map each table name to the first file path ending in its file name.

    The expected file name for a table is ``项目划分_<table name>.xml``; a path
    matches when it ends with that string.

    Args:
        table_names: Iterable of table names.
        files_list: Collection of file path strings, searched in order once
            per table name.

    Returns:
        A dict ``{table_name: file_path}``. Table names without a matching
        path are left out.
    """
    result = {}
    for table_name in table_names:
        expected_filename = f"项目划分_{table_name}.xml"
        first_match = next(
            (path for path in files_list if path.endswith(expected_filename)),
            None,
        )
        if first_match is not None:
            result[table_name] = first_match
    return result
###########################费用预览(项目划分_取费)#################################
# projectData => expensePreview
def parse_costs_from_xml_file(xml_path):
    """Extract the named cost values of the first ``<table>`` in an XML file.

    The table's ``<props>`` child lists ``<tr id=... name=...>`` entries; its
    first ``trs/tr`` row carries a ``cost`` attribute of comma-separated
    ``id:value`` pairs. Each props entry is reported under its ``name`` with
    the value found for its ``id``.

    Args:
        xml_path: Path of the XML file.

    Returns:
        ``{"GUID": "{<table name attribute>}", "children": [...]}`` where each
        child is ``{"id": <props name>, "cost": <value or None>}``, in props
        order. ``cost`` is ``None`` when the id has no pair or an empty value.

    Raises:
        ValueError: The ``<table>``, its ``<props>``, its ``trs/tr`` row or
            the row's ``cost`` attribute is missing.
        OSError / xml.etree.ElementTree.ParseError: Propagated from parsing.
    """
    import xml.etree.ElementTree as ET

    root = ET.parse(xml_path).getroot()

    table_elem = root.find('.//table')
    if table_elem is None:
        raise ValueError("未找到 <table> 元素")
    guid = table_elem.get('name', '')

    # A missing <props> used to surface as AttributeError on None; report it
    # the same way as the other missing elements.
    props_elem = table_elem.find('props')
    if props_elem is None:
        raise ValueError("未找到 <props> 元素")

    # id -> name, skipping entries that lack either attribute.
    id_to_name = {}
    for tr in props_elem.findall('tr'):
        tr_id = tr.get('id')
        tr_name = tr.get('name')
        if tr_id is not None and tr_name is not None:
            id_to_name[tr_id] = tr_name

    tr_elem = table_elem.find('trs/tr')
    if tr_elem is None:
        raise ValueError("未找到 <trs>/<tr> 元素")
    cost_str = tr_elem.get('cost')
    if cost_str is None:
        raise ValueError("在 <tr> 元素中未找到 'cost' 属性")

    # id -> cost; items without ':' are ignored, empty values become None.
    cost_map = {}
    for item in cost_str.split(','):
        key, sep, value = item.partition(':')
        if not sep:
            continue
        cost_map[key] = value if value else None

    return {
        "GUID": f"{{{guid}}}",
        "children": [
            {"id": tr_name, "cost": cost_map.get(tr_id)}
            for tr_id, tr_name in id_to_name.items()
        ],
    }
###########################工程量#################################
# projectData => projectDivision
# 更安全的版本,不依赖eval验证
def extract_file_paths_with_access_path_safe(data, target_key="_文件名称", root_var="data"):
    """Find every ``target_key`` entry in nested dicts/lists and its access path.

    Dicts and lists are walked recursively; other values are ignored. The
    value stored under ``target_key`` itself is not descended into.

    Args:
        data: The nested structure to search.
        target_key: Dict key whose values are collected.
        root_var: Name prefixed to each access path.

    Returns:
        A list of dicts, one per occurrence, with the keys:
            - ``"file_path"``: the value stored under ``target_key``
            - ``"access_path"``: subscript expression such as
              ``data['a'][0]['_文件名称']``
            - ``"path_parts"``: the individual ``[...]`` segments
        Within a dict its own ``target_key`` entry is reported before entries
        found deeper down.
    """
    results = []

    def _selector(key):
        # repr() yields a valid Python literal even when the key contains
        # quotes or backslashes, so the path can be parsed back; for ordinary
        # string keys it is identical to the plain '...' form.
        return f"[{key!r}]"

    def _walk(obj, path_parts):
        if isinstance(obj, dict):
            if target_key in obj:
                full_path_parts = path_parts + [_selector(target_key)]
                results.append({
                    "file_path": obj[target_key],
                    "access_path": root_var + ''.join(full_path_parts),
                    "path_parts": full_path_parts,
                })
            for key, value in obj.items():
                if key != target_key:
                    _walk(value, path_parts + [_selector(key)])
        elif isinstance(obj, list):
            for i, item in enumerate(obj):
                _walk(item, path_parts + [f"[{i}]"])

    _walk(data, [])
    return results
def parse_xml_to_json(xml_content: str, from_file: bool = False):
    """Convert the ``安装`` table of an XML document into a list of dicts.

    Rows of the table named ``人材机`` are indexed by their ``id`` attribute
    (with the attribute ``_s`` renamed to ``s``). Every ``<tr>`` found below
    the table named ``安装`` becomes one output dict holding its attributes
    plus a ``"children"`` list with one ``{"type": ..., "children": [...]}``
    entry per ``<props>`` element below it that has a ``type`` attribute:
        - for ``type == "40000016"`` each ``<tr>`` under the props is replaced
          by the ``人材机`` row whose id equals its ``s`` attribute (rows
          without a match are dropped);
        - for any other type the ``<tr>`` attributes are copied as they are.

    Args:
        xml_content: XML text, or a file path when ``from_file`` is ``True``.
        from_file: Treat ``xml_content`` as a path and parse that file.

    Returns:
        The list of converted rows; empty when there is no ``安装`` table.

    Raises:
        xml.etree.ElementTree.ParseError / OSError: Propagated from parsing.
    """
    import xml.etree.ElementTree as ET

    if from_file:
        root = ET.parse(xml_content).getroot()
    else:
        root = ET.fromstring(xml_content)

    # Lookup table: material id -> attributes (with '_s' exposed as 's').
    material_dict = {}
    material_table = root.find(".//table[@名称='人材机']")
    if material_table is not None:
        for tr in material_table.findall(".//tr"):
            material_id = tr.get("id")
            if material_id:
                material_dict[material_id] = {
                    ("s" if attr == "_s" else attr): value
                    for attr, value in tr.attrib.items()
                }

    result = []
    install_table = root.find(".//table[@名称='安装']")
    if install_table is None:
        return result

    # NOTE(review): './/tr' and './/props' are descendant searches, so <tr>
    # elements nested inside <props> are also emitted as top-level rows and an
    # outer row also collects the <props> of nested rows. This is the existing
    # behaviour and is kept unchanged - confirm it is intended.
    for tr in install_table.findall(".//tr"):
        main_item = dict(tr.attrib)
        children = []
        for props in tr.findall(".//props"):
            props_type = props.get("type")
            if not props_type:
                continue
            props_children = []
            for child_tr in props.findall(".//tr"):
                if props_type == "40000016":
                    # Resolve the reference through the 人材机 lookup; the same
                    # dict object is shared by every row that references it.
                    s_value = child_tr.get("s")
                    if s_value and s_value in material_dict:
                        props_children.append(material_dict[s_value])
                else:
                    props_children.append(dict(child_tr.attrib))
            children.append({"type": props_type, "children": props_children})
        main_item["children"] = children
        result.append(main_item)
    return result
def file_exists_simple(file_path):
    """Return ``1`` when ``file_path`` is an existing regular file, else ``0``."""
    import os

    return int(os.path.isfile(file_path))
def get_by_strpath(data, path: str, replace_with=None):
    """Read (and optionally replace) a value addressed by a subscript path.

    ``path`` is a string such as ``"data['a'][0]['b']"``. No code is
    evaluated: only a sequence of ``[...]`` selectors is parsed, each being a
    quoted string key or an integer index (negative allowed), with optional
    whitespace. A leading ``data`` is stripped.

    Args:
        data: The nested structure to read from.
        path: The subscript path.
        replace_with: When not ``None``, the addressed value is replaced by a
            deep copy of this list. The current value must be a ``str`` and
            ``replace_with`` must be a ``list``.

    Returns:
        Without replacement, the value found at ``path`` (``data`` itself for
        an empty path). With replacement, the new list that was written into
        the structure.

    Raises:
        TypeError: ``path`` is not a ``str``; or, when replacing, the current
            value is not a ``str``, ``replace_with`` is not a ``list``, or the
            parent container does not match the last selector kind.
        ValueError: ``path`` contains text that is not a valid selector, or a
            replacement was requested with an empty path.
        IndexError: An integer selector could not be applied.
        KeyError: A string selector could not be applied.
    """
    import ast
    import copy
    import re

    if not isinstance(path, str):
        raise TypeError("path 必须是 str")
    s = path.strip()
    if s.startswith("data"):
        s = s[4:]  # drop the leading root name

    # Matches ["key"] / ['key'] / [123]; whitespace inside the brackets is
    # allowed. The '#' notes inside the pattern are regex comments (VERBOSE).
    token_re = re.compile(r"""
        \[\s*
        (?:
        (?P<q>'(?:\\.|[^\\'])*'|"(?:\\.|[^\\"])*") # 带引号的字符串键
        | (?P<idx>-?\d+) # 数字索引(支持负数)
        )
        \s*\]
        """, re.VERBOSE)

    cur = data
    parent = None
    last_selector = None  # ("idx", i) or ("key", k)
    pos = 0
    for m in token_re.finditer(s):
        # Only whitespace may separate two selectors.
        junk = s[pos:m.start()].strip()
        if junk:
            raise ValueError(f"无法解析路径,存在非法片段: {junk!r}")
        pos = m.end()
        parent = cur
        if m.group("idx") is not None:
            idx = int(m.group("idx"))
            try:
                cur = cur[idx]
            except Exception as e:
                raise IndexError(f"索引 {idx} 访问失败;当前类型为 {type(parent).__name__}") from e
            last_selector = ("idx", idx)
        else:
            qstr = m.group("q")
            # literal_eval only decodes the quoted literal; a malformed escape
            # is reported as a path error like every other parse failure.
            try:
                key = ast.literal_eval(qstr)
            except (ValueError, SyntaxError) as e:
                raise ValueError(f"无法解析路径,存在非法片段: {qstr!r}") from e
            try:
                cur = cur[key]
            except Exception as e:
                raise KeyError(f"键 {key!r} 访问失败;当前类型为 {type(parent).__name__}") from e
            last_selector = ("key", key)

    if s[pos:].strip():
        raise ValueError("路径末尾存在无法解析的内容")

    if replace_with is None:
        return cur

    # Replacement: write a deep copy so the caller's list is never shared.
    if last_selector is None:
        raise ValueError("路径为空,无法替换根节点;请提供至少一个 [] 访问片段")
    if not isinstance(cur, str):
        raise TypeError(f"替换前的值必须是 str,当前为 {type(cur).__name__}")
    if not isinstance(replace_with, list):
        raise TypeError("replace_with 必须是 list")

    new_list = copy.deepcopy(replace_with)
    kind, sel = last_selector
    expected_parent = list if kind == "idx" else dict
    if not isinstance(parent, expected_parent):
        raise TypeError(
            f"父节点应为 {expected_parent.__name__},但为 {type(parent).__name__}")
    parent[sel] = new_list
    return new_list
###########################线路特征段#################################
# projectData => 线路特征段
def extract_title_from_path(path: str) -> str:
    """Extract the title segment from a file path.

    The title is the part of the file name after the first underscore, without
    the extension and without a trailing ``{digits}`` group, stripped of
    surrounding whitespace.

    Example:
        ``'.../线路特征_土质比例{1272}.xml'`` -> ``'土质比例'``

    Raises:
        ValueError: The file name has no underscore followed by at least one
            character.
    """
    import os
    import re

    name = os.path.basename(path)
    stem = os.path.splitext(name)[0]
    m = re.match(r"^[^_]*_(.+?)(?:\{\d+\})?$", stem)
    if not m:
        raise ValueError(f"无法从文件名提取:{name}")
    return m.group(1).strip()
# 修复转换后xml的格式问题
def process_xml_file(input_file_path, output_file_path=None):
    """Rewrite the text and attribute values of an XML file and save the result.

    The file is parsed and every element's text, tail text and attribute
    values are passed through a three-step text clean-up (a ``str.replace``
    followed by two regex substitutions, see ``process_text_content``). If the
    file is not well-formed XML, the clean-up is applied to the raw file text
    instead, which may not preserve the XML structure.

    NOTE(review): the repository viewer this file was captured from warned
    about ambiguous Unicode characters, and the ``str.replace`` literal and
    both regex patterns below look like they have lost characters (the replace
    target is an empty string as captured). They are reproduced exactly as
    captured - verify them against the repository copy before relying on the
    output.

    Args:
        input_file_path: Path of the XML file to clean.
        output_file_path: Where to write the result. Defaults to
            ``<input name without extension>_processed.xml`` next to the
            input file.

    Returns:
        The path the result was written to.

    Raises:
        Exception: Any failure other than the handled XML parse error,
            re-raised with a descriptive message and chained to its cause.
    """
    import os
    import re
    import xml.etree.ElementTree as ET

    symbol_mapping = {
        '≥': '大于等于',
        '≤': '小于等于',
        '>': '大于',
        '<': '小于',
    }

    def replace_special_symbols(match):
        """Spell out a comparison symbol and prefix it with an underscore."""
        symbol = match.group(1)
        content = match.group(2)
        return f'_{symbol_mapping.get(symbol, symbol)}{content}'

    def process_text_content(text):
        """Apply the three clean-up steps to one string."""
        if not text:
            return text
        # Step 1. NOTE(review): the first argument is empty as captured; the
        # intended character was probably lost - confirm.
        processed = text.replace("", "_")
        # Step 2: comparison symbol followed by content.
        # NOTE(review): pattern reproduced as captured - confirm.
        pattern1 = r'(≥|≤|>|<)(.*?)'
        processed = re.sub(pattern1, replace_special_symbols, processed)
        # Step 3: join two adjacent groups with an underscore.
        # NOTE(review): pattern reproduced as captured - confirm.
        pattern2 = r'([^()\s]+)([^\s]+)'
        processed = re.sub(pattern2, r'\1_\2', processed)
        return processed

    def process_xml_element(element):
        """Clean an element's text and attributes, then recurse into children."""
        if element.text:
            element.text = process_text_content(element.text)
        # Iterate over a snapshot because the attributes are rewritten in place.
        for attr_name, attr_value in list(element.attrib.items()):
            element.set(attr_name, process_text_content(attr_value))
        for child in element:
            process_xml_element(child)
            if child.tail:
                child.tail = process_text_content(child.tail)

    try:
        print("----------清洗文件----------")
        if output_file_path is None:
            file_dir = os.path.dirname(input_file_path)
            file_name = os.path.basename(input_file_path)
            name_without_ext = os.path.splitext(file_name)[0]
            output_file_path = os.path.join(file_dir, f"{name_without_ext}_processed.xml")

        try:
            tree = ET.parse(input_file_path)
            process_xml_element(tree.getroot())
            tree.write(output_file_path, encoding='utf-8', xml_declaration=True)
        except ET.ParseError as e:
            print(f"XML解析错误: {e}")
            print("尝试使用替代方法处理文件...")
            # Fallback: treat the whole file as plain text.
            with open(input_file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            processed_content = process_text_content(content)
            with open(output_file_path, 'w', encoding='utf-8') as f:
                f.write(processed_content)
            print("使用文本模式处理完成,但可能无法完全保持XML结构")

        print("XML文件处理完成!")
        print(f"输入文件: {input_file_path}")
        print(f"输出文件: {output_file_path}")
        print("----------清洗结束----------")
        return output_file_path
    except Exception as e:
        raise Exception(f"处理文件时发生错误: {e}") from e
###########################工程属性#################################
# projectData => projectInfo
def xml_to_dict(xml_path: str):
    """Parse an XML file into a ``{名称: 值}`` dict built from its ``<tr>`` rows.

    Every ``<tr>`` below the root that has a ``名称`` attribute contributes one
    entry; the value is its ``值`` attribute, or ``None`` when that attribute
    is missing. When a name occurs more than once the last value wins.

    Args:
        xml_path: Path of the XML file.

    Returns:
        The resulting dict.

    Raises:
        ValueError: The file is not well-formed XML.
        FileNotFoundError: ``xml_path`` does not exist.
        RuntimeError: Any other failure while processing the file.
    """
    import xml.etree.ElementTree as ET

    try:
        root = ET.parse(xml_path).getroot()
        result_dict = {}
        for tr in root.findall('.//tr'):
            name = tr.get('名称')
            if name is not None:
                result_dict[name] = tr.get('值')
        return result_dict
    except ET.ParseError as e:
        raise ValueError(f"XML解析错误: {e}") from e
    except FileNotFoundError as e:
        raise FileNotFoundError(f"文件未找到: {xml_path}") from e
    except Exception as e:
        raise RuntimeError(f"处理XML文件时发生错误: {e}") from e