-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
243 lines (209 loc) · 7.75 KB
/
server.py
File metadata and controls
243 lines (209 loc) · 7.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
from mcp.server.fastmcp import FastMCP
import ast
import time
import io
import csv
import statistics
import traceback
import tracemalloc
from datetime import datetime
mcp = FastMCP("CodeBoostMCP", host="0.0.0.0", port=8000)
@mcp.tool()
def measure_complexity(code: str) -> dict:
"""
对一段 Python 代码进行静态复杂度分析。
通过 AST 统计循环嵌套深度、函数数量、分支数量,
并给出普通开发者能理解的复杂度判断。
code: 需要分析的 Python 源码字符串
"""
if not code.strip():
return {
"success": False,
"message": "代码内容不能为空。"
}
try:
tree = ast.parse(code)
except SyntaxError as e:
return {
"success": False,
"message": f"代码存在语法错误,无法分析:{str(e)}"
}
loop_nodes = 0
branch_nodes = 0
func_nodes = 0
call_nodes = 0
comprehension_nodes = 0
max_loop_depth = 0
def walk_depth(node, depth=0):
nonlocal max_loop_depth
is_loop = isinstance(node, (ast.For, ast.While))
cur = depth + 1 if is_loop else depth
if is_loop:
max_loop_depth = max(max_loop_depth, cur)
for child in ast.iter_child_nodes(node):
walk_depth(child, cur)
walk_depth(tree)
for node in ast.walk(tree):
if isinstance(node, (ast.For, ast.While)):
loop_nodes += 1
elif isinstance(node, (ast.If, ast.IfExp)):
branch_nodes += 1
elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
func_nodes += 1
elif isinstance(node, ast.Call):
call_nodes += 1
elif isinstance(node, (ast.ListComp, ast.SetComp, ast.DictComp, ast.GeneratorExp)):
comprehension_nodes += 1
# 简单的圈复杂度估算:分支 + 循环 + 1
cyclomatic = branch_nodes + loop_nodes + 1
if max_loop_depth >= 3:
complexity_hint = "存在三层及以上嵌套循环,很可能是 O(n^3) 或更高,性能风险很大。"
elif max_loop_depth == 2:
complexity_hint = "存在双层嵌套循环,可能是 O(n^2),数据量大时需要重点优化。"
elif max_loop_depth == 1:
complexity_hint = "存在单层循环,通常是 O(n),一般可接受。"
else:
complexity_hint = "未检测到显式循环,主要关注函数调用和内置操作的开销。"
return {
"success": True,
"loop_count": loop_nodes,
"max_loop_depth": max_loop_depth,
"branch_count": branch_nodes,
"function_count": func_nodes,
"call_count": call_nodes,
"comprehension_count": comprehension_nodes,
"estimated_cyclomatic_complexity": cyclomatic,
"complexity_hint": complexity_hint
}
@mcp.tool()
def run_benchmark(code: str, entry_call: str, repeat: int = 5) -> dict:
"""
在受限环境中执行一段 Python 代码,并对入口调用进行基准测试。
返回平均耗时、最快耗时、内存峰值,用于优化前后对比。
code: 完整可运行的 Python 源码(包含函数定义)
entry_call: 入口调用表达式,例如 solve(test_data)
repeat: 重复执行次数,默认 5 次取统计值
"""
if not code.strip() or not entry_call.strip():
return {
"success": False,
"message": "代码和入口调用表达式都不能为空。"
}
if repeat < 1:
repeat = 1
if repeat > 50:
repeat = 50
# 受限命名空间,避免明显危险操作
forbidden = ["os.system", "subprocess", "open(", "__import__", "eval(", "exec("]
lower_code = code.lower()
for kw in forbidden:
if kw in lower_code:
return {
"success": False,
"message": f"检测到可能不安全的操作:{kw},已拒绝执行。"
}
sandbox = {}
try:
compiled = compile(code, "<codeboost>", "exec")
exec(compiled, sandbox)
except Exception:
return {
"success": False,
"message": f"代码加载失败:\n{traceback.format_exc()}"
}
timings = []
result_value = None
peak_memory_kb = 0
try:
# 预热一次
eval(entry_call, sandbox)
for _ in range(repeat):
tracemalloc.start()
start = time.perf_counter()
result_value = eval(entry_call, sandbox)
elapsed = time.perf_counter() - start
_, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
timings.append(elapsed)
peak_memory_kb = max(peak_memory_kb, peak / 1024)
except Exception:
return {
"success": False,
"message": f"基准执行失败:\n{traceback.format_exc()}"
}
avg_time = statistics.mean(timings)
best_time = min(timings)
return {
"success": True,
"entry_call": entry_call,
"repeat": repeat,
"avg_time_ms": round(avg_time * 1000, 4),
"best_time_ms": round(best_time * 1000, 4),
"peak_memory_kb": round(peak_memory_kb, 2),
"result_preview": str(result_value)[:200]
}
@mcp.tool()
def generate_optimization_report(report_text: str, filename: str = "codeboost_report.csv") -> dict:
"""
根据文本版优化记录生成 CSV 优化报告,并保存到当前目录。
report_text: 优化记录文本,每行格式为:
项目:指标名称, 优化前值, 优化后值, 说明
例如:
耗时:平均耗时(ms), 1200, 35, 用哈希表替代双层循环
内存:峰值内存(KB), 5000, 800, 改用生成器
filename: 输出文件名,默认 codeboost_report.csv
"""
if not report_text.strip():
return {
"success": False,
"message": "优化记录内容不能为空。"
}
rows = []
lines = [line.strip() for line in report_text.splitlines() if line.strip()]
for line in lines:
clean = line.replace(":", ":").replace(",", ",")
if ":" in clean:
category, rest = clean.split(":", 1)
else:
category, rest = "其他", clean
parts = [p.strip() for p in rest.split(",")]
metric = parts[0] if len(parts) > 0 else ""
before = parts[1] if len(parts) > 1 else ""
after = parts[2] if len(parts) > 2 else ""
note = parts[3] if len(parts) > 3 else "由 CodeBoost 优化工具生成"
# 计算提升比例
improvement = ""
try:
b = float(before)
a = float(after)
if b > 0:
improvement = f"{round((b - a) / b * 100, 1)}%"
except ValueError:
improvement = ""
rows.append({
"category": category.strip(),
"metric": metric,
"before": before,
"after": after,
"improvement": improvement,
"note": note
})
output = io.StringIO()
writer = csv.DictWriter(
output,
fieldnames=["category", "metric", "before", "after", "improvement", "note"]
)
writer.writeheader()
writer.writerows(rows)
csv_content = output.getvalue()
with open(filename, "w", encoding="utf-8-sig", newline="") as f:
f.write(csv_content)
return {
"success": True,
"filename": filename,
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"item_count": len(rows),
"csv_content": csv_content
}
if __name__ == "__main__":
mcp.run(transport="sse")