-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathMCP_Server_Demo.py
More file actions
84 lines (69 loc) · 2.47 KB
/
Copy pathMCP_Server_Demo.py
File metadata and controls
84 lines (69 loc) · 2.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import aiohttp
from fastapi import FastAPI, HTTPException
from mcp.server.fastmcp import FastMCP
from typing import Dict, Any
# 初始化MCP服务器
mcp = FastMCP("API2MCP-mcp-server")
@mcp.tool()
def clac_bmi(height, weight):
"""
根据身高体重计算BMI值
"""
return weight / (height / 100) ** 2
@mcp.tool()
async def get_ip() -> Dict[str, Any]:
"""
获取当前所在位置的IP地址及城市、经纬度等信息
"""
# 定义 API URL 为常量,便于维护
API_URL = "https://realip.cc"
try:
# 使用 aiohttp 进行异步请求
async with aiohttp.ClientSession() as session:
async with session.get(API_URL, timeout=5) as response:
# 检查 HTTP 响应状态码
if response.status != 200:
raise ValueError(f"Unexpected HTTP status code: {response.status}")
# 解析并返回 JSON 数据
data = await response.json()
return data
except aiohttp.ClientError as e:
# 捕获网络相关异常
print(f"Network error occurred: {e}")
return {"error": "Network error"}
except ValueError as e:
# 捕获非 200 状态码或其他解析错误
print(f"Value error occurred: {e}")
return {"error": str(e)}
except Exception as e:
# 捕获其他未知异常
print(f"An unexpected error occurred: {e}")
return {"error": "Unexpected error"}
@mcp.tool()
def read_file(file_path: str) -> str:
"""
根据给定文件路径读取文件内容
"""
try:
# 明确指定编码为 UTF-8,避免潜在的解码问题
with open(file_path, "r", encoding="utf-8") as file:
return file.read()
except Exception as e:
# 捕获其他未知异常
return f"An unexpected error occurred while reading file at path '{file_path}': {e}"
@mcp.tool()
def write_file(file_path: str, content: str) -> str:
"""
将内容写入指定文件路径
"""
try:
# 明确指定编码为 UTF-8,避免潜在的解码问题
with open(file_path, "w", encoding="utf-8") as file:
file.write(content)
return f"Successfully wrote content to file at path '{file_path}'."
except Exception as e:
# 捕获其他未知异常
return f"An unexpected error occurred while writing file at path '{file_path}': {e}"
app = FastAPI()
# 挂载SSE服务器
app.mount("/", mcp.sse_app())