import ast
import copy
import os
import re
import shutil
import xml.etree.ElementTree as ET
from pathlib import Path

# File-name prefixes routed to the 'xml_data' folder.
_XML_DATA_PREFIXES = ('费用表', '费用集', '取费表', '线路特征', '项目划分')

# One "[...]" selector of an access path: a quoted string key or an integer
# index, with optional whitespace inside the brackets.
_PATH_TOKEN_RE = re.compile(r"""
    \[\s*
    (?:
        (?P<q>'(?:\\.|[^\\'])*'|"(?:\\.|[^\\"])*")   # quoted string key
      | (?P<idx>-?\d+)                                # integer index (may be negative)
    )
    \s*\]
""", re.VERBOSE)

# "(≥xxx)" style suffixes; both ASCII and full-width parentheses are accepted.
_COMPARISON_RE = re.compile(r'[（(](≥|≤|>|<)(.*?)[）)]')
# "A(xxx)" style suffixes; both ASCII and full-width parentheses are accepted.
_PAREN_SUFFIX_RE = re.compile(r'([^()（）\s]+)[（(]([^)）\s]+)[）)]')
_COMPARISON_WORDS = {
    '≥': '大于等于',
    '≤': '小于等于',
    '>': '大于',
    '<': '小于',
}


def _category_for(filename):
    """Return the key of the output folder that ``filename`` belongs to."""
    # Must be tested first: these names also start with '项目划分'.
    if filename.startswith('项目划分_取费_'):
        return 'xml_data1.5'
    if filename.startswith(_XML_DATA_PREFIXES):
        return 'xml_data'
    if (filename.startswith('安装_工程量')
            or '工程量精度' in filename
            or '工程属性' in filename):
        return 'xml_data2'
    return 'xml_data3'


def _empty_directory(directory):
    """Delete every entry inside ``directory`` but keep the directory itself."""
    for item in directory.iterdir():
        # rmtree refuses symlinks, so only real directories go through it.
        if item.is_dir() and not item.is_symlink():
            shutil.rmtree(item)
        else:
            item.unlink()


def classify_files(folder_path):
    """Copy the files of ``folder_path`` into four sibling category folders.

    The output folders are created next to ``folder_path`` (in its parent
    directory) and are emptied before copying:

    - ``xml_data``: names starting with 费用表, 费用集, 取费表, 线路特征 or
      项目划分 (except 项目划分_取费_*)
    - ``xml_data1.5``: names starting with 项目划分_取费_
    - ``xml_data2``: names starting with 安装_工程量, or containing 工程量精度
      or 工程属性
    - ``xml_data3``: everything else

    Only regular files directly inside ``folder_path`` are copied;
    sub-directories are skipped. Files are copied, not moved.

    Args:
        folder_path: Path of the input folder.

    Returns:
        True when classification ran, False when the input folder is missing
        or is itself one of the output folders.
    """
    source_dir = Path(folder_path)
    if not source_dir.is_dir():
        print(f"错误:文件夹 '{folder_path}' 不存在")
        return False

    parent_dir = source_dir.parent
    folders = {
        name: parent_dir / name
        for name in ('xml_data', 'xml_data1.5', 'xml_data2', 'xml_data3')
    }

    # Emptying an output folder that is also the input would destroy the
    # input before anything is copied.
    resolved_source = source_dir.resolve()
    if any(target.resolve() == resolved_source for target in folders.values()):
        print(f"错误:输入文件夹 '{folder_path}' 与输出文件夹重名,无法分类")
        return False

    print("开始清空输出文件夹...")
    for target_dir in folders.values():
        if target_dir.exists():
            print(f"清空文件夹: {target_dir}")
            _empty_directory(target_dir)
        else:
            target_dir.mkdir(parents=True, exist_ok=True)
            print(f"创建文件夹: {target_dir}")
    print("输出文件夹清空完成,开始复制分类文件...")

    copied_files = 0
    for file_path in source_dir.iterdir():
        if not file_path.is_file():
            continue
        filename = file_path.name
        dest_folder = folders[_category_for(filename)]
        try:
            shutil.copy2(str(file_path), str(dest_folder / filename))
        except OSError as e:
            # Best effort: report the failure and continue with the next file.
            print(f"复制文件 {filename} 时出错: {e}")
        else:
            copied_files += 1

    print(f"\n文件复制分类完成!共处理 {copied_files} 个文件")
    print("输出文件夹位置:")
    for name, path in folders.items():
        file_count = sum(1 for f in path.iterdir() if f.is_file())
        print(f" {name}: {path} ({file_count} 个文件)")
    return True


def list_target_xmls(
        dir_path: str,
        keyword: str,
        recursive: bool = False,
        fullpath: bool = False):
    """List files under ``dir_path`` whose name matches ``{keyword}*.xml``.

    Matching is case-insensitive and ``keyword`` is treated literally.

    Args:
        dir_path: Directory to search.
        keyword: Required file-name prefix, e.g. "安装_工程量".
        recursive: Also search sub-directories.
        fullpath: Return resolved full paths instead of bare file names.

    Returns:
        The matches as strings, sorted case-insensitively.

    Raises:
        NotADirectoryError: If ``dir_path`` is not an existing directory.
    """
    pattern = re.compile(rf'^{re.escape(keyword)}.*\.xml$', re.IGNORECASE)

    base = Path(dir_path)
    if not base.is_dir():
        raise NotADirectoryError(f"目录不存在或不可用: {dir_path}")

    candidates = base.rglob('*') if recursive else base.iterdir()
    results = [
        str(p.resolve() if fullpath else p.name)
        for p in candidates
        if p.is_file() and pattern.match(p.name)
    ]
    results.sort(key=str.lower)
    return results


def filter_files_by_display_names(xml_file_path, files_list, prefix="费用表_"):
    """Keep the files whose name suffix is a display name listed in an XML file.

    Every non-empty ``显示名称`` attribute found anywhere in ``xml_file_path``
    is collected. A file is kept when its base name contains
    ``{prefix}<name>.xml`` (case-insensitive) and ``<name>`` is one of the
    collected display names.

    Args:
        xml_file_path: XML file carrying the ``显示名称`` attributes.
        files_list: Candidate file paths.
        prefix: Literal file-name prefix preceding the display name.

    Returns:
        A tuple ``(matched_files, matched_display_names)``. The files keep the
        order of ``files_list``; the names keep their order in the XML file.
        When the XML file cannot be parsed, ``([], [])`` is returned so that
        callers can always unpack the result.
    """
    try:
        root = ET.parse(xml_file_path).getroot()
    except Exception as e:
        print(f"解析XML文件时出错: {e}")
        return [], []

    display_names = [
        name for name in (elem.get('显示名称') for elem in root.iter()) if name
    ]
    known_names = set(display_names)

    pattern = re.compile(rf'{re.escape(prefix)}(.*?)\.xml$', re.IGNORECASE)
    matched_files = []
    matched_names = set()
    for file_path in files_list:
        match = pattern.search(os.path.basename(file_path))
        if match and match.group(1) in known_names:
            matched_files.append(file_path)
            matched_names.add(match.group(1))

    matched_display_names = [n for n in display_names if n in matched_names]
    return matched_files, matched_display_names


def extract_table_name_from_xml_file(file_path):
    """Return the ``名称`` attribute of the first ``DataDefs/dataset/table``.

    Args:
        file_path: Path of the XML file.

    Returns:
        The attribute value, or None when the element is missing or the file
        cannot be read or parsed (a message is printed in those cases).
    """
    try:
        root = ET.parse(file_path).getroot()
        table_element = root.find('.//DataDefs/dataset/table')
        if table_element is None:
            print("未找到table元素")
            return None
        return table_element.get('名称')
    except ET.ParseError as e:
        print(f"XML解析错误: {e}")
        return None
    except FileNotFoundError:
        print(f"文件未找到: {file_path}")
        return None
    except Exception as e:
        print(f"处理XML时发生错误: {e}")
        return None


########################### Cost table (费用表) ###########################
# Per the category info, these are absent: 变量费用, 常驻费用, 其他
# projectData => projectCost
def xml_to_json(xml_content):
    """Convert an XML string into a list of nested ``tr`` dictionaries.

    For every ``table`` element, the direct ``tr`` children of its ``trs``
    child are converted with :func:`process_tr_element`. Rows that convert to
    an empty dict are dropped.

    Returns:
        The list of converted rows, or ``[]`` on any error (a message is
        printed).
    """
    try:
        root = ET.fromstring(xml_content)
        result = []
        for table in root.findall('.//table'):
            trs = table.find('./trs')
            if trs is None:
                continue
            for tr in trs.findall('./tr'):
                tr_data = process_tr_element(tr)
                if tr_data:
                    result.append(tr_data)
        return result
    except ET.ParseError as e:
        print(f"XML解析错误: {e}")
        return []
    except Exception as e:
        print(f"处理错误: {e}")
        return []


def process_tr_element(tr_element):
    """Recursively convert a ``tr`` element into a dict.

    The dict holds the element's attributes; non-empty direct ``tr`` children
    are converted the same way and stored under ``'children'``.

    Returns:
        The dict, or None when ``tr_element`` is None.
    """
    if tr_element is None:
        return None

    node_data = dict(tr_element.attrib)
    children = [
        child_data
        for child_data in map(process_tr_element, tr_element.findall('./tr'))
        if child_data
    ]
    if children:
        node_data['children'] = children
    return node_data


def xml_file_to_json(file_path):
    """Read a UTF-8 XML file and convert it with :func:`xml_to_json`.

    Returns:
        The converted rows, or ``[]`` when the file cannot be read.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            xml_content = file.read()
        return xml_to_json(xml_content)
    except Exception as e:
        print(f"文件读取错误: {e}")
        return []


# def save_json_to_file(json_data, output_file):
#     """Save JSON data to a file."""
#     try:
#         with open(output_file, 'w', encoding='utf-8') as file:
#             json.dump(json_data, file, ensure_ascii=False, indent=4)
#         print(f"JSON数据已保存到: {output_file}")
#     except Exception as e:
#         print(f"保存文件错误: {e}")

# Save the result
# save_json_to_file(result, "output.json")


########################### Fee table (取费表) ###########################
# projectData => costSetting
def name_TypeList_info(xml_path):
    """Return the ``值`` attribute of the first ``tr`` whose ``名称`` is "名称".

    Returns:
        The attribute value (None when the attribute is absent), or a
        descriptive message string when the element is not found, the file is
        missing, or the XML is malformed.
    """
    try:
        root = ET.parse(xml_path).getroot()
    except FileNotFoundError:
        return "XML文件未找到"
    except ET.ParseError:
        return "XML解析错误"

    target_tr = root.find(".//tr[@名称='名称']")
    if target_tr is None:
        return "未找到指定的tr元素"
    return target_tr.get('值')


########################### Project division (项目划分) ###########################
# projectData => projectDivision
def pd_get_table_names_from_xml(file_path):
    """Return the non-empty ``名称`` attribute of every ``table`` element.

    Args:
        file_path: Path of the XML file.

    Returns:
        The list of table names, or ``[]`` on any error (a message is printed).
    """
    try:
        tree = ET.parse(file_path)
        names = (table.get('名称') for table in tree.findall('.//table'))
        return [name for name in names if name]
    except Exception as e:
        print(f"错误: {e}")
        return []


def build_project_division(names):
    """Map ``"项目划分_<name>"`` to ``{name: None}`` for every name."""
    return {f"项目划分_{name}": {name: None} for name in names}


def create_mapping(table_names, files_list):
    """Map each table name to the first path ending in ``项目划分_<name>.xml``.

    Table names without a matching path are left out of the result.
    """
    result = {}
    for table_name in table_names:
        expected_filename = f"项目划分_{table_name}.xml"
        match = next(
            (path for path in files_list if path.endswith(expected_filename)),
            None,
        )
        if match is not None:
            result[table_name] = match
    return result


########################### Expense preview (项目划分_取费) ###########################
# projectData => expensePreview
def parse_costs_from_xml_file(xml_path):
    """Extract the per-item costs of the first ``<table>`` in an XML file.

    ``<props>/<tr>`` supplies an ``id -> name`` mapping; the ``cost``
    attribute of the first ``<trs>/<tr>`` is a comma-separated list of
    ``id:value`` pairs (entries without ``:`` are ignored, empty values
    become None).

    Returns:
        ``{"GUID": "{<table name>}", "children": [{"id": <name>,
        "cost": <value or None>}, ...]}`` with one child per ``<props>`` row.

    Raises:
        ValueError: If ``<table>``, ``<props>``, ``<trs>/<tr>`` or the
            ``cost`` attribute is missing.
    """
    root = ET.parse(xml_path).getroot()

    table_elem = root.find('.//table')
    if table_elem is None:
        raise ValueError("未找到 <table> 元素")
    guid = table_elem.get('name', '')

    props_elem = table_elem.find('props')
    if props_elem is None:
        raise ValueError("未找到 <props> 元素")
    id_to_name = {}
    for tr in props_elem.findall('tr'):
        tr_id = tr.get('id')
        tr_name = tr.get('name')
        if tr_id is not None and tr_name is not None:
            id_to_name[tr_id] = tr_name

    tr_elem = table_elem.find('trs/tr')
    if tr_elem is None:
        raise ValueError("未找到 <trs>/<tr> 元素")
    cost_str = tr_elem.get('cost')
    if cost_str is None:
        raise ValueError("在 <tr> 元素中未找到 'cost' 属性")

    cost_map = {}
    for item in cost_str.split(','):
        if ':' not in item:
            continue
        key, value = item.split(':', 1)
        cost_map[key] = value if value else None

    return {
        "GUID": f"{{{guid}}}",
        "children": [
            {"id": tr_name, "cost": cost_map.get(tr_id)}
            for tr_id, tr_name in id_to_name.items()
        ],
    }


########################### Quantities (工程量) ###########################
# projectData => projectDivision
def _key_selector(key):
    """Return the ``[...]`` selector text for a dict key."""
    # repr() escapes quotes and backslashes so get_by_strpath can parse the
    # selector back; for ordinary string keys it is exactly "'key'".
    quoted = repr(key) if isinstance(key, str) else f"'{key}'"
    return f"[{quoted}]"


def extract_file_paths_with_access_path_safe(data, target_key="_文件名称", root_var="data"):
    """Find every ``target_key`` entry in nested dicts/lists.

    No ``eval`` is involved; the access path is assembled as text only.

    Args:
        data: Nested structure of dicts and lists.
        target_key: Dict key whose values are collected.
        root_var: Variable name used as the prefix of each access path.

    Returns:
        A list of dicts with ``file_path`` (the value), ``access_path`` (e.g.
        ``data['a'][0]['_文件名称']``) and ``path_parts`` (the selectors).
    """
    results = []

    def _walk(obj, path_parts):
        if isinstance(obj, dict):
            if target_key in obj:
                full_parts = path_parts + [_key_selector(target_key)]
                results.append({
                    "file_path": obj[target_key],
                    "access_path": root_var + ''.join(full_parts),
                    "path_parts": full_parts,
                })
            for key, value in obj.items():
                if key != target_key:
                    _walk(value, path_parts + [_key_selector(key)])
        elif isinstance(obj, list):
            for i, item in enumerate(obj):
                _walk(item, path_parts + [f"[{i}]"])

    _walk(data, [])
    return results


def parse_xml_to_json(xml_content: str, from_file: bool = False):
    """Convert the "安装" table of an XML document into a list of dicts.

    Every ``tr`` below ``table[@名称='安装']`` (at any depth) becomes a dict of
    its attributes plus a ``children`` list with one entry per descendant
    ``props`` element that has a ``type``. For ``type == "40000016"`` the
    children are looked up by their ``s`` attribute in the "人材机" table
    (rows keyed by ``id``, with ``_s`` renamed to ``s``); unknown references
    are skipped. For other types the child attributes are copied directly.

    Args:
        xml_content: XML text, or a file path when ``from_file`` is True.
        from_file: Treat ``xml_content`` as a file path.

    Returns:
        The list of converted rows (empty when there is no "安装" table).
    """
    if from_file:
        root = ET.parse(xml_content).getroot()
    else:
        root = ET.fromstring(xml_content)

    # Material rows keyed by id, for lookup from the install table.
    material_dict = {}
    material_table = root.find(".//table[@名称='人材机']")
    if material_table is not None:
        for tr in material_table.findall(".//tr"):
            material_id = tr.get("id")
            if material_id:
                material_dict[material_id] = {
                    ("s" if attr == "_s" else attr): value
                    for attr, value in tr.attrib.items()
                }

    result = []
    install_table = root.find(".//table[@名称='安装']")
    if install_table is None:
        return result

    for tr in install_table.findall(".//tr"):
        main_item = dict(tr.attrib)
        children = []
        for props in tr.findall(".//props"):
            props_type = props.get("type")
            if not props_type:
                continue
            props_children = []
            for child_tr in props.findall(".//tr"):
                if props_type == "40000016":
                    s_value = child_tr.get("s")
                    if s_value and s_value in material_dict:
                        # Copy so result entries do not share one dict.
                        props_children.append(dict(material_dict[s_value]))
                else:
                    props_children.append(dict(child_tr.attrib))
            children.append({"type": props_type, "children": props_children})
        main_item["children"] = children
        result.append(main_item)
    return result


def file_exists_simple(file_path):
    """Return 1 when ``file_path`` is an existing regular file, else 0."""
    return 1 if os.path.isfile(file_path) else 0


def get_by_strpath(data, path: str, replace_with=None):
    """Read (and optionally replace) a value addressed by a path string.

    The path looks like ``"data['a'][0]['b']"``. It is parsed, never
    evaluated: only ``[...]`` selectors holding a quoted string or an integer
    are accepted. A leading ``data`` is ignored.

    Args:
        data: Nested structure of dicts and lists.
        path: Access path string.
        replace_with: When not None, a list whose deep copy replaces the
            addressed value. The value being replaced must be a ``str``.

    Returns:
        The value read, or the newly stored list when replacing.

    Raises:
        TypeError: ``path`` is not a str, the replaced value is not a str,
            ``replace_with`` is not a list, or the parent container has the
            wrong type.
        ValueError: The path cannot be parsed, or is empty when replacing.
        IndexError: An integer selector cannot be applied.
        KeyError: A string selector cannot be applied.
    """
    if not isinstance(path, str):
        raise TypeError("path 必须是 str")

    s = path.strip()
    if s.startswith("data"):
        s = s[4:]  # drop the leading root variable name

    cur = data
    pos = 0
    parent = None
    last_selector = None  # ("idx", i) or ("key", k)

    for m in _PATH_TOKEN_RE.finditer(s):
        # Only whitespace may separate two selectors.
        junk = s[pos:m.start()].strip()
        if junk:
            raise ValueError(f"无法解析路径,存在非法片段: {junk!r}")
        pos = m.end()
        parent = cur

        if m.group("idx") is not None:
            idx = int(m.group("idx"))
            try:
                cur = cur[idx]
            except Exception as e:
                raise IndexError(
                    f"索引 {idx} 访问失败;当前类型为 {type(parent).__name__}"
                ) from e
            last_selector = ("idx", idx)
        else:
            # literal_eval only decodes the quoted string; no code is run.
            key = ast.literal_eval(m.group("q"))
            try:
                cur = cur[key]
            except Exception as e:
                raise KeyError(
                    f"键 {key!r} 访问失败;当前类型为 {type(parent).__name__}"
                ) from e
            last_selector = ("key", key)

    if s[pos:].strip():
        raise ValueError("路径末尾存在无法解析的内容")

    if replace_with is None:
        return cur

    if last_selector is None:
        raise ValueError("路径为空,无法替换根节点;请提供至少一个 [] 访问片段")
    if not isinstance(cur, str):
        raise TypeError(f"替换前的值必须是 str,当前为 {type(cur).__name__}")
    if not isinstance(replace_with, list):
        raise TypeError("replace_with 必须是 list")

    new_list = copy.deepcopy(replace_with)  # never store the caller's list
    kind, sel = last_selector
    if kind == "idx":
        if not isinstance(parent, list):
            raise TypeError(f"父节点应为 list,但为 {type(parent).__name__}")
    elif not isinstance(parent, dict):
        raise TypeError(f"父节点应为 dict,但为 {type(parent).__name__}")
    parent[sel] = new_list
    return new_list


########################### Line feature segment (线路特征段) ###########################
# projectData => 线路特征段
def extract_title_from_path(path: str) -> str:
    """Extract the title part of a file name.

    The title is the text after the first underscore, without a trailing
    ``{digits}`` group and without the extension, e.g.
    ``'.../线路特征_土质比例{1272}.xml' -> '土质比例'``.

    Raises:
        ValueError: If the file name has no such part.
    """
    name = os.path.basename(path)
    stem, _ext = os.path.splitext(name)
    m = re.match(r"^[^_]*_(.+?)(?:\{\d+\})?$", stem)
    if not m:
        raise ValueError(f"无法从文件名提取:{name}")
    return m.group(1).strip()


# Repairs formatting problems of a converted XML file.
def _replace_comparison(match):
    """Turn a "(≥xxx)" match into "_大于等于xxx" (and likewise for ≤, >, <)."""
    symbol, content = match.group(1), match.group(2)
    return f'_{_COMPARISON_WORDS.get(symbol, symbol)}{content}'


def _clean_text(text):
    """Apply the text clean-up rules of :func:`process_xml_file` to a string."""
    if not text:
        return text
    # 1. Every ":" becomes "_".
    processed = text.replace(":", "_")
    # 2. "(≥xxx)" style groups become "_大于等于xxx".
    processed = _COMPARISON_RE.sub(_replace_comparison, processed)
    # 3. "A(xxx)" becomes "A_xxx".
    return _PAREN_SUFFIX_RE.sub(r'\1_\2', processed)


def _clean_element(element):
    """Recursively clean text, attribute values and child tails of an element."""
    if element.text:
        element.text = _clean_text(element.text)
    for attr_name, attr_value in list(element.attrib.items()):
        element.set(attr_name, _clean_text(attr_value))
    for child in element:
        _clean_element(child)
        if child.tail:
            child.tail = _clean_text(child.tail)


def process_xml_file(input_file_path, output_file_path=None):
    """Clean irregular text in an XML file and write the result to a new file.

    Element text, attribute values and tail text are rewritten: ``:`` becomes
    ``_``, ``(≥x)``/``(≤x)``/``(>x)``/``(<x)`` become ``_大于等于x`` etc., and
    ``A(x)`` becomes ``A_x`` (ASCII or full-width parentheses). When the file
    is not well-formed XML, the same rules are applied to the raw text
    instead, which may not preserve the XML structure.

    Args:
        input_file_path: Path of the XML file to clean.
        output_file_path: Destination path; defaults to
            ``<input name>_processed.xml`` next to the input file.

    Returns:
        The path of the file that was written.

    Raises:
        Exception: Wraps any error raised while reading or writing.
    """
    try:
        print("----------清洗文件----------")
        if output_file_path is None:
            file_dir = os.path.dirname(input_file_path)
            stem = os.path.splitext(os.path.basename(input_file_path))[0]
            output_file_path = os.path.join(file_dir, f"{stem}_processed.xml")

        try:
            tree = ET.parse(input_file_path)
            _clean_element(tree.getroot())
            tree.write(output_file_path, encoding='utf-8', xml_declaration=True)
        except ET.ParseError as e:
            print(f"XML解析错误: {e}")
            print("尝试使用替代方法处理文件...")
            # Fallback: clean the raw text without parsing it.
            with open(input_file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            with open(output_file_path, 'w', encoding='utf-8') as f:
                f.write(_clean_text(content))
            print("使用文本模式处理完成,但可能无法完全保持XML结构")

        print("XML文件处理完成!")
        print(f"输入文件: {input_file_path}")
        print(f"输出文件: {output_file_path}")
        print("----------清洗结束----------")
        return output_file_path
    except Exception as e:
        raise Exception(f"处理文件时发生错误: {e}") from e


########################### Project attributes (工程属性) ###########################
# projectData => projectInfo
def xml_to_dict(xml_path: str):
    """Parse an XML file into a ``{名称: 值}`` dictionary.

    Every ``tr`` element with a ``名称`` attribute contributes one entry; the
    value is its ``值`` attribute, or None when that attribute is absent.
    Later rows overwrite earlier rows with the same name.

    Raises:
        ValueError: If the XML is malformed.
        FileNotFoundError: If the file does not exist.
        RuntimeError: On any other error.
    """
    try:
        root = ET.parse(xml_path).getroot()
        result_dict = {}
        for tr in root.findall('.//tr'):
            name = tr.get('名称')
            if name is not None:
                result_dict[name] = tr.get('值')
        return result_dict
    except ET.ParseError as e:
        raise ValueError(f"XML解析错误: {e}") from e
    except FileNotFoundError as e:
        raise FileNotFoundError(f"文件未找到: {xml_path}") from e
    except Exception as e:
        raise RuntimeError(f"处理XML文件时发生错误: {e}") from e