import os import random import time from typing import List, Optional, Dict from threading import Lock import requests # 4090dify 中用的硅基流动 apikey # sk-iuyfpcewztavgnivrllwnegffvrrsyeiuwvjrabngtejsqwy # sk-skynrwfqvipknbcvjsxkhjaqlivmocpkdppkocjndbyulado # sk-gjeanmnxtfcqezqagixyexarlxeztkazbrsciqescrfxrgpw # sk-uapywdmjaylwwyufaivraqwbpqtxjpbsbkltlrmwqftvfech # sk-dwmxnhaeephbxgsfncbonyajubuhuyhsfqwfsxahlepkiwas # sk-lnxedlpzufrurrmvylugpccnppwyqdccgeiicoijrqnslcgm # sk-duccaryfxcrpvwrbwvjbuwjwazyqleyebumhvrutksuqbxug # sk-njcrhxpvevtxkzbmhkxshxcpwpnjzmccjgfdykdncaxjicez # sk-bdagppigfxexcofiossccywvcqggbpywjapkdbtqycbgvqpz # sk-dvbaktabkdwdpjgxyoozlwnejosjyhdgqwllfeborqahndxs class APIKeyManager: """ API密钥管理器,用于解析环境变量中的多个API密钥并提供获取接口 支持密钥轮转使用 """ # 类变量,用于保存单例实例 _instance = None _lock = Lock() # 密钥使用计数和上次使用时间 _key_usage: Dict[str, Dict] = {} # 当前正在使用的密钥索引 _current_index = 0 api_file_path = "api_key.txt" @classmethod def get_instance(cls, env_var_name: str = "OPENAI_API_KEY", separator: str = ";"): """ 获取单例实例 Args: env_var_name: 环境变量名称,默认为'OPENAI_API_KEY' separator: 密钥分隔符,默认为分号 Returns: APIKeyManager实例 """ if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = cls(env_var_name, separator) return cls._instance @classmethod def get_api_key(cls) -> Optional[str]: """ 静态方法:获取一个API密钥,使用轮转策略 Returns: API密钥,如果没有可用的密钥则返回None """ instance = cls.get_instance() return instance._get_next_api_key() @classmethod def get_random_api_key(cls) -> Optional[str]: """ 静态方法:随机获取一个API密钥 Returns: API密钥,如果没有可用的密钥则返回None """ instance = cls.get_instance() return instance._get_random_api_key() @classmethod def get_valid_api_keys(cls) -> List[str]: """ 静态方法:获取有效的API密钥列表 Returns: """ # 验证每一个apikey是否有效,无效则删除并打印日志。地址https://api.siliconflow.cn/v1/ import requests import logging valid_api_keys = [] url = "https://api.siliconflow.cn/v1/chat/completions" headers_template = { "Content-Type": "application/json" } data = { "model": "deepseek-ai/DeepSeek-V3", "messages": [ {"role": "user", "content": "ping"} ], "max_tokens": 1 } instance = cls.get_instance() for key in instance.api_keys: headers = headers_template.copy() headers["Authorization"] = f"Bearer {key}" try: resp = requests.post(url, headers=headers, json=data, timeout=8) if resp.status_code == 200: valid_api_keys.append(key) else: logging.warning(f"API密钥无效(被移除): {key}, 状态码: {resp.status_code}, 响应: {resp.text}") except Exception as e: logging.warning(f"API密钥验证异常(被移除): {key}, 错误: {e}") return valid_api_keys @classmethod def count(cls) -> int: """ 静态方法:获取API密钥数量 Returns: API密钥数量 """ instance = cls.get_instance() return len(instance.api_keys) @classmethod def get_key_usage_stats(cls, key: str) -> Dict: """ 静态方法:获取API密钥使用统计信息 Returns: API密钥使用统计信息 """ url = "https://api.siliconflow.cn/v1/user/info" headers = {"Authorization": f"Bearer {key}"} response = requests.request("GET", url, headers=headers) return response.json() @classmethod def remove_invalid_api_keys(cls, api_keys: List[str]): """ 移除无效的API密钥 """ instance = cls.get_instance() for key in api_keys: instance.api_keys.remove(key) @classmethod def save_api_keys(cls): """ 保存API密钥到文件 """ instance = cls.get_instance() with open(cls.api_file_path, "w") as f: f.write("\n".join(instance.api_keys)) def __init__(self, env_var_name: str = "OPENAI_API_KEY", separator: str = ";"): """ 初始化API密钥管理器 Args: env_var_name: 环境变量名称,默认为'OPENAI_API_KEY' separator: 密钥分隔符,默认为分号 """ self.env_var_name = env_var_name self.separator = separator self.api_keys = self._load_api_keys() # 初始化密钥使用统计 for key in self.api_keys: if key not in self._key_usage: self._key_usage[key] = { "count": 0, "last_used": 0 } def _load_api_keys(self) -> List[str]: """ 从环境变量加载API密钥 Returns: API密钥列表 """ # 从文件中读取api_key with open(self.api_file_path, "r") as f: api_keys = f.readlines() # 移除空白字符 api_keys = [key.strip() for key in api_keys if key.strip()] return api_keys def _get_next_api_key(self) -> Optional[str]: """ 获取下一个API密钥,使用轮转策略 Returns: API密钥,如果没有可用的密钥则返回None """ if not self.api_keys: return None with self._lock: # 轮转到下一个密钥 self._current_index = (self._current_index + 1) % len(self.api_keys) selected_key = self.api_keys[self._current_index] # 更新使用统计 self._key_usage[selected_key]["count"] += 1 self._key_usage[selected_key]["last_used"] = time.time() return selected_key def _get_random_api_key(self) -> Optional[str]: """ 随机获取一个API密钥 Returns: API密钥,如果没有可用的密钥则返回None """ if not self.api_keys: return None with self._lock: selected_key = random.choice(self.api_keys) # 更新使用统计 self._key_usage[selected_key]["count"] += 1 self._key_usage[selected_key]["last_used"] = time.time() return selected_key def get_all_api_keys(self) -> List[str]: """ 获取所有API密钥 Returns: API密钥列表 """ return self.api_keys.copy() def is_valid(self) -> bool: """ 检查是否有可用的API密钥 Returns: 如果有可用的API密钥则返回True,否则返回False """ return len(self.api_keys) > 0 def get_usage_stats(self) -> Dict: """ 获取密钥使用统计信息 Returns: 密钥使用统计信息 """ return self._key_usage.copy() # 使用示例 if __name__ == "__main__": # 查看总密钥数 print(f"总共有 {APIKeyManager.count()} 个API密钥") # 获取实例并查看使用统计 instance = APIKeyManager.get_instance() stats = instance.get_usage_stats() all_balance=0.0 buy_balance=16 * 10 * 14 # 购买16次,一次10条api_key,每个api_key有14元 invalid_api_keys = [] for key, data in stats.items(): usage_stats = APIKeyManager.get_key_usage_stats(key) all_balance+=float(usage_stats['data']['balance']) print(f"api key:{key}---赠送余额:{usage_stats['data']['balance']}元") if float(usage_stats['data']['balance']) == 0: invalid_api_keys.append(key) print(f"剩余总赠送余额:{all_balance}元,累计消耗:{buy_balance-all_balance}元") print(f"无效的API密钥:{invalid_api_keys}") print(f"开始移除无效的API密钥,并重新保存") APIKeyManager.remove_invalid_api_keys(invalid_api_keys) APIKeyManager.save_api_keys() print(f"移除无效的API密钥,并重新保存完成")