CloudLens 采用插件化、模块化的分层架构设计,提供 CLI 命令行工具 和 Web 可视化界面 两种使用方式,确保高扩展性、高可维护性和高安全性。
┌─────────────────────────────────────────────────────────┐
│ 用户交互层 (User Interface Layer) │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ CLI (Click) │ │ Web (Next.js) │ │
│ └────────┬─────────┘ └────────┬─────────┘ │
└───────────┼─────────────────────────────┼──────────────┘
│ │
┌───────────▼─────────────────────────────▼──────────────┐
│ API 层 (API Layer) │
│ ┌──────────────────────────────────────────────────┐ │
│ │ FastAPI RESTful API │ │
│ └────────────────────┬─────────────────────────────┘ │
└───────────────────────┼────────────────────────────────┘
│
┌───────────────────────▼────────────────────────────────┐
│ 应用逻辑层 (Application Logic Layer) │
│ ├─ Analyzer (Idle, Cost, Tag, Discount...) │
│ ├─ Report Generator │
│ ├─ Dashboard Manager │
│ ├─ Budget Manager │
│ ├─ Alert Manager │
│ └─ Filter Engine │
├────────────────────────────────────────────────────────┤
│ 数据存储层 (Data Storage Layer) │
│ ├─ Database Abstraction (MySQL/SQLite) │
│ ├─ Cache Manager │
│ ├─ Bill Storage │
│ └─ Config Manager │
├────────────────────────────────────────────────────────┤
│ 云厂商抽象层 (Provider Abstraction Layer) │
│ ├─ BaseProvider (Interface) │
│ ├─ AliyunProvider │
│ ├─ TencentProvider │
│ └─ ...Extensible │
├────────────────────────────────────────────────────────┤
│ 基础设施层 (Infrastructure Layer) │
│ ├─ PermissionGuard │
│ ├─ ConcurrentHelper │
│ ├─ SecurityCompliance │
│ └─ Error Handler │
├────────────────────────────────────────────────────────┤
│ 外部依赖层 (External Dependencies) │
│ ├─ Aliyun SDK │
│ ├─ Tencent SDK │
│ ├─ MySQL Database │
│ └─ System Keyring │
└────────────────────────────────────────────────────────┘
class BaseProvider(ABC):
    """Abstract base class for cloud-vendor providers.

    Concrete providers must implement every method declared here.
    """

    @abstractmethod
    def list_instances(self) -> List[UnifiedResource]:
        """Query compute instances."""

    @abstractmethod
    def list_rds(self) -> List[UnifiedResource]:
        """Query database instances."""

    @abstractmethod
    def check_permissions(self) -> Dict:
        """Check permissions."""
设计模式:抽象工厂模式
优势:
核心职责:
关键实现:
class AliyunProvider(BaseProvider):
    """Aliyun implementation of ``BaseProvider``."""

    def __init__(self, config: AccountConfig):
        """Copy credentials and region from the account configuration."""
        self.access_key = config.access_key
        self.secret_key = config.secret_key
        self.region = config.region
        self.provider_name = "aliyun"

    def list_instances(self) -> List[UnifiedResource]:
        """Query instances through the Aliyun SDK and return unified resources."""
        # Call the Aliyun SDK for the configured region.
        response = self._do_request(
            self._get_client(self.region),
            DescribeInstancesRequest(),
        )
        # A missing 'Instances' / 'Instance' key yields an empty result.
        raw_instances = response.get('Instances', {}).get('Instance', [])
        # Convert every SDK record into the unified model.
        return [
            UnifiedResource(
                id=item['InstanceId'],
                name=item.get('InstanceName'),
                provider=self.provider_name,
                # ...more fields
            )
            for item in raw_instances
        ]
@dataclass
class UnifiedResource:
    """Unified resource model.

    Attributes:
        id: Resource ID.
        name: Resource name.
        provider: Cloud vendor.
        region: Region.
        zone: Availability zone.
        resource_type: Resource type.
        status: Status.
        spec: Specification.
        charge_type: Billing method.
        public_ips: Public IP addresses.
        private_ips: Private IP addresses.
        vpc_id: VPC ID.
        tags: Tags.
        created_time: Creation time.
        expired_time: Expiration time.
        raw_data: Raw data.
    """

    id: str
    name: str
    provider: str
    region: str
    zone: str
    resource_type: ResourceType
    status: ResourceStatus
    spec: str
    charge_type: str
    public_ips: List[str]
    private_ips: List[str]
    vpc_id: Optional[str]
    tags: Optional[Dict]
    created_time: Optional[datetime]
    expired_time: Optional[datetime]
    raw_data: Optional[Dict]
设计原则:
存储结构:
{
"accounts": [
{
"provider": "aliyun",
"name": "prod",
"region": "cn-hangzhou",
"access_key": "LTAI...",
"use_keyring": true
}
]
}
安全设计:
access_key_secret 强制存储在系统 Keyring 中(服务名:cloudlens_cli,键名:{provider}:{name})。
关键方法:
class ConfigManager:
    """Manages account configuration; secrets are kept in the system keyring."""

    def add_account(self, config: AccountConfig):
        """Add an account configuration.

        Writes ``config.access_key_secret`` to the keyring under service
        ``cloudlens_cli`` and username ``{provider}:{name}``, then calls
        ``_save_config()``.
        """
        # 1. Store the access-key *secret* in the keyring.
        keyring.set_password(
            "cloudlens_cli",
            f"{config.provider}:{config.name}",
            config.access_key_secret,
        )
        # 2. Persist the configuration to file (intended to exclude the
        #    secret -- NOTE(review): _save_config is not shown here; confirm).
        self._save_config()

    def get_account(self, name: str, provider: str = None):
        """Return the account configuration with its secret restored.

        Args:
            name: Account name passed to ``_find_account``.
            provider: Optional provider passed to ``_find_account``.

        Returns:
            The object returned by ``_find_account`` with
            ``access_key_secret`` filled in from the keyring
            (``None`` when the keyring has no entry).
        """
        # Look the account up in the file-backed configuration.
        account = self._find_account(name, provider)
        # Read the secret back with the same service/username pair that
        # add_account() writes. The account's own provider is used because
        # the ``provider`` argument may be None.
        # NOTE(review): assumes the account object exposes .provider and
        # .name like AccountConfig does -- confirm.
        account.access_key_secret = keyring.get_password(
            "cloudlens_cli",
            f"{account.provider}:{account.name}",
        )
        return account
检测逻辑:
class IdleDetector:
    """Heuristic idle-resource detection."""

    @staticmethod
    def is_ecs_idle(metrics: Dict) -> Tuple[bool, List[str]]:
        """Decide whether an ECS instance looks idle.

        Four metrics are checked in order (``cpu_avg`` < 5.0,
        ``memory_avg`` < 20.0, ``net_in_avg`` < 1000, ``disk_iops_avg`` < 100).
        The instance is reported idle when at least two checks trigger.

        Returns:
            ``(is_idle, reasons)`` where ``reasons`` lists the triggered checks.
        """
        findings: List[str] = []

        # 1. CPU utilisation
        if metrics['cpu_avg'] < 5.0:
            findings += [f"CPU平均使用率仅{metrics['cpu_avg']:.1f}%"]

        # 2. Memory utilisation
        if metrics['memory_avg'] < 20.0:
            findings += [f"内存平均使用率仅{metrics['memory_avg']:.1f}%"]

        # 3. Inbound network traffic (threshold noted as 1KB/s)
        if metrics['net_in_avg'] < 1000:
            findings += ["公网入流量极低"]

        # 4. Disk IO
        if metrics['disk_iops_avg'] < 100:
            findings += ["磁盘IOPS极低"]

        # A single low metric is not enough; require at least two.
        return len(findings) >= 2, findings
数据来源:
语法解析:
class FilterEngine:
    """Parses textual filter expressions into condition tuples."""

    @staticmethod
    def _coerce_value(text: str):
        """Turn a plain integer/decimal literal into a number; else keep the string."""
        if re.fullmatch(r'[+-]?\d+', text):
            return int(text)
        if re.fullmatch(r'[+-]?\d+\.\d+', text):
            return float(text)
        return text

    @staticmethod
    def parse_filter(filter_str: str) -> List[tuple]:
        """Parse a filter expression.

        Supported operators: ``=``, ``!=``, ``<``, ``<=``, ``>``, ``>=``.
        Conditions are joined with ``AND`` / ``OR`` (surrounded by whitespace).

        Each condition becomes ``(field, operator, value, logic)``:

        * ``value`` is converted to ``int``/``float`` when it is a plain
          numeric literal, otherwise it stays a stripped string;
        * ``logic`` is the connector that precedes the condition; the first
          condition defaults to ``'AND'``.

        Parts that do not look like ``field<op>value`` are skipped.

        Example:
            "charge_type=PrePaid AND expire_days<7"
            -> [('charge_type', '=', 'PrePaid', 'AND'),
                ('expire_days', '<', 7, 'AND')]
        """
        # Splitting on a capturing group keeps the connectors, so the result
        # alternates: [condition, connector, condition, connector, ...].
        parts = re.split(r'\s+(AND|OR)\s+', filter_str)
        conditions = []
        logic = 'AND'  # connector applied to the first condition
        for index, part in enumerate(parts):
            if index % 2:
                # Odd positions are the captured AND/OR connectors.
                logic = part
                continue
            match = re.match(r'\s*(\w+)\s*(!=|<=|>=|=|<|>)\s*(.+)', part)
            if match:
                field, operator, value = match.groups()
                conditions.append(
                    (field, operator,
                     FilterEngine._coerce_value(value.strip()), logic)
                )
        return conditions
应用筛选:
def apply_filter(resources: List, filter_str: str) -> List:
    """Return the resources that match the parsed filter expression.

    The expression is parsed once with ``parse_filter``; each resource is
    then tested with ``_match_resource`` and kept, in order, when it matches.
    """
    conditions = parse_filter(filter_str)
    return [item for item in resources if _match_resource(item, conditions)]
设计方案:ThreadPoolExecutor(而非AsyncIO)
原因:
实现:
class ConcurrentQueryHelper:
    """Runs one query per account on a thread pool."""

    @staticmethod
    def query_with_progress(accounts, query_func, progress_callback):
        """Query all accounts concurrently and report progress.

        Args:
            accounts: Sized iterable of accounts; each is passed to ``query_func``.
            query_func: Callable returning an iterable of results for one account.
            progress_callback: Optional callable ``(completed, total)`` invoked
                after each account finishes, whether it succeeded or failed.

        Returns:
            A flat list with the results of every successful query, in
            completion order. Failed queries are logged and skipped.
        """
        all_results = []
        total = len(accounts)
        if total == 0:
            # ThreadPoolExecutor rejects max_workers=0, and there is no work.
            return all_results

        completed = 0
        with ThreadPoolExecutor(max_workers=min(total, 10)) as executor:
            # Submit every task up front.
            futures = {executor.submit(query_func, acc): acc
                       for acc in accounts}
            # Collect results as they finish.
            for future in as_completed(futures):
                try:
                    all_results.extend(future.result())
                except Exception as e:
                    # Best effort: one failing account must not abort the rest.
                    logger.error(f"Query failed: {e}")
                finally:
                    completed += 1
                    if progress_callback:
                        progress_callback(completed, total)
        return all_results
性能提升:
双重保障:
class PermissionGuard:
    """Blocks API actions that are considered dangerous."""

    # Substrings that mark an action as forbidden.
    DANGEROUS_ACTIONS = [
        "DeleteInstance",
        "ModifyInstance",
        "CreateInstance",
        # ...more
    ]

    @staticmethod
    def check_action(action: str) -> bool:
        """Check whether an action is safe.

        Args:
            action: Name of the API action about to be executed.

        Returns:
            ``True`` when no entry of ``DANGEROUS_ACTIONS`` occurs in ``action``.

        Raises:
            SecurityError: If ``action`` contains a dangerous action name.
        """
        # A staticmethod has no implicit access to class attributes, so the
        # list must be reached through the class name.
        for dangerous in PermissionGuard.DANGEROUS_ACTIONS:
            if dangerous in action:
                raise SecurityError(f"禁止操作: {action}")
        return True
def check_permissions(self) -> Dict:
    """Build the permission report for the current account.

    Returns:
        A dict with two lists: ``"permissions"`` (one LOW-risk entry per
        ``READ_ONLY_APIS`` item) and ``"high_risk_permissions"`` (one
        HIGH-risk entry per listed policy name).
    """
    # Read-only APIs are reported as low risk.
    low_risk = [
        {
            "api": api,
            "description": desc,
            "risk_level": "LOW",
        }
        for api, desc in READ_ONLY_APIS
    ]
    # High-risk policies. As written, both policy names are always listed;
    # no actual policy lookup happens in this snippet.
    high_risk = [
        {
            "policy": policy,
            "risk_level": "HIGH",
            "recommendation": "建议使用只读策略",
        }
        for policy in ["AdministratorAccess", "FullAccess"]
    ]
    return {
        "permissions": low_risk,
        "high_risk_permissions": high_risk,
    }
用户输入
↓
CLI解析(Click)
↓
resolve_account_name() → 识别账号(支持重名)
↓
get_provider() → 创建Provider实例
↓
[串行 OR 并发]
↓
Provider.list_instances() → 调用云SDK
↓
_do_request() → 发送API请求
↓
parse_response() → 解析响应
↓
to_unified_resource() → 转换为统一模型
↓
apply_filter() → 应用筛选条件
↓
export() → 导出为指定格式
↓
输出到终端/文件
用户触发报告生成
↓
收集数据
├─ list_instances()
├─ list_rds()
├─ list_redis()
└─ (可选) analyze_idle()
↓
ReportGenerator.generate_excel()
├─ 创建Workbook
├─ 生成Summary Sheet
├─ 生成ECS Sheet
├─ 生成RDS Sheet
├─ 生成Idle Sheet
├─ 应用样式
└─ 保存文件
↓
输出报告文件
应用场景:Provider创建
def get_provider(account: AccountConfig) -> BaseProvider:
    """Provider factory: build the provider matching ``account.provider``.

    Raises:
        ValueError: If the provider name is neither "aliyun" nor "tencent".
    """
    vendor = account.provider
    if vendor == "aliyun":
        return AliyunProvider(account)
    if vendor == "tencent":
        return TencentProvider(account)
    raise ValueError(f"Unsupported provider: {account.provider}")
应用场景:分析器(Analyzer)
# Different analysis strategies
IdleDetector.analyze()  # idle-resource analysis strategy
CostAnalyzer.analyze()  # cost analysis strategy
TagAnalyzer.analyze()  # tag analysis strategy
SecurityAnalyzer.analyze()  # security analysis strategy
应用场景:云SDK适配
class AliyunProvider(BaseProvider):
    """Adapts the Aliyun SDK to the unified interface."""

    def list_instances(self):
        """Call the Aliyun-specific API and adapt every returned record."""
        adapted = []
        # Aliyun-specific API call.
        for raw in aliyun_sdk.describe_instances():
            # Adapt to the unified model.
            adapted.append(self._adapt_instance(raw))
        return adapted

    def _adapt_instance(self, aliyun_inst):
        """Adapter method: map one Aliyun record onto ``UnifiedResource``."""
        instance_id = aliyun_inst['InstanceId']
        instance_name = aliyun_inst.get('InstanceName')
        return UnifiedResource(
            id=instance_id,
            name=instance_name,
            # ...field mapping
        )
应用场景:ConfigManager
class ConfigManager:
    """Configuration manager that hands out a single shared instance."""

    # Cached instance; None until the first construction.
    _instance = None

    def __new__(cls):
        """Return the cached instance, creating and loading it on first use."""
        if cls._instance is not None:
            return cls._instance
        created = super().__new__(cls)
        # Cache before loading, so the instance is registered even if
        # _load_config() raises.
        cls._instance = created
        created._load_config()
        return created
三重保障:
机制:
# providers/aws/provider.py
class AWSProvider(BaseProvider):
    """Example provider showing where an AWS implementation plugs in."""

    def list_instances(self):
        """Placeholder: the AWS SDK call goes here; currently returns None."""
        # Call the AWS SDK
        return None
# main_cli.py
def get_provider(account):
    """Return an ``AWSProvider`` for "aws" accounts, otherwise ``None``."""
    if account.provider != "aws":
        return None
    return AWSProvider(account)
def list_mongodb(self) -> List[UnifiedResource]:
    """Placeholder for the MongoDB query; currently returns None."""
    # Implement the query logic
    return None
@query.command("mongodb")
def query_mongodb(account):
    """CLI sub-command "mongodb": list MongoDB resources via the provider."""
    # NOTE(review): `provider` is not bound anywhere in this snippet and
    # `account` is unused -- presumably the provider is meant to be built
    # from `account` first; confirm against the real CLI code.
    provider.list_mongodb()
# core/compliance_analyzer.py
class ComplianceAnalyzer:
    """Example analyzer showing where a compliance check plugs in."""

    @staticmethod
    def check_compliance(resources):
        """Placeholder for the analysis logic; currently returns None."""
        # Analysis logic
        return None
@analyze.command("compliance")
def analyze_compliance():
    """CLI sub-command "compliance": run the compliance analyzer."""
    # NOTE(review): check_compliance is declared with a required `resources`
    # parameter, but none is passed here -- this call raises TypeError as
    # written. Confirm where the resources should come from.
    ComplianceAnalyzer.check_compliance()
# Layered error handling around one SDK call.
# NOTE(review): the trailing `continue` implies this fragment lives inside a
# loop that is not shown here.
try:
    # Provider layer
    response = sdk_client.describe_instances()
except SDKException as e:
    # SDK error -> retry or alert
    logger.error(f"SDK error: {e}")
except NetworkException as e:
    # Network error -> retry
    retry(...)
except Exception as e:
    # Unknown error -> log and carry on
    logger.exception(e)
    continue
❌ Failed to query account 'prod':
Reason: InvalidAccessKeyId
Suggestion: Please check your Access Key in config
def test_filter_engine():
    """Filtering by charge type and expiry yields the expected number of hits."""
    resources = [...]
    filter_expr = "charge_type=PrePaid AND expire_days<7"
    result = FilterEngine.apply_filter(resources, filter_expr)
    assert len(result) == expected_count
def test_aliyun_provider():
    """The Aliyun provider returns at least one instance tagged "aliyun"."""
    instances = AliyunProvider(test_config).list_instances()
    assert len(instances) > 0
    first = instances[0]
    assert first.provider == "aliyun"
CloudLens CLI的架构设计遵循以下原则:
适合团队:运维团队、DevOps工程师、云架构师
技术栈:Python 3.8+, Click, 云厂商SDK, Keyring