制定日期: 2026-01-08 | 版本: 3.0.0 | 规划周期: 2026年全年

cloudlens/ 顶级包已消除循环依赖;从 aliyunidle 到 CloudLens 的全面转型已完成,覆盖文档、Web UI 及仓库元数据。

| 能力域 | 成熟度 | 核心功能 | 2026 重点演进 |
|---|---|---|---|
| 多云管理 | ⭐⭐⭐⭐⭐ | 阿里云/腾讯云统一管理 | AWS/Azure 生产级支持 |
| 成本分析 | ⭐⭐⭐⭐ | 趋势/预测/折扣分析 | 深度账单归因 (Unit Economics) |
| 安全合规 | ⭐⭐⭐⭐⭐ | CIS 检查、漏扫、架构审计 | Auto-Remediation (自动修复) |
| 智脑引擎 (AI) | ⭐⭐ | 线性预测、规则建议 | 大模型集成 (LLM Ops-Copilot) |
| 可观测性 | ⭐⭐⭐ | 结构化日志、Web 仪表盘 | Metrics/Tracing 实时流处理 |
| 债务类型 | 影响范围 | 紧急程度 | 工作量估算 |
|---|---|---|---|
| API单文件过大(3922行) | 开发效率、代码审查 | 🔴 高 | 5人日 |
| 测试覆盖不足 | 代码质量、回归风险 | 🟠 中 | 15人日 |
| 监控缺失 | 运维效率、问题定位 | 🔴 高 | 8人日 |
| 文档不完整 | 新人上手、维护成本 | 🟡 低 | 10人日 |
| 缓存层单一 | 性能瓶颈 | 🟠 中 | 6人日 |
企业级多云资源治理与成本优化平台
目标: 提升用户操作效率40%,降低手动刷新频次
功能设计:
实时刷新控制面板
├── 自动刷新开关
│ ├── 30秒刷新 (高频监控场景)
│ ├── 1分钟刷新 (日常监控)
│ ├── 5分钟刷新 (长期观察)
│ └── 手动刷新
├── WebSocket推送
│ ├── 成本异常事件推送
│ ├── 安全告警实时推送
│ └── 资源状态变更推送
└── 后台静默更新
├── 增量数据更新
├── 差异高亮显示
└── 用户无感知刷新
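上面"后台静默更新"中的增量数据更新与差异高亮,可以基于前后两次资源快照的差分来实现。以下是一个最小草图(快照结构与函数名均为示意,并非现有实现):

```python
def diff_snapshots(old: dict, new: dict) -> dict:
    """对比以资源ID为键的两次快照,只返回变化部分,
    便于前端做差异高亮与无感知局部刷新。"""
    added = {rid: res for rid, res in new.items() if rid not in old}
    removed = {rid: res for rid, res in old.items() if rid not in new}
    changed = {
        rid: {"before": old[rid], "after": res}
        for rid, res in new.items()
        if rid in old and old[rid] != res
    }
    return {"added": added, "removed": removed, "changed": changed}
```

差分结果可直接通过 WebSocket 推送,客户端只需重绘 added/removed/changed 三类条目。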
技术方案:
实施步骤:
预期收益:
当前问题:
优化方案:
2.1 对比模式:
// 对比视图枚举
enum ComparisonMode {
MONTH_OVER_MONTH = "mom", // 环比 (当月 vs 上月)
YEAR_OVER_YEAR = "yoy", // 同比 (今年 vs 去年)
QUARTER_OVER_QUARTER = "qoq", // 季度对比
MULTI_ACCOUNT = "multi" // 多账号对比
}
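各对比模式本质上都是"回看 N 个月"。下面用 Python 给出一个假设性的计算草图(costs 以 'YYYY-MM' 账期为键,函数名为示意):

```python
# 各对比模式对应的回看月数(与上面的枚举对应)
PERIOD_OFFSETS = {"mom": 1, "qoq": 3, "yoy": 12}

def shift_month(cycle: str, months_back: int) -> str:
    """'2026-01' 回退 N 个月,例如回退1个月得到 '2025-12'。"""
    y, m = map(int, cycle.split("-"))
    total = y * 12 + (m - 1) - months_back
    return f"{total // 12:04d}-{total % 12 + 1:02d}"

def period_comparison(costs: dict, cycle: str, mode: str):
    """返回 cycle 相对参考账期的涨跌百分比;参考账期无数据时返回 None。"""
    ref = shift_month(cycle, PERIOD_OFFSETS[mode])
    base = costs.get(ref)
    if not base:
        return None
    return round((costs[cycle] - base) / base * 100, 1)
```

例如 period_comparison(costs, "2026-01", "mom") 即为环比涨跌幅。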
2.2 数据钻取:
2.3 智能洞察:
💡 智能洞察面板
├── 成本变化原因分析
│ ├── "本月成本上涨23%,主要由于ECS实例增加15台"
│ ├── "折扣率下降5%,建议续费优化"
│ └── "流量费用异常增长,建议检查CDN配置"
├── 预测与建议
│ ├── "按当前趋势,本月将超预算12%"
│ └── "建议在15号前执行成本优化动作"
└── 对比数据
├── "同比去年同期增长18%"
└── "环比上月下降5%"
UI设计:
[图表工具栏]
[月度▼] [环比○ 同比○ 多账号○] [导出▼] [全屏⛶]
[折线图/柱状图区域]
┌────────────────────────────────┐
│ 成本趋势 (2025-01 ~ 2026-01) │
│ │
│ ┃ ┃ ┃ ┃ ┃ ┃ ┃ ┃ ┃ │
│ ━━━━━━━━━━━━━━━━━━━━━━━━━━ │
│ Jan Feb Mar Apr May Jun Jul │
│ │
│ 💡 本月成本环比↑23% │
└────────────────────────────────┘
[数据表格] (可钻取)
3.1 一键导出:
导出按钮菜单
├── 导出当前视图 (CSV)
├── 导出当前视图 (Excel)
├── 导出详细报告 (HTML)
├── 导出API数据 (JSON)
└── 自定义导出
├── 选择字段
├── 选择时间范围
└── 选择格式
3.2 批量操作工具栏:
[已选择 15 个实例] [取消选择]
┌──────────────────────────────────┐
│ [批量打标签] [批量停机] [批量续费] │
│ [批量修改规格] [批量删除] [更多▼] │
└──────────────────────────────────┘
3.3 操作历史与回滚:
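3.3 目前只有标题。回滚的一种常见做法是为每个批量操作记录"逆操作",以下为一个假设性草图(类名、字段均为示意):

```python
from dataclasses import dataclass, field

@dataclass
class Operation:
    action: str     # 例如 "batch_stop"
    targets: list   # 受影响的实例ID列表
    inverse: str    # 能撤销本操作的动作,例如 "batch_start"

@dataclass
class OperationHistory:
    """批量操作的追加式日志;回滚即取出最近一条并执行其逆操作。"""
    _log: list = field(default_factory=list)

    def record(self, op: Operation) -> None:
        self._log.append(op)

    def rollback_last(self) -> Operation:
        """弹出最近一条记录,返回对应的逆操作。"""
        op = self._log.pop()
        return Operation(action=op.inverse, targets=op.targets, inverse=op.action)
```

注意删除类操作不可逆,UI 上应区分"可回滚"与"不可回滚"动作。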
设计原则:
移动端专用组件:
// 移动端适配Hook
const { isMobile, isTablet } = useResponsive()
// 移动端简化卡片
<MobileCostCard
title="本月成本"
value={12345}
trend="+23%"
sparkline={data}
/>
// 移动端抽屉菜单
<MobileDrawer>
<NavMenu />
</MobileDrawer>
触摸手势:
5.1 分析维度矩阵:
| 维度 | 当前状态 | 目标状态 | 示例查询 |
|---|---|---|---|
| 按产品 | ✅ 已实现 | 增强 | “ECS本月成本¥12,345” |
| 按区域 | ⚠️ 部分实现 | 完善 | “华东1区成本占比45%” |
| 按标签 | ✅ 已实现 | 增强 | “项目A成本¥8,000” |
| 按部门 | ❌ 未实现 | 新增 | “研发部门占比60%” |
| 按环境 | ❌ 未实现 | 新增 | “生产环境 vs 测试环境” |
| 按实例 | ✅ 已实现 | 增强 | “Top 10 高成本实例” |
| 按时间 | ✅ 已实现 | 增强 | “工作日 vs 周末” |
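新增的部门、环境等维度都可以归结为"按某个标签键聚合"。一个示意性实现如下(bill_items 的字段名为假设):

```python
from collections import defaultdict

def group_costs_by_tag(bill_items: list, tag_key: str) -> dict:
    """按任意标签维度 (department/env/project 等) 汇总成本;
    无标签的条目计入 'unallocated',保证总额仍可对账。"""
    totals = defaultdict(float)
    for item in bill_items:
        key = item.get("tags", {}).get(tag_key, "unallocated")
        totals[key] += item["cost"]
    return dict(totals)
```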
5.2 成本分配引擎:
# 成本分配规则示例
class CostAllocationRule:
    """成本分配规则定义"""

    def __init__(self, rule_config):
        self.dimensions = rule_config['dimensions']      # ['department', 'project', 'env']
        self.allocation_method = rule_config['method']   # 'tag', 'equal', 'proportional'
        self.fallback_rule = rule_config['fallback']     # 'shared_pool', 'unallocated'

    def allocate_cost(self, resource, cost):
        """根据规则分配成本"""
        if self.allocation_method == 'tag':
            # 按标签分配
            return self._allocate_by_tags(resource, cost)
        elif self.allocation_method == 'proportional':
            # 按比例分配
            return self._allocate_proportional(resource, cost)
        elif self.allocation_method == 'equal':
            # 平均分配
            return self._allocate_equally(resource, cost)

# 使用示例
allocator = CostAllocator(rules=[
    {
        "dimensions": ["department", "project"],
        "method": "tag",
        "fallback": "shared_pool"
    }
])
allocated_costs = allocator.allocate_all_resources(resources, costs)
5.3 成本归因分析:
成本归因报告
├── 直接成本 (75%)
│ ├── ECS: ¥10,000 (部门A: 60%, 部门B: 40%)
│ ├── RDS: ¥5,000 (项目X: 100%)
│ └── OSS: ¥2,000 (按存储量分配)
├── 共享成本 (20%)
│ ├── VPC: ¥1,500 (按流量比例分配)
│ ├── SLB: ¥1,000 (按实例数分配)
│ └── DNS: ¥500 (平均分配)
└── 未分配成本 (5%)
└── 缺少标签的资源: ¥800
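其中"按流量比例分配"这类共享成本分摊,可以草拟为如下形式(usage 可以是流量、实例数等权重,函数名为示意):

```python
def allocate_shared_cost(total: float, usage: dict) -> dict:
    """按用量占比分摊一笔共享成本;没有任何用量数据时退化为平均分配。"""
    total_usage = sum(usage.values())
    if total_usage == 0:
        n = max(len(usage), 1)
        return {k: round(total / n, 2) for k in usage}
    return {k: round(total * v / total_usage, 2) for k, v in usage.items()}
```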
6.1 异常检测算法:
class AnomalyDetector:
    """多算法融合的异常检测器"""

    def __init__(self):
        self.detectors = [
            StatisticalDetector(),       # 统计方法 (3σ原则)
            TimeSeriesDetector(),        # 时序分析 (Prophet)
            MachineLearningDetector()    # ML方法 (Isolation Forest)
        ]

    def detect_cost_anomaly(self, cost_history):
        """检测成本异常"""
        anomalies = []
        for detector in self.detectors:
            result = detector.detect(cost_history)
            anomalies.extend(result)
        # 多算法投票机制
        confirmed_anomalies = self._consensus_voting(anomalies)
        return self._rank_by_severity(confirmed_anomalies)

    def _consensus_voting(self, anomalies):
        """
        多算法共识投票
        如果3个算法中至少2个检测到异常,则确认为异常
        """
        anomaly_votes = defaultdict(int)
        for anomaly in anomalies:
            anomaly_votes[anomaly.timestamp] += 1
        return [a for a in anomalies if anomaly_votes[a.timestamp] >= 2]
6.2 异常类型定义:
| 异常类型 | 检测逻辑 | 严重程度 | 处理建议 |
|---|---|---|---|
| 成本突增 | 单日成本超过30天均值2倍 | 🔴 高 | 立即排查新增资源 |
| 异常账单 | 单个账单项超过历史最大值150% | 🟠 中 | 确认计费规则变更 |
| 折扣异常 | 折扣率低于历史均值20% | 🟡 低 | 检查合同到期情况 |
| 流量异常 | 流量费用异常增长 | 🔴 高 | 检查是否被攻击/盗刷 |
| 闲置资源增加 | 闲置资源数量增加50% | 🟠 中 | 执行资源清理 |
| 预算超支风险 | 当前消耗速率将导致月底超支 | 🟠 中 | 立即执行成本控制 |
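以表中第一条"成本突增"规则为例(单日成本超过30天均值2倍),一个最小检测草图如下(函数名为示意):

```python
from statistics import mean

def is_cost_spike(daily_costs: list, today: float, factor: float = 2.0) -> bool:
    """today 超过近30天日均成本的 factor 倍即视为突增。"""
    window = daily_costs[-30:]
    if not window:
        return False
    return today > factor * mean(window)
```

其余规则(折扣异常、闲置资源增加等)都可以写成类似的阈值判断,再交给 6.1 的投票机制融合。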
6.3 智能告警路由:
# 告警路由配置
alert_routing:
  - name: "成本突增告警"
    conditions:
      - type: "cost_spike"
        severity: ["critical", "high"]
    channels:
      - type: "webhook"
        url: "https://hooks.slack.com/..."
      - type: "email"
        recipients: ["finops@company.com"]
      - type: "sms"
        phones: ["+86138****"]
    throttling:
      interval: "1h"  # 1小时内相同告警只发送一次
  - name: "安全告警"
    conditions:
      - type: "security_issue"
        severity: ["critical"]
    channels:
      - type: "pagerduty"
        integration_key: "xxx"
      - type: "webhook"
        url: "https://hooks.slack.com/security"
    escalation:
      - level: 1
        delay: "5m"
        channels: ["oncall_engineer"]
      - level: 2
        delay: "15m"
        channels: ["security_manager"]
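配置中 throttling.interval 的"1小时内相同告警只发一次",可以用"按告警键记录上次发送时间"来实现,示意如下(类名为假设):

```python
import time

class AlertThrottle:
    """在 interval 秒内抑制相同 key 的重复告警。"""
    def __init__(self, interval_seconds: float = 3600):
        self.interval = interval_seconds
        self._last_sent = {}

    def should_send(self, alert_key: str, now=None) -> bool:
        now = time.time() if now is None else now
        last = self._last_sent.get(alert_key)
        if last is not None and now - last < self.interval:
            return False
        self._last_sent[alert_key] = now
        return True
```

alert_key 可取"告警类型:账号"的组合,保证不同账号的同类告警互不抑制。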
7.1 多模型对比:
class EnhancedCostPredictor:
"""
增强版成本预测器,支持多模型对比
"""
def __init__(self):
self.models = {
'prophet': ProphetModel(), # Facebook Prophet (时序)
'lstm': LSTMModel(), # LSTM神经网络
'linear': LinearModel(), # 线性回归 (基准)
'arima': ARIMAModel(), # ARIMA (传统时序)
'ensemble': EnsembleModel() # 集成模型
}
def predict_with_confidence(self, history, days=90):
"""
预测未来成本,包含置信区间
"""
predictions = {}
for name, model in self.models.items():
result = model.predict(history, days)
predictions[name] = {
'forecast': result.forecast,
'lower_bound': result.lower_bound, # 95%置信区间下界
'upper_bound': result.upper_bound, # 95%置信区间上界
'accuracy': result.accuracy_score,
'mape': result.mape # 平均绝对百分比误差
}
# 选择最佳模型
best_model = self._select_best_model(predictions)
return {
'recommended_model': best_model,
'all_predictions': predictions,
'ensemble_prediction': self._ensemble_predict(predictions)
}
7.2 场景预测:
场景预测工具
├── 基准场景 (当前趋势)
│ └── "按当前趋势,3个月后成本为¥45,000"
├── 优化场景 (执行建议后)
│ └── "执行优化建议后,预计节省¥8,000 (18%)"
├── 扩容场景 (资源增长)
│ ├── "增加20%资源,成本预计¥52,000"
│ └── "建议采用预留实例,可节省¥3,000"
└── What-if分析
├── "如果折扣率降低10%,成本将增加¥5,000"
└── "如果迁移到按量付费,成本将增加¥12,000"
7.3 预测可视化:
[成本预测图表]
┌────────────────────────────────────┐
│ 成本预测 (未来90天) │
│ │
│ ↑ ╱━━━━ 上界 │
│ │ ╱━━━ │
│ │ ╱━━━ 预测值 │
│ │ ╱━━━ │
│ │ ╱━━━━━━━━━━━━━━ 下界 │
│ │━━━━ 历史数据 │
│ └────────┬───────┬───────┬─────→ │
│ 过去30天 今天 未来60天 │
│ │
│ 📊 模型准确度: 92% │
│ 💡 建议: 采用Prophet模型 │
└────────────────────────────────────┘
8.1 用户认证系统:
# 用户模型
class User:
id: int
username: str
email: str
password_hash: str
role: Role # Admin, Operator, Viewer
department: str
created_at: datetime
last_login: datetime
mfa_enabled: bool
mfa_secret: Optional[str]
# 认证流程
class AuthService:
def login(self, username, password, mfa_code=None):
# 1. 验证用户名密码
user = self._verify_credentials(username, password)
# 2. MFA验证 (如果启用)
if user.mfa_enabled:
if not self._verify_mfa(user, mfa_code):
raise MFARequiredError()
# 3. 生成JWT Token
token = self._generate_jwt(user)
# 4. 记录登录日志
self._log_login(user)
return token
8.2 RBAC权限模型:
# 角色权限配置
roles:
  admin:
    description: "系统管理员"
    permissions:
      - "*:*"  # 所有权限
  finops_manager:
    description: "FinOps经理"
    permissions:
      - "cost:read"
      - "cost:analyze"
      - "budget:*"
      - "report:*"
      - "account:read"
  security_auditor:
    description: "安全审计员"
    permissions:
      - "security:*"
      - "compliance:*"
      - "resource:read"
      - "report:generate"
  developer:
    description: "开发人员"
    permissions:
      - "resource:read"
      - "cost:read"
      - "tag:update"  # 只能更新标签
  viewer:
    description: "查看者"
    permissions:
      - "*:read"  # 只读权限

# 资源级权限控制
resource_permissions:
  - resource_type: "account"
    resource_id: "prod-account"
    user: "dev-team"
    permissions: ["read"]  # 开发团队只能查看生产账号
  - resource_type: "account"
    resource_id: "dev-account"
    user: "dev-team"
    permissions: ["read", "update"]  # 开发账号可以修改
8.3 权限检查装饰器:
from functools import wraps

def require_permission(permission: str):
    """权限检查装饰器"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # 从请求上下文获取当前用户
            current_user = get_current_user()
            # 检查权限
            if not current_user.has_permission(permission):
                raise PermissionDeniedError(
                    f"User {current_user.username} lacks permission: {permission}"
                )
            # 记录操作审计日志
            await audit_log.log_action(
                user=current_user,
                action=permission,
                resource=args[0] if args else None
            )
            return await func(*args, **kwargs)
        return wrapper
    return decorator

# 使用示例
@app.post("/api/accounts/{account_id}/resources/delete")
@require_permission("resource:delete")
async def delete_resource(account_id: str, resource_id: str):
    # 执行删除逻辑
    pass
8.4 审计日志:
-- 审计日志表
CREATE TABLE audit_logs (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
user_id INT NOT NULL,
username VARCHAR(100),
action VARCHAR(100) NOT NULL, -- 'resource:delete', 'budget:update'
resource_type VARCHAR(50), -- 'ecs', 'budget', 'account'
resource_id VARCHAR(255),
old_value JSON, -- 修改前的值
new_value JSON, -- 修改后的值
ip_address VARCHAR(45),
user_agent TEXT,
status ENUM('success', 'failed'),
error_message TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_user_id (user_id),
INDEX idx_action (action),
INDEX idx_created_at (created_at)
);
9.1 通知渠道集成:
# 通知渠道抽象
class NotificationChannel(ABC):
@abstractmethod
async def send(self, message: NotificationMessage):
pass
# Slack集成
class SlackChannel(NotificationChannel):
def __init__(self, webhook_url: str):
self.webhook_url = webhook_url
async def send(self, message: NotificationMessage):
payload = {
"text": message.title,
"blocks": [
{
"type": "header",
"text": {"type": "plain_text", "text": message.title}
},
{
"type": "section",
"text": {"type": "mrkdwn", "text": message.body}
},
{
"type": "section",
"fields": [
{"type": "mrkdwn", "text": f"*Severity:*\n{message.severity}"},
{"type": "mrkdwn", "text": f"*Account:*\n{message.account}"}
]
},
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {"type": "plain_text", "text": "View Details"},
"url": message.link
}
]
}
]
}
async with httpx.AsyncClient() as client:
await client.post(self.webhook_url, json=payload)
# 钉钉集成
class DingTalkChannel(NotificationChannel):
def __init__(self, webhook_url: str, secret: str):
self.webhook_url = webhook_url
self.secret = secret
async def send(self, message: NotificationMessage):
# 计算签名
timestamp = str(round(time.time() * 1000))
sign = self._calculate_sign(timestamp)
payload = {
"msgtype": "markdown",
"markdown": {
"title": message.title,
"text": f"### {message.title}\n\n{message.body}\n\n"
f"**Severity:** {message.severity}\n\n"
f"[查看详情]({message.link})"
}
}
url = f"{self.webhook_url}&timestamp={timestamp}&sign={sign}"
async with httpx.AsyncClient() as client:
await client.post(url, json=payload)
# 企业微信集成
class WeChatWorkChannel(NotificationChannel):
# 实现略...
pass
# 邮件集成
class EmailChannel(NotificationChannel):
# 实现略...
pass
9.2 Webhook支持:
# Webhook配置
class WebhookConfig:
url: str
secret: str # 用于HMAC签名验证
events: List[str] # 订阅的事件类型
retry_policy: RetryPolicy
timeout: int = 30
# Webhook事件
class WebhookEvent:
"""
Webhook事件标准格式
"""
event_id: str
event_type: str # 'cost.spike', 'security.issue', 'budget.exceeded'
timestamp: datetime
account: str
data: dict
signature: str # HMAC-SHA256签名
# Webhook发送器
class WebhookSender:
async def send_event(self, config: WebhookConfig, event: WebhookEvent):
# 1. 计算签名
event.signature = self._calculate_signature(config.secret, event)
# 2. 发送请求
try:
async with httpx.AsyncClient(timeout=config.timeout) as client:
response = await client.post(
config.url,
json=event.dict(),
headers={
"X-CloudLens-Signature": event.signature,
"X-CloudLens-Event": event.event_type
}
)
if response.status_code >= 500:
# 服务器错误,重试
await self._retry_send(config, event)
except Exception as e:
logger.error(f"Webhook send failed: {e}")
await self._retry_send(config, event)
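上面 WebhookSender 引用的 _calculate_signature 未给出实现。一种可能的写法是对规范化后的 JSON 体做 HMAC-SHA256(属示意而非既有实现);接收方用同一 secret 对原始 body 重算并比对 X-CloudLens-Signature 头:

```python
import hashlib
import hmac
import json

def calculate_signature(secret: str, payload: dict) -> str:
    """对排序、去空格的规范化 JSON 体计算 HMAC-SHA256 签名。"""
    body = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hmac.new(secret.encode(), body.encode(), hashlib.sha256).hexdigest()

def verify_signature(secret: str, payload: dict, signature: str) -> bool:
    """使用恒定时间比较,避免时序攻击。"""
    return hmac.compare_digest(calculate_signature(secret, payload), signature)
```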
9.3 Grafana数据源插件:
// Grafana数据源插件 (Golang)
package main
import (
    "context"
    "encoding/json"

    "github.com/grafana/grafana-plugin-sdk-go/backend"
    "github.com/grafana/grafana-plugin-sdk-go/data"
)
type CloudLensDataSource struct{}
func (ds *CloudLensDataSource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
response := backend.NewQueryDataResponse()
for _, q := range req.Queries {
var query CloudLensQuery
json.Unmarshal(q.JSON, &query)
// 调用CloudLens API获取数据
data, err := ds.fetchData(ctx, query)
if err != nil {
response.Responses[q.RefID] = backend.DataResponse{Error: err}
continue
}
// 转换为Grafana格式
frame := ds.convertToDataFrame(data)
response.Responses[q.RefID] = backend.DataResponse{Frames: []*data.Frame{frame}}
}
return response, nil
}
// plugin.json配置
{
"type": "datasource",
"name": "CloudLens",
"id": "cloudlens-datasource",
"metrics": true,
"annotations": true,
"alerting": true
}
10.1 NL2SQL引擎:
class NL2SQLEngine:
"""
自然语言转SQL查询引擎
"""
def __init__(self, llm_client):
self.llm_client = llm_client # Claude/GPT-4等
self.schema = self._load_database_schema()
self.query_templates = self._load_query_templates()
async def translate_query(self, natural_language_query: str):
"""
将自然语言查询转换为SQL
"""
# 1. 意图识别
intent = await self._classify_intent(natural_language_query)
# 2. 实体抽取
entities = await self._extract_entities(natural_language_query)
# 3. 生成SQL
prompt = f"""
数据库Schema:
{self.schema}
查询模板示例:
{self.query_templates}
用户查询: {natural_language_query}
意图: {intent}
实体: {entities}
请生成安全的SQL查询语句:
"""
sql = await self.llm_client.generate(prompt)
# 4. SQL安全检查
if not self._is_safe_sql(sql):
raise UnsafeSQLError("SQL query contains dangerous operations")
return {
'sql': sql,
'intent': intent,
'entities': entities,
'explanation': await self._explain_sql(sql)
}
# 使用示例
engine = NL2SQLEngine(claude_client)
# 用户输入
query = "本月ECS成本最高的5个实例是哪些?"
# 转换
result = await engine.translate_query(query)
print(result['sql'])
# SELECT instance_id, instance_name, SUM(cost) as total_cost
# FROM bill_items
# WHERE billing_cycle = '2026-01'
# AND product_name LIKE '%ECS%'
# GROUP BY instance_id, instance_name
# ORDER BY total_cost DESC
# LIMIT 5
# 执行SQL并返回结果
data = db.query(result['sql'])
10.2 智能问答系统:
class IntelligentQASystem:
"""
基于RAG的智能问答系统
"""
def __init__(self):
self.vector_db = ChromaDB() # 向量数据库
self.llm = ClaudeClient()
self.embedding_model = OpenAIEmbeddings()
async def answer_question(self, question: str, context: dict):
"""
回答用户问题
"""
# 1. 检索相关数据
relevant_docs = await self._retrieve_relevant_data(question, context)
# 2. 构建提示词
prompt = f"""
你是CloudLens的AI助手,专注于云成本优化和资源管理。
用户问题: {question}
相关数据:
{relevant_docs}
当前上下文:
- 账号: {context['account']}
- 时间范围: {context['time_range']}
- 当前页面: {context['page']}
请基于以上数据回答用户问题,并提供可操作的建议。
"""
# 3. 生成回答
answer = await self.llm.generate(prompt)
# 4. 生成可视化建议
visualizations = await self._suggest_visualizations(question, relevant_docs)
return {
'answer': answer,
'sources': relevant_docs,
'visualizations': visualizations,
'suggested_actions': await self._suggest_actions(question, answer)
}
# 示例对话
qa = IntelligentQASystem()
Q: "为什么本月成本比上月高了这么多?"
A: "本月成本增加了23%,主要有以下3个原因:
1. ECS实例数量增加了15台 (+¥8,000)
2. 流量费用异常增长 (+¥3,000,可能是CDN配置问题)
3. 预留实例到期,转为按量付费 (+¥2,000)
建议操作:
- 检查CDN配置,优化流量使用
- 评估新增ECS的必要性,考虑关闭闲置实例
- 续购预留实例以降低成本"
Q: "帮我找出可以优化的资源"
A: "根据分析,发现以下可优化资源:
1. 闲置ECS实例 (CPU<5%): 8台,预计节省¥2,400/月
2. 未绑定EIP: 5个,预计节省¥500/月
3. 低利用率RDS实例: 3个,建议降配,预计节省¥1,800/月
总计可节省: ¥4,700/月 (占当前成本15%)
[查看详细报告] [一键优化]"
11.1 工作流引擎:
# 工作流定义 (YAML)
workflow_example = """
name: "成本优化自动化流程"
trigger:
type: "schedule"
cron: "0 9 * * 1" # 每周一早上9点
steps:
- name: "检测闲置资源"
action: "analyze.idle_resources"
params:
threshold:
cpu: 5
memory: 10
outputs:
idle_list: ""
- name: "生成优化报告"
action: "report.generate"
params:
template: "cost_optimization"
data: ""
outputs:
report_url: ""
- name: "发送通知"
action: "notify.send"
params:
channel: "slack"
message: "本周发现个闲置资源,查看报告: "
- name: "等待审批"
action: "approval.wait"
params:
approvers: ["finops@company.com"]
timeout: "48h"
- name: "执行优化"
condition: ""
action: "remediate.batch_stop"
params:
resources: ""
dry_run: false
- name: "验证结果"
action: "analyze.verify_optimization"
params:
before_cost: ""
after_cost: ""
"""
# 工作流引擎实现
class WorkflowEngine:
def __init__(self):
self.actions = self._register_actions()
async def execute_workflow(self, workflow_def: dict):
"""
执行工作流
"""
context = {"steps": {}}
for step in workflow_def['steps']:
# 1. 检查条件
if 'condition' in step:
if not self._evaluate_condition(step['condition'], context):
logger.info(f"Skipping step {step['name']} due to condition")
continue
# 2. 渲染参数 (支持Jinja2模板)
params = self._render_params(step['params'], context)
# 3. 执行动作
action = self.actions[step['action']]
result = await action.execute(params)
# 4. 保存输出
if 'outputs' in step:
context['steps'][step['name']] = self._extract_outputs(
result, step['outputs']
)
# 5. 错误处理
if result.status == 'failed':
if step.get('on_error') == 'continue':
logger.warning(f"Step {step['name']} failed, continuing")
continue
else:
raise WorkflowExecutionError(f"Step {step['name']} failed")
return context
11.2 审批流程:
# 审批配置
class ApprovalConfig:
approvers: List[str] # 审批人列表
approval_type: str # 'any', 'all', 'sequential'
timeout: timedelta
auto_approve_conditions: Optional[dict] # 自动审批条件
# 审批流程
class ApprovalService:
async def create_approval(self, config: ApprovalConfig, data: dict):
"""
创建审批单
"""
approval = Approval(
id=generate_id(),
config=config,
data=data,
status='pending',
created_at=datetime.now()
)
# 检查自动审批条件
if config.auto_approve_conditions:
if self._check_auto_approve(approval):
approval.status = 'approved'
approval.approved_by = 'system'
return approval
# 发送审批通知
await self._notify_approvers(approval)
# 保存到数据库
await self.db.save(approval)
return approval
def _check_auto_approve(self, approval: Approval) -> bool:
"""
检查是否满足自动审批条件
示例: 成本影响 < ¥100 自动审批
"""
conditions = approval.config.auto_approve_conditions
if 'cost_impact' in conditions:
if approval.data.get('cost_impact', float('inf')) < conditions['cost_impact']:
return True
return False
| 指标类型 | 指标名称 | 当前值 | Q2目标 | Q4目标 | 计算方式 |
|---|---|---|---|---|---|
| 用户价值 | 月均成本节省金额 | - | ¥50,000 | ¥200,000 | 实际节省 / 月 |
| 用户活跃 | 周活跃用户 (WAU) | - | 100 | 500 | 周活跃用户数 |
| 功能使用 | 报告生成次数 | - | 1,000/月 | 5,000/月 | 月报告生成总数 |
| 效率提升 | 平均问题修复时间 | - | <24h | <12h | 发现→修复平均时长 |
| 系统健康 | API可用性 | - | >99.5% | >99.9% | 正常请求/总请求 |
┌─────────────────┐
│ Load Balancer │
└────────┬────────┘
│
┌────────▼────────┐
│ FastAPI App │
│ (3922行API) │
└────────┬────────┘
│
┌────────▼────────┐
│ Core Services │
│ (60+ modules) │
└────────┬────────┘
│
┌────────▼────────┐
│ MySQL Database │
└─────────────────┘
问题:
┌─────────────────┐
│ API Gateway │
│ (Kong/APISIX) │
└────────┬────────┘
│
┌─────────────────┼─────────────────┐
│ │ │
┌────────▼────────┐ ┌─────▼──────┐ ┌───────▼───────┐
│ Cost Service │ │ Resource │ │ Security │
│ (FastAPI) │ │ Service │ │ Service │
└────────┬────────┘ └─────┬──────┘ └───────┬───────┘
│ │ │
│ ┌───────▼───────┐ │
│ │ Message Bus │ │
│ │ (RabbitMQ) │ │
│ └───────┬───────┘ │
│ │ │
┌────────▼────────┐ ┌─────▼──────┐ ┌───────▼───────┐
│ Cost Database │ │ Resource │ │ Security │
│ (MySQL) │ │ Database │ │ Database │
└─────────────────┘ └────────────┘ └───────────────┘
┌─────────────────┐
│ Service Mesh │
│ (Istio) │
└────────┬────────┘
│
┌─────────────────┼─────────────────┐
│ │ │
┌────────▼────────┐ ┌─────▼──────┐ ┌───────▼───────┐
│ Cost Service │ │ Resource │ │ Security │
│ (k8s pods) │ │ Service │ │ Service │
└────────┬────────┘ └─────┬──────┘ └───────┬───────┘
│ │ │
│ ┌───────▼───────┐ │
│ │ Event Stream │ │
│ │ (Kafka) │ │
│ └───────┬───────┘ │
│ │ │
┌────────▼────────┐ ┌─────▼──────┐ ┌───────▼───────┐
│ Time Series DB │ │ Cache │ │ Document │
│ (InfluxDB) │ │ (Redis) │ │ Database │
└─────────────────┘ └────────────┘ └───────────────┘
┌──────────────────────────────────────────────────┐
│ Observability Platform │
│ Prometheus + Grafana + Jaeger + ELK │
└──────────────────────────────────────────────────┘
2.1 API模块化重构 ⭐⭐⭐⭐⭐
当前状态: api.py 3922行,148个端点集中在一个文件
重构方案:
# 目标结构
web/backend/
├── api/
│ ├── __init__.py # 路由注册中心
│ ├── dependencies.py # 依赖注入
│ ├── middleware.py # 中间件
│ ├── v1/ # API版本化
│ │ ├── __init__.py
│ │ ├── accounts.py # 账号管理 (~10个端点)
│ │ ├── resources.py # 资源查询 (~15个端点)
│ │ ├── costs.py # 成本分析 (~20个端点)
│ │ ├── discounts.py # 折扣分析 (~14个端点)
│ │ ├── security.py # 安全合规 (~12个端点)
│ │ ├── budgets.py # 预算管理 (~8个端点)
│ │ ├── alerts.py # 告警管理 (~10个端点)
│ │ ├── reports.py # 报告生成 (~6个端点)
│ │ ├── tags.py # 虚拟标签 (~8个端点)
│ │ ├── dashboards.py # 仪表盘 (~8个端点)
│ │ ├── optimization.py # 优化建议 (~4个端点)
│ │ ├── cost_allocation.py # 成本分配 (~6个端点)
│ │ └── ai.py # AI功能 (~5个端点)
├── models/ # Pydantic模型
│ ├── requests.py # 请求模型
│ ├── responses.py # 响应模型
│ └── schemas.py # 数据模型
├── services/ # 业务逻辑层
│ ├── account_service.py
│ ├── cost_service.py
│ ├── security_service.py
│ └── ...
├── repositories/ # 数据访问层
│ ├── account_repository.py
│ ├── bill_repository.py
│ └── ...
└── main.py # 应用入口
# 路由注册 (api/__init__.py)
from fastapi import APIRouter
from .v1 import (
accounts, resources, costs, discounts,
security, budgets, alerts, reports
)
api_router = APIRouter()
# 注册v1路由
api_v1_router = APIRouter(prefix="/api/v1")
api_v1_router.include_router(accounts.router, prefix="/accounts", tags=["accounts"])
api_v1_router.include_router(resources.router, prefix="/resources", tags=["resources"])
api_v1_router.include_router(costs.router, prefix="/costs", tags=["costs"])
# ... 其他路由
api_router.include_router(api_v1_router)
分层架构:
# Controller层 (api/v1/costs.py)
@router.get("/trends")
async def get_cost_trends(
    account: str,
    date_range: DateRange = Depends(),
    cost_service: CostService = Depends()
):
    return await cost_service.get_cost_trends(account, date_range)

# Service层 (services/cost_service.py)
class CostService:
    def __init__(self, bill_repo: BillRepository):
        self.bill_repo = bill_repo

    async def get_cost_trends(self, account: str, date_range: DateRange):
        # 业务逻辑
        bills = await self.bill_repo.get_bills_in_range(account, date_range)
        trends = self._calculate_trends(bills)
        return CostTrendsResponse(**trends)

# Repository层 (repositories/bill_repository.py)
class BillRepository:
    def __init__(self, db: Database):
        self.db = db

    async def get_bills_in_range(self, account: str, date_range: DateRange):
        # 数据访问逻辑
        return await self.db.query(
            "SELECT * FROM bill_items WHERE account_id = ? AND ...",
            (account,)
        )
实施计划:
2.2 多级缓存架构 ⭐⭐⭐⭐
当前状态: 仅使用MySQL缓存表 (L3缓存)
目标架构:
class MultiLevelCache:
    """
    三级缓存架构
    L1: 进程内LRU缓存 (5分钟TTL)
    L2: Redis缓存 (30分钟TTL)
    L3: MySQL缓存表 (24小时TTL)
    """

    def __init__(self):
        self.l1_cache = LRUCache(maxsize=1000, ttl=300)
        self.l2_cache = RedisCache(ttl=1800) if redis_available() else None
        self.l3_cache = MySQLCache(ttl=86400)
        # 缓存命中率统计
        self.metrics = CacheMetrics()

    async def get(self, key: str):
        # L1查询
        value = self.l1_cache.get(key)
        if value is not None:
            self.metrics.record_hit('l1')
            return value
        # L2查询
        if self.l2_cache:
            value = await self.l2_cache.get(key)
            if value is not None:
                self.metrics.record_hit('l2')
                self.l1_cache.set(key, value)  # 回填L1
                return value
        # L3查询
        value = await self.l3_cache.get(key)
        if value is not None:
            self.metrics.record_hit('l3')
            if self.l2_cache:
                await self.l2_cache.set(key, value)  # 回填L2
            self.l1_cache.set(key, value)  # 回填L1
            return value
        self.metrics.record_miss()
        return None

    async def set(self, key: str, value: Any, ttl: int = None):
        """写入所有缓存层"""
        self.l1_cache.set(key, value, ttl or 300)
        if self.l2_cache:
            await self.l2_cache.set(key, value, ttl or 1800)
        await self.l3_cache.set(key, value, ttl or 86400)

# 使用装饰器
@multi_level_cache(ttl=300)
async def get_account_resources(account: str):
    # 耗时的查询操作
    provider = get_provider(account)
    return await provider.list_all_resources()

# 缓存预热
async def warmup_cache():
    """系统启动时预热热点数据"""
    hot_accounts = await get_active_accounts()
    for account in hot_accounts:
        await get_account_resources(account)  # 触发缓存
Redis配置 (可选,降级到L1+L3):
# config/redis.py
REDIS_CONFIG = {
'host': os.getenv('REDIS_HOST', 'localhost'),
'port': int(os.getenv('REDIS_PORT', 6379)),
'db': 0,
'password': os.getenv('REDIS_PASSWORD'),
'decode_responses': True,
'max_connections': 50,
'socket_keepalive': True,
'socket_connect_timeout': 5,
'retry_on_timeout': True
}
# Redis降级策略
class ResilientRedisCache:
def __init__(self, config):
try:
self.redis = redis.Redis(**config)
self.redis.ping()
self.available = True
except Exception as e:
logger.warning(f"Redis unavailable: {e}, degrading to L1+L3 cache")
self.available = False
async def get(self, key):
if not self.available:
return None
try:
return await self.redis.get(key)
except Exception as e:
logger.error(f"Redis get failed: {e}")
return None
缓存监控:
class CacheMetrics:
def __init__(self):
self.l1_hits = Counter()
self.l2_hits = Counter()
self.l3_hits = Counter()
self.misses = Counter()
def record_hit(self, level: str):
if level == 'l1':
self.l1_hits.inc()
elif level == 'l2':
self.l2_hits.inc()
elif level == 'l3':
self.l3_hits.inc()
def record_miss(self):
self.misses.inc()
def get_hit_rate(self):
total_requests = (
self.l1_hits._value +
self.l2_hits._value +
self.l3_hits._value +
self.misses._value
)
if total_requests == 0:
return 0
total_hits = self.l1_hits._value + self.l2_hits._value + self.l3_hits._value
return (total_hits / total_requests) * 100
def export_prometheus_metrics(self):
return f"""
# HELP cache_hits_total Total cache hits by level
# TYPE cache_hits_total counter
cache_hits_total{{level="l1"}} {self.l1_hits._value}
cache_hits_total{{level="l2"}} {self.l2_hits._value}
cache_hits_total{{level="l3"}} {self.l3_hits._value}
# HELP cache_miss_total Total cache misses
# TYPE cache_miss_total counter
cache_miss_total {self.misses._value}
# HELP cache_hit_rate Cache hit rate percentage
# TYPE cache_hit_rate gauge
cache_hit_rate {self.get_hit_rate()}
"""
2.3 数据库性能优化 ⭐⭐⭐⭐⭐
优化方向:
1. **索引优化**:
```sql
CREATE INDEX idx_bill_items_instance_date ON bill_items(instance_id, billing_date);

-- 折扣分析索引
CREATE INDEX idx_bill_items_discount_rate ON bill_items(account_id, billing_cycle, discount_rate);

-- 覆盖索引 (避免回表)
CREATE INDEX idx_bill_items_cost_covering ON bill_items(account_id, billing_cycle, product_code, pretax_amount);

-- 函数索引 (MySQL 8.0+)
CREATE INDEX idx_bill_items_month ON bill_items((DATE_FORMAT(billing_date, '%Y-%m')));
```
2. **表分区**:
```sql
-- 账单表按月分区
ALTER TABLE bill_items
PARTITION BY RANGE (YEAR(billing_date) * 100 + MONTH(billing_date)) (
    PARTITION p202401 VALUES LESS THAN (202402),
    PARTITION p202402 VALUES LESS THAN (202403),
    PARTITION p202403 VALUES LESS THAN (202404),
    -- ... 自动创建未来分区
    PARTITION p_future VALUES LESS THAN MAXVALUE
);

-- 分区管理
CREATE EVENT auto_create_partitions
ON SCHEDULE EVERY 1 MONTH
DO
CALL create_next_month_partition('bill_items');
```
3. **查询优化**:
```python
def get_cost_trends_slow(account, start_date, end_date):
    # 问题: 全表扫描, 子查询效率低
    query = """
        SELECT DATE(billing_date) as date, SUM(pretax_amount) as cost
        FROM bill_items
        WHERE account_id = %s
          AND billing_date BETWEEN %s AND %s
          AND instance_id IN (
              SELECT instance_id FROM resources WHERE status = 'Running'
          )
        GROUP BY DATE(billing_date)
    """
    return db.query(query, (account, start_date, end_date))

def get_cost_trends_optimized(account, start_date, end_date):
    # 优化: JOIN代替子查询, 使用索引
    query = """
        SELECT DATE(b.billing_date) as date, SUM(b.pretax_amount) as cost
        FROM bill_items b
        INNER JOIN resources r ON b.instance_id = r.instance_id
        WHERE b.account_id = %s
          AND b.billing_date BETWEEN %s AND %s
          AND r.status = 'Running'
        GROUP BY DATE(b.billing_date)
        ORDER BY date
    """
    return db.query(query, (account, start_date, end_date))
```
4. **连接池调优**:
```python
# database.py
DATABASE_POOL_CONFIG = {
'pool_size': 20, # 连接池大小
'max_overflow': 10, # 最大溢出连接
'pool_timeout': 30, # 获取连接超时
'pool_recycle': 3600, # 连接回收时间 (1小时)
'pool_pre_ping': True, # 连接健康检查
'echo_pool': 'debug', # 连接池日志
}
# 慢查询日志
SLOW_QUERY_THRESHOLD = 1.0 # 1秒
@contextmanager
def query_with_logging(query, params):
    start = time.time()
    try:
        yield db.execute(query, params)
    finally:
        duration = time.time() - start
        if duration > SLOW_QUERY_THRESHOLD:
            logger.warning(
                f"Slow query detected: {duration:.2f}s",
                extra={
                    'query': query,
                    'params': params,
                    'duration': duration
                }
            )
```
2.4 Prometheus Metrics集成 ⭐⭐⭐⭐⭐
# metrics.py
from prometheus_client import Counter, Histogram, Gauge, Info
# API指标
api_requests_total = Counter(
'api_requests_total',
'Total API requests',
['method', 'endpoint', 'status']
)
api_request_duration_seconds = Histogram(
'api_request_duration_seconds',
'API request duration in seconds',
['method', 'endpoint'],
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0]
)
# 缓存指标
cache_hits_total = Counter(
'cache_hits_total',
'Total cache hits',
['level'] # l1, l2, l3
)
cache_miss_total = Counter(
'cache_miss_total',
'Total cache misses'
)
# 数据库指标
db_query_duration_seconds = Histogram(
'db_query_duration_seconds',
'Database query duration',
['query_type'],
buckets=[0.001, 0.01, 0.1, 0.5, 1.0, 5.0]
)
db_connection_pool_size = Gauge(
'db_connection_pool_size',
'Current database connection pool size'
)
# 业务指标
cost_analysis_total = Counter(
'cost_analysis_total',
'Total cost analyses performed',
['account', 'analysis_type']
)
security_scans_total = Counter(
'security_scans_total',
'Total security scans performed',
['account', 'scan_type']
)
# 中间件
from fastapi import Request
import time
@app.middleware("http")
async def prometheus_middleware(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
duration = time.time() - start_time
api_requests_total.labels(
method=request.method,
endpoint=request.url.path,
status=response.status_code
).inc()
api_request_duration_seconds.labels(
method=request.method,
endpoint=request.url.path
).observe(duration)
return response
# Metrics endpoint
from prometheus_client import generate_latest
@app.get("/metrics")
async def metrics():
return Response(
content=generate_latest(),
media_type="text/plain"
)
Grafana Dashboard配置:
{
"dashboard": {
"title": "CloudLens Overview",
"panels": [
{
"title": "API Request Rate",
"targets": [
{
"expr": "rate(api_requests_total[5m])",
"legendFormat": " "
}
]
},
{
"title": "API Latency (P95)",
"targets": [
{
"expr": "histogram_quantile(0.95, rate(api_request_duration_seconds_bucket[5m]))",
"legendFormat": "P95 latency"
}
]
},
{
"title": "Cache Hit Rate",
"targets": [
{
"expr": "sum(rate(cache_hits_total[5m])) / (sum(rate(cache_hits_total[5m])) + rate(cache_miss_total[5m])) * 100",
"legendFormat": "Hit Rate %"
}
]
}
]
}
}
2.5 分布式追踪 (OpenTelemetry) ⭐⭐⭐⭐
# tracing.py
from opentelemetry import trace
from opentelemetry.exporter.jaeger import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
# 初始化Tracer
tracer_provider = TracerProvider()
jaeger_exporter = JaegerExporter(
agent_host_name="localhost",
agent_port=6831,
)
tracer_provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))
trace.set_tracer_provider(tracer_provider)
# FastAPI自动追踪
FastAPIInstrumentor.instrument_app(app)
# 手动追踪
tracer = trace.get_tracer(__name__)
async def get_cost_analysis(account: str):
with tracer.start_as_current_span("get_cost_analysis") as span:
span.set_attribute("account", account)
# 数据库查询
with tracer.start_as_current_span("db_query"):
bills = await db.query_bills(account)
# 数据处理
with tracer.start_as_current_span("process_data"):
result = process_cost_data(bills)
span.set_attribute("result_size", len(result))
return result
2.6 错误追踪 (Sentry) ⭐⭐⭐⭐
# error_tracking.py
import sentry_sdk
from sentry_sdk.integrations.fastapi import FastApiIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
sentry_sdk.init(
dsn=os.getenv("SENTRY_DSN"),
integrations=[
FastApiIntegration(),
SqlalchemyIntegration(),
],
traces_sample_rate=0.1, # 10% traces采样
environment=os.getenv("ENVIRONMENT", "production"),
release=f"cloudlens@{VERSION}",
before_send=filter_sensitive_data, # 过滤敏感信息
)
def filter_sensitive_data(event, hint):
"""
过滤敏感信息
"""
if 'request' in event:
# 移除access_key等敏感参数
if 'data' in event['request']:
event['request']['data'] = mask_sensitive_fields(
event['request']['data']
)
return event
# 自定义异常上报
def handle_cost_analysis_error(account: str, error: Exception):
with sentry_sdk.push_scope() as scope:
scope.set_tag("account", account)
scope.set_context("analysis", {
"account": account,
"timestamp": datetime.now().isoformat()
})
sentry_sdk.capture_exception(error)
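上面 `before_send` 回调引用的 `mask_sensitive_fields` 在文中未给出定义。下面是一个递归脱敏的最小示意(`SENSITIVE_KEYS` 字段名单为假设,需按项目实际情况补充):

```python
# 假设的敏感字段名单,按项目实际情况补充
SENSITIVE_KEYS = {"access_key", "secret_key", "password", "token"}

def mask_sensitive_fields(data):
    """递归遍历 dict/list,将敏感字段的值替换为 '***'。"""
    if isinstance(data, dict):
        return {
            k: "***" if k.lower() in SENSITIVE_KEYS else mask_sensitive_fields(v)
            for k, v in data.items()
        }
    if isinstance(data, list):
        return [mask_sensitive_fields(v) for v in data]
    return data
```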
2.7 服务拆分方案
如果团队规模 > 5人,可考虑微服务拆分:
服务拆分策略
├── API Gateway (Kong/APISIX)
│ ├── 路由转发
│ ├── 认证鉴权
│ ├── 限流熔断
│ └── API聚合
│
├── Cost Service (成本分析服务)
│ ├── 成本趋势分析
│ ├── 成本预测
│ ├── 折扣分析
│ └── 预算管理
│
├── Resource Service (资源管理服务)
│ ├── 资源查询
│ ├── 资源标签
│ ├── 资源缓存
│ └── 闲置检测
│
├── Security Service (安全合规服务)
│ ├── CIS合规检查
│ ├── 安全扫描
│ ├── 漏洞检测
│ └── 修复建议
│
├── Billing Service (账单服务)
│ ├── 账单获取
│ ├── 账单解析
│ ├── 账单存储
│ └── 账单同步
│
└── Notification Service (通知服务)
├── 告警推送
├── 报告生成
├── Webhook调用
└── 邮件发送
服务间通信:
# 使用消息队列 (RabbitMQ)
class CostAnalysisService:
def __init__(self):
self.mq = RabbitMQClient()
async def analyze_cost(self, account: str):
# 1. 发布成本分析事件
await self.mq.publish(
exchange='cloudlens.events',
routing_key='cost.analysis.started',
message={
'account': account,
'timestamp': datetime.now().isoformat()
}
)
# 2. 执行分析
result = await self._perform_analysis(account)
# 3. 发布结果事件
await self.mq.publish(
exchange='cloudlens.events',
routing_key='cost.analysis.completed',
message={
'account': account,
'result': result
}
)
return result
# 订阅事件
class NotificationService:
def __init__(self):
self.mq = RabbitMQClient()
self.mq.subscribe(
exchange='cloudlens.events',
routing_key='cost.analysis.completed',
callback=self.on_cost_analysis_completed
)
async def on_cost_analysis_completed(self, message):
# 发送通知
await self.send_notification(message)
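文中的 `RabbitMQClient` 是假设的封装。在引入真实消息队列之前,可以先用一个进程内的发布/订阅示意验证事件流的接口形态(`routing_key` 仅做精确匹配,不含 RabbitMQ 的通配符路由):

```python
import asyncio
from collections import defaultdict

class InMemoryBroker:
    """进程内发布/订阅示意,接口形态对齐文中的 publish/subscribe 用法。"""

    def __init__(self):
        # (exchange, routing_key) -> [callback, ...]
        self._subs = defaultdict(list)

    def subscribe(self, exchange: str, routing_key: str, callback):
        self._subs[(exchange, routing_key)].append(callback)

    async def publish(self, exchange: str, routing_key: str, message: dict):
        # 依次调用该路由键下注册的所有回调
        for cb in self._subs[(exchange, routing_key)]:
            await cb(message)
```

替换为 RabbitMQ 时,只要封装保持同样的 publish/subscribe 签名即可平滑迁移。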
2.8 Kubernetes部署
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: cloudlens-api
spec:
replicas: 3
selector:
matchLabels:
app: cloudlens-api
template:
metadata:
labels:
app: cloudlens-api
spec:
containers:
- name: api
image: cloudlens/api:latest
ports:
- containerPort: 8000
env:
- name: DB_HOST
valueFrom:
secretKeyRef:
name: cloudlens-secrets
key: db-host
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "1Gi"
cpu: "1000m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
---
# k8s/service.yaml
apiVersion: v1
kind: Service
metadata:
name: cloudlens-api
spec:
selector:
app: cloudlens-api
ports:
- protocol: TCP
port: 80
targetPort: 8000
type: LoadBalancer
---
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: cloudlens-api-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: cloudlens-api
minReplicas: 3
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
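上面探针配置中,`/health`(liveness)与 `/ready`(readiness)语义不同:前者只反映进程存活,后者应聚合下游依赖的健康状况。下面是一个与框架无关的示意(检查项名称为假设,实际应挂到 FastAPI 的对应路由上):

```python
def liveness() -> dict:
    """liveness 探针:进程能响应即视为存活。"""
    return {"status": "ok"}

def readiness(checks: dict) -> tuple:
    """readiness 探针:checks 为 {依赖名: 返回 bool 的检查函数}。

    任一依赖检查失败时返回 503,K8s 会将 Pod 暂时摘出流量。
    """
    results = {name: bool(fn()) for name, fn in checks.items()}
    status = 200 if all(results.values()) else 503
    body = {"status": "ready" if status == 200 else "not_ready", "checks": results}
    return status, body
```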
当前状态: 使用Zustand,但状态管理不够规范
优化方案:
// lib/stores/index.ts - 状态拆分
export { useAccountStore } from './accountStore'
export { useCostStore } from './costStore'
export { useResourceStore } from './resourceStore'
export { useUIStore } from './uiStore'
// lib/stores/costStore.ts
import { create } from 'zustand'
import { persist, devtools } from 'zustand/middleware'
import { immer } from 'zustand/middleware/immer'
interface CostState {
// 状态
costTrends: CostTrend[]
loading: boolean
error: string | null
// 计算属性
totalCost: number
averageCost: number
// 动作
fetchCostTrends: (account: string, range: DateRange) => Promise<void>
updateCostTrend: (id: string, data: Partial<CostTrend>) => void
clearError: () => void
}
export const useCostStore = create<CostState>()(
devtools(
persist(
immer((set, get) => ({
// 初始状态
costTrends: [],
loading: false,
error: null,
// 计算属性
get totalCost() {
return get().costTrends.reduce((sum, t) => sum + t.cost, 0)
},
get averageCost() {
const trends = get().costTrends
return trends.length > 0 ? get().totalCost / trends.length : 0
},
// 异步动作
fetchCostTrends: async (account, range) => {
set({ loading: true, error: null })
try {
const data = await api.getCostTrends(account, range)
set({ costTrends: data, loading: false })
} catch (error) {
set({ error: error.message, loading: false })
}
},
// 同步动作 (使用immer)
updateCostTrend: (id, data) => {
set((state) => {
const trend = state.costTrends.find(t => t.id === id)
if (trend) {
Object.assign(trend, data)
}
})
},
clearError: () => set({ error: null })
})),
{ name: 'cost-store', partialize: (s) => ({ costTrends: s.costTrends }) } // 只持久化数据字段,避免 rehydrate 时覆盖 getter 计算属性
)
)
)
目标: 替代自定义API请求逻辑,统一数据管理
// lib/api/queries.ts
import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query'
// Query Hooks
export function useCostTrends(account: string, range: DateRange) {
return useQuery({
queryKey: ['costTrends', account, range],
queryFn: () => api.getCostTrends(account, range),
staleTime: 5 * 60 * 1000, // 5分钟内数据不过期
cacheTime: 30 * 60 * 1000, // 缓存30分钟
retry: 3,
retryDelay: (attemptIndex) => Math.min(1000 * 2 ** attemptIndex, 30000),
onError: (error) => {
toast.error(`获取成本趋势失败: ${error.message}`)
}
})
}
export function useResources(account: string, filters?: ResourceFilters) {
return useQuery({
queryKey: ['resources', account, filters],
queryFn: () => api.getResources(account, filters),
enabled: !!account, // 只有account存在时才执行
placeholderData: (previousData) => previousData, // 保留旧数据
})
}
// Mutation Hooks
export function useUpdateResourceTags() {
const queryClient = useQueryClient()
return useMutation({
mutationFn: (data: { resourceId: string; tags: Record<string, string> }) =>
api.updateResourceTags(data.resourceId, data.tags),
onMutate: async (data) => {
// 乐观更新
await queryClient.cancelQueries(['resources'])
const previousResources = queryClient.getQueryData(['resources'])
queryClient.setQueryData(['resources'], (old: any) => {
return old.map((r: Resource) =>
r.id === data.resourceId ? { ...r, tags: data.tags } : r
)
})
return { previousResources }
},
onError: (err, data, context) => {
// 回滚
queryClient.setQueryData(['resources'], context?.previousResources)
toast.error('更新标签失败')
},
onSuccess: () => {
// 刷新数据
queryClient.invalidateQueries(['resources'])
toast.success('标签更新成功')
}
})
}
// 使用示例
function CostDashboard() {
const { data: costTrends, isLoading, error, refetch } = useCostTrends(
currentAccount,
{ start: '2026-01-01', end: '2026-01-31' }
)
if (isLoading) return <Spinner />
if (error) return <ErrorMessage error={error} onRetry={refetch} />
return <CostChart data={costTrends} />
}
3.3.1 虚拟滚动 (已实现):
import { FixedSizeList } from 'react-window'
function ResourceTable({ resources }) {
const Row = ({ index, style }) => (
<div style={style}>
<ResourceRow resource={resources[index]} />
</div>
)
return (
<FixedSizeList
height={600}
itemCount={resources.length}
itemSize={60}
width="100%"
>
{Row}
</FixedSizeList>
)
}
3.3.2 代码分割:
// 路由懒加载
import { lazy, Suspense } from 'react'
const Dashboard = lazy(() => import('./pages/Dashboard'))
const CostAnalysis = lazy(() => import('./pages/CostAnalysis'))
const SecurityAudit = lazy(() => import('./pages/SecurityAudit'))
function App() {
return (
<Suspense fallback={<PageLoader />}>
<Routes>
<Route path="/" element={<Dashboard />} />
<Route path="/cost" element={<CostAnalysis />} />
<Route path="/security" element={<SecurityAudit />} />
</Routes>
</Suspense>
)
}
// 组件懒加载
const HeavyChart = lazy(() => import('./components/HeavyChart'))
function Page() {
const [showChart, setShowChart] = useState(false)
return (
<div>
<button onClick={() => setShowChart(true)}>显示图表</button>
{showChart && (
<Suspense fallback={<ChartSkeleton />}>
<HeavyChart />
</Suspense>
)}
</div>
)
}
3.3.3 图表优化:
// 图表数据采样
function sampleData(data: DataPoint[], maxPoints: number = 100): DataPoint[] {
if (data.length <= maxPoints) return data
const step = Math.ceil(data.length / maxPoints)
return data.filter((_, index) => index % step === 0)
}
// 图表懒加载
function CostChart({ data }) {
const [isVisible, setIsVisible] = useState(false)
const ref = useRef(null)
useEffect(() => {
const observer = new IntersectionObserver(([entry]) => {
if (entry.isIntersecting) {
setIsVisible(true)
observer.disconnect()
}
})
if (ref.current) observer.observe(ref.current)
return () => observer.disconnect()
}, [])
return (
<div ref={ref}>
{isVisible ? (
<RechartsLineChart data={sampleData(data, 100)} />
) : (
<ChartPlaceholder />
)}
</div>
)
}
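上面 `sampleData` 的步长采样逻辑与语言无关,可以用 Python 快速验证其点数上界(函数名为对应的假设翻译):

```python
import math

def sample_data(points: list, max_points: int = 100) -> list:
    """按固定步长抽样,保证返回点数不超过 max_points。"""
    if len(points) <= max_points:
        return points
    step = math.ceil(len(points) / max_points)
    return points[::step]
```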
┌───────────┐
│ E2E测试 │ (10%) - Playwright
├───────────┤
│ 集成测试 │ (30%) - pytest + httpx
├───────────┤
│ 单元测试 │ (60%) - pytest + unittest.mock
└───────────┘
4.2 测试覆盖率目标:
| 模块 | 当前覆盖率 | Q2目标 | Q4目标 |
|---|---|---|---|
| core/ | ~80% | 90% | 95% |
| web/backend/ | ~40% | 75% | 85% |
| resource_modules/ | ~60% | 80% | 90% |
| CLI | ~50% | 70% | 80% |
| 前端 | ~20% | 60% | 75% |
| 全局 | ~55% | 75% | 85% |
4.3 单元测试示例:
# tests/core/test_cost_analyzer.py
import pytest
from datetime import datetime, timedelta
from core.cost_analyzer import CostAnalyzer
@pytest.fixture
def cost_analyzer():
return CostAnalyzer(account='test-account')
@pytest.fixture
def sample_bills():
return [
{'billing_date': '2026-01-01', 'pretax_amount': 100.0, 'instance_id': 'i-001'},
{'billing_date': '2026-01-02', 'pretax_amount': 150.0, 'instance_id': 'i-001'},
{'billing_date': '2026-01-03', 'pretax_amount': 120.0, 'instance_id': 'i-002'},
]
def test_calculate_total_cost(cost_analyzer, sample_bills):
total = cost_analyzer.calculate_total_cost(sample_bills)
assert total == 370.0
def test_detect_cost_spike(cost_analyzer, sample_bills):
# 添加异常数据
anomaly_bill = {'billing_date': '2026-01-04', 'pretax_amount': 500.0, 'instance_id': 'i-001'}
bills = sample_bills + [anomaly_bill]
spikes = cost_analyzer.detect_cost_spike(bills, threshold=2.0)
assert len(spikes) == 1
assert spikes[0]['billing_date'] == '2026-01-04'
@pytest.mark.asyncio
async def test_fetch_cost_trends_with_cache(cost_analyzer, mocker):
# Mock缓存
mock_cache = mocker.patch('core.cache.get')
mock_cache.return_value = {'cached': True}
result = await cost_analyzer.fetch_cost_trends('2026-01', use_cache=True)
assert result['cached'] is True
mock_cache.assert_called_once()
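被测的 `CostAnalyzer` 实现在文中未展开。为了让上述断言可以独立跑通,这里给出一个最小实现示意(尖峰判定采用"单日金额超过均值的 threshold 倍"这一假设的简化规则):

```python
class CostAnalyzer:
    """最小成本分析器示意,仅覆盖上述测试涉及的两个方法。"""

    def __init__(self, account: str):
        self.account = account

    def calculate_total_cost(self, bills: list) -> float:
        return sum(b["pretax_amount"] for b in bills)

    def detect_cost_spike(self, bills: list, threshold: float = 2.0) -> list:
        if not bills:
            return []
        mean = self.calculate_total_cost(bills) / len(bills)
        # 简化规则:单日金额超过均值的 threshold 倍即视为尖峰
        return [b for b in bills if b["pretax_amount"] > threshold * mean]
```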
4.4 集成测试示例:
# tests/integration/test_api_costs.py
import pytest
from httpx import AsyncClient
@pytest.mark.asyncio
async def test_get_cost_trends_api(client: AsyncClient, test_account):
response = await client.get(
"/api/v1/costs/trends",
params={
"account": test_account,
"start_date": "2026-01-01",
"end_date": "2026-01-31"
}
)
assert response.status_code == 200
data = response.json()
assert "trends" in data
assert len(data["trends"]) > 0
assert "total_cost" in data
@pytest.mark.asyncio
async def test_cost_prediction_api(client: AsyncClient, test_account):
response = await client.post(
"/api/v1/costs/predict",
json={
"account": test_account,
"days": 30
}
)
assert response.status_code == 200
data = response.json()
assert "forecast" in data
assert len(data["forecast"]) == 30
assert "confidence_score" in data
assert 0 <= data["confidence_score"] <= 1
4.5 E2E测试增强:
// tests/e2e/cost-analysis.spec.ts
import { test, expect } from '@playwright/test'
test.describe('成本分析功能', () => {
test('查看成本趋势图', async ({ page }) => {
await page.goto('http://localhost:3000/cost')
// 选择账号
await page.selectOption('#account-select', 'test-account')
// 等待图表加载
await page.waitForSelector('.cost-chart', { state: 'visible' })
// 验证图表存在
const chart = await page.locator('.cost-chart')
await expect(chart).toBeVisible()
// 截图对比
await expect(page).toHaveScreenshot('cost-trends-chart.png')
})
test('切换对比模式', async ({ page }) => {
await page.goto('http://localhost:3000/cost')
// 点击同比按钮
await page.click('button:has-text("同比")')
// 验证同比数据加载
await expect(page.locator('.comparison-badge')).toContainText('%')
// 验证API调用
await page.waitForResponse(
(response) => response.url().includes('/api/v1/costs/trends') &&
response.url().includes('comparison=yoy')
)
})
test('导出成本报告', async ({ page }) => {
await page.goto('http://localhost:3000/cost')
// 点击导出按钮
const [download] = await Promise.all([
page.waitForEvent('download'),
page.click('button:has-text("导出")')
])
// 验证文件下载
expect(download.suggestedFilename()).toMatch(/cost-report-.*\.xlsx/)
})
})
5.1 GitHub Actions CI:
# .github/workflows/ci.yml
name: CI Pipeline
on:
push:
branches: [main, develop]
pull_request:
branches: [main, develop]
jobs:
backend-test:
runs-on: ubuntu-latest
services:
mysql:
image: mysql:8.0
env:
MYSQL_ROOT_PASSWORD: test_password
MYSQL_DATABASE: cloudlens_test
ports:
- 3306:3306
options: >-
--health-cmd="mysqladmin ping"
--health-interval=10s
--health-timeout=5s
--health-retries=3
steps:
- uses: actions/checkout@v3
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
cache: 'pip'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: Run linters
run: |
black --check .
ruff check .
mypy core/ web/backend/
- name: Run tests
env:
DB_TYPE: mysql
MYSQL_HOST: 127.0.0.1
MYSQL_PORT: 3306
MYSQL_USER: root
MYSQL_PASSWORD: test_password
MYSQL_DATABASE: cloudlens_test
run: |
pytest tests/ -v --cov --cov-report=xml --cov-report=term
- name: Upload coverage
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
flags: backend
- name: Security scan
run: |
bandit -r core/ web/backend/ -ll
frontend-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup Node.js
uses: actions/setup-node@v3
with:
node-version: '18'
cache: 'npm'
cache-dependency-path: web/frontend/package-lock.json
- name: Install dependencies
working-directory: web/frontend
run: npm ci
- name: Run linters
working-directory: web/frontend
run: |
npm run lint
npm run type-check
- name: Run tests
working-directory: web/frontend
run: npm test -- --coverage
- name: Build
working-directory: web/frontend
run: npm run build
- name: Upload coverage
uses: codecov/codecov-action@v3
with:
file: ./web/frontend/coverage/coverage-final.json
flags: frontend
e2e-test:
runs-on: ubuntu-latest
needs: [backend-test, frontend-test]
steps:
- uses: actions/checkout@v3
- name: Setup Python & Node.js
uses: actions/setup-python@v4
with:
python-version: '3.10'
- uses: actions/setup-node@v3
with:
node-version: '18'
- name: Install dependencies
run: |
pip install -r requirements.txt
cd web/frontend && npm ci
npx playwright install --with-deps
- name: Start services
run: |
# 启动后端
python web/backend/main.py &
# 启动前端
cd web/frontend && npm run dev &
# 等待服务启动
npx wait-on http://localhost:8000/health http://localhost:3000
- name: Run E2E tests
run: |
cd web/frontend
npx playwright test
- name: Upload test results
if: always()
uses: actions/upload-artifact@v3
with:
name: playwright-report
path: web/frontend/playwright-report/
retention-days: 7
5.2 Pre-commit Hooks:
# .pre-commit-config.yaml
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-json
- id: check-added-large-files
args: ['--maxkb=1000']
- repo: https://github.com/psf/black
rev: 23.3.0
hooks:
- id: black
language_version: python3.10
- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: v0.0.270
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.3.0
hooks:
- id: mypy
additional_dependencies: [types-all]
- repo: https://github.com/PyCQA/bandit
rev: 1.7.5
hooks:
- id: bandit
args: ['-ll', '-r', 'core/', 'web/backend/']
| 周次 | 里程碑 | 关键任务 | 预期产出 |
|---|---|---|---|
| W1-2 | API模块化重构 | 创建模块化目录结构、迁移核心端点、Service层重构 | 新API结构、10+个模块文件、测试覆盖70% |
| W3-4 | 多级缓存实现 | Redis集成(可选)、L1/L2/L3缓存层、缓存监控 | 缓存命中率>80%、API响应时间↓50% |
| W5-6 | 数据库优化 | 索引优化、查询优化、表分区 | 慢查询↓70%、数据库负载↓40% |
| W7-8 | 实时数据刷新 | WebSocket实现、前端实时更新、增量推送 | 实时刷新功能、用户体验↑40% |
| W9-10 | 成本趋势增强 | 同比/环比功能、多账号对比、数据钻取 | 对比分析功能、智能洞察面板 |
| W11-12 | 移动端优化 | 响应式布局、触摸优化、性能优化 | 移动端适配完成、移动端使用率↑60% |
Q1里程碑:
| 周次 | 里程碑 | 关键任务 | 预期产出 |
|---|---|---|---|
| W13-14 | 监控系统 | Prometheus集成、Grafana Dashboard、告警规则 | 监控大盘上线、关键指标可视化 |
| W15-16 | 错误追踪 | Sentry集成、分布式追踪、错误聚合 | 错误追踪系统、问题定位时间↓70% |
| W17-18 | 多维度分析 | 成本分配引擎、多维度聚合、归因分析 | 成本分配功能、多维度报告 |
| W19-20 | 异常检测 | 多算法检测、智能告警、告警路由 | 异常检测系统、告警准确率>90% |
| W21-22 | AI预测增强 | 多模型对比、场景预测、置信区间 | 增强预测功能、预测准确率>85% |
| W23-24 | 测试覆盖提升 | 单元测试补充、集成测试、E2E测试 | 测试覆盖率达75%、CI/CD流水线完善 |
Q2里程碑:
| 周次 | 里程碑 | 关键任务 | 预期产出 |
|---|---|---|---|
| W25-27 | 多用户系统 | 用户认证、MFA支持、RBAC权限 | 多用户登录、权限管理系统 |
| W28-30 | 审计日志 | 操作审计、日志查询、合规报告 | 审计日志系统、合规报告生成 |
| W31-33 | 通知集成 | Slack/钉钉、企业微信、邮件推送 | 多渠道通知、Webhook支持 |
| W34-36 | API开放 | OpenAPI文档、API密钥、限流配额 | 完整API文档、API管理平台 |
| W37-39 | 工作流引擎 | 工作流编排、审批流程、定时任务 | 自动化工作流、审批系统 |
Q3里程碑:
| 周次 | 里程碑 | 关键任务 | 预期产出 |
|---|---|---|---|
| W40-42 | NL2SQL引擎 | 自然语言查询、意图识别、SQL生成 | NL查询功能、查询准确率>85% |
| W43-45 | 智能问答 | RAG系统、向量数据库、上下文对话 | AI助手上线、问答准确率>90% |
| W46-48 | 微服务拆分(可选) | 服务拆分、API Gateway、消息队列 | 微服务架构、服务独立部署 |
| W49-52 | 云原生化(可选) | Kubernetes部署、Service Mesh、自动扩缩容 | K8s部署方案、云原生架构 |
Q4里程碑:
| 角色 | Q1 | Q2 | Q3 | Q4 | 职责 |
|---|---|---|---|---|---|
| 后端工程师 | 2人 | 2人 | 2人 | 3人 | API开发、性能优化、微服务 |
| 前端工程师 | 1人 | 1人 | 2人 | 2人 | UI开发、性能优化、移动端 |
| 测试工程师 | 0.5人 | 1人 | 1人 | 1人 | 测试用例、自动化测试 |
| DevOps工程师 | 0.5人 | 1人 | 1人 | 1人 | CI/CD、监控、容器化 |
| AI工程师 | - | 0.5人 | 0.5人 | 1人 | 异常检测、NL2SQL、问答系统 |
| 产品经理 | 0.5人 | 0.5人 | 1人 | 1人 | 需求管理、用户调研 |
| 合计 | 4.5人 | 6人 | 7.5人 | 9人 | - |
| 成本项 | Q1-Q2 | Q3-Q4 | 年度总计 | 备注 |
|---|---|---|---|---|
| 云服务器 | ¥6,000 | ¥12,000 | ¥18,000 | 按需扩容 |
| 数据库 | ¥3,000 | ¥6,000 | ¥9,000 | MySQL/Redis |
| 监控告警 | ¥2,000 | ¥4,000 | ¥6,000 | Prometheus/Grafana |
| 第三方服务 | ¥3,000 | ¥6,000 | ¥9,000 | Sentry/OpenAI API |
| CDN/存储 | ¥1,000 | ¥2,000 | ¥3,000 | OSS/CDN |
| 开发工具 | ¥5,000 | ¥5,000 | ¥10,000 | IDE/CI/CD |
| 合计 | ¥20,000 | ¥35,000 | ¥55,000 | - |
| 风险类型 | 风险描述 | 影响 | 概率 | 应对策略 |
|---|---|---|---|---|
| 技术风险 | API重构引入Bug | 高 | 中 | 灰度发布、A/B测试、快速回滚 |
| 性能风险 | 多级缓存复杂度高 | 中 | 中 | 渐进式实施、降级方案 |
| 人力风险 | 关键人员离职 | 高 | 低 | 文档完善、代码审查、知识分享 |
| 安全风险 | 多用户权限漏洞 | 高 | 低 | 安全测试、渗透测试、审计日志 |
| 兼容风险 | 新功能影响老功能 | 中 | 中 | 回归测试、版本控制、兼容层 |
| 依赖风险 | 第三方服务不稳定 | 中 | 中 | 降级方案、多供应商、自建服务 |
| 成本风险 | 云服务超预算 | 低 | 中 | 成本监控、按需扩容、资源优化 |
| 指标 | 当前 | Q2目标 | Q4目标 | 测量方式 |
|---|---|---|---|---|
| API响应时间(P95) | 500ms | <200ms | <150ms | Prometheus |
| API错误率 | 未知 | <0.5% | <0.1% | Sentry |
| 缓存命中率 | 未知 | >80% | >85% | Prometheus |
| 测试覆盖率 | 55% | 75% | 85% | Coverage.py |
| 代码重复率 | 未知 | <5% | <3% | SonarQube |
| 部署频率 | 手动 | 每周 | 每天 | CI/CD |
| 系统可用性 | 未知 | >99.5% | >99.9% | 监控系统 |
| 数据库慢查询 | 未知 | <10个/天 | <5个/天 | MySQL慢日志 |
| 指标 | 当前 | Q2目标 | Q4目标 | 测量方式 |
|---|---|---|---|---|
| 周活跃用户(WAU) | - | 100 | 500 | Google Analytics |
| 月活跃用户(MAU) | - | 300 | 1500 | Google Analytics |
| 日均报告生成 | - | 30 | 150 | 后端埋点 |
| 用户留存率(D7) | - | >60% | >70% | Mixpanel |
| 功能使用率 | - | >50% | >70% | 后端埋点 |
| 平均会话时长 | - | >10分钟 | >15分钟 | Google Analytics |
| 用户满意度(NPS) | - | >40 | >60 | 用户调研 |
| 指标 | 当前 | Q2目标 | Q4目标 | 测量方式 |
|---|---|---|---|---|
| 月均成本节省 | - | ¥50,000 | ¥200,000 | 成本分析引擎 |
| 安全风险发现 | - | 100个/月 | 500个/月 | 安全扫描系统 |
| 合规检查通过率 | - | >80% | >90% | CIS Benchmark |
| 闲置资源识别 | - | 50个/月 | 200个/月 | 闲置检测引擎 |
| 自动化修复率 | - | >30% | >50% | 修复引擎统计 |
| 告警准确率 | - | >85% | >92% | 用户反馈 |
| 指标 | 当前 | Q2目标 | Q4目标 | 测量方式 |
|---|---|---|---|---|
| 代码审查响应时间 | - | <4h | <2h | GitHub |
| PR合并时间 | - | <1天 | <0.5天 | GitHub |
| Bug修复时间 | - | <24h | <12h | Jira |
| 文档完整度 | 60% | 80% | 95% | 人工评估 |
| 技术债务 | 高 | 中 | 低 | SonarQube |
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| MySQL缓存表 | 简单、无额外依赖 | 性能一般、不支持过期 | 小规模、低频访问 |
| Redis | 高性能、功能丰富 | 需额外部署、成本增加 | 高并发、实时性要求高 |
| Memcached | 性能好、简单 | 功能少、无持久化 | 纯缓存场景 |
| 进程内缓存 | 零延迟、无网络开销 | 不共享、容量有限 | 热点数据 |
推荐: 多级缓存 (L1进程+L2Redis+L3MySQL)
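多级缓存的核心是读穿透:L1 未命中时逐层回源并回填。下面是 L1 进程内层的最小示意,`loader` 代表对 L2/L3(Redis/MySQL)的查询,为假设的接口:

```python
import time

class TieredCache:
    """L1 进程内缓存 + 下层回源的读穿透示意。"""

    def __init__(self, loader, ttl: float = 60.0):
        self._loader = loader        # 假设的 L2/L3 回源接口: key -> value
        self._ttl = ttl
        self._l1 = {}                # key -> (value, expire_at)

    def get(self, key):
        hit = self._l1.get(key)
        if hit and hit[1] > time.monotonic():
            return hit[0]            # L1 命中,零网络开销
        value = self._loader(key)    # 未命中则回源下层
        self._l1[key] = (value, time.monotonic() + self._ttl)  # 回填 L1
        return value
```

真实实现还需考虑 L1 的容量上限(如 LRU 淘汰)与各层 TTL 梯度,避免 L1 长期持有脏数据。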
| 方案 | 优点 | 缺点 | 成本 |
|---|---|---|---|
| Prometheus+Grafana | 开源、功能强大 | 需自建、运维成本 | 低 |
| Datadog | 功能全、托管 | 成本高 | 高 |
| New Relic | APM强大 | 成本高 | 高 |
| 阿里云ARMS | 集成简单 | 功能有限 | 中 |
推荐: Prometheus+Grafana (开源自建)
| 方案 | 吞吐量 | 延迟 | 可靠性 | 复杂度 |
|---|---|---|---|---|
| RabbitMQ | 中 | 低 | 高 | 低 |
| Kafka | 极高 | 中 | 高 | 高 |
| Redis Pub/Sub | 高 | 极低 | 中 | 极低 |
| NATS | 高 | 极低 | 中 | 低 |
推荐: RabbitMQ (微服务场景) 或 Redis Pub/Sub (简单场景)
文档维护者: CloudLens Team 最后更新: 2026-01-08 下次评审: 2026-04-01 (Q1结束)
愿景: 打造世界一流的多云资源治理与成本优化平台 🚀