import os import random from sqlalchemy import create_engine, MetaData, Table, select, func from sqlalchemy.orm import sessionmaker from dotenv import load_dotenv load_dotenv() def generate_questions(file_path, num_questions_per_table=10): engine = create_engine(os.getenv("SQL_DATABASE_URL", "")) metadata = MetaData() metadata.reflect(bind=engine) # 定义表名及其对应的列索引和问题模板 tables_info = { "ProjectProperties": (0, "Attribute_Value", "{name_value}的属性值是多少?"), "OtherFee": (0, "Amount", "{name_value}的金额是多少?"), "FeeCollectionTable": (0, "Rate", "{name_value}的费率是多少?"), "ProjectDivision": (0, "Total_Price", "{name_value}的合价是多少?"), "ProjectDivisions_CostPreview": (0, "Direct_Fee", "{name_value}的直接费是多少?"), "TotalCalculateTable": (0, "Amount", "{name_value}的金额是多少?"), "ProjectQuantities": (0, "Code", "{name_value}的编码是多少?") } questions = [] for table_name, (name_index, value_column, question_template) in tables_info.items(): # 加载这张表 table = Table(table_name, metadata, autoload_with=engine) # 创建会话 Session = sessionmaker(bind=engine) session = Session() # 获取列名 name_column = table.columns.keys()[name_index] # 对于每个表生成num_questions_per_table个问题 for _ in range(num_questions_per_table): # 查询表中的随机一行,并获取名称列的值 row = session.query(table).order_by(func.random()).first() name_value = getattr(row, name_column) # 构造问题 question = question_template.format(name_value=name_value) questions.append(question) # 写入文件 with open(file_path, 'w', encoding='utf-8') as file: for question in questions: file.write(question + '\n') if __name__ == "__main__": questions_file_path = "/home/bw/ctr/zjdataai-app/backend/test1/questions.txt" generate_questions(questions_file_path)