服务器发送事件 (SSE) 提供了一种标准方式,让 Web 服务器通过单个、长期的 HTTP 连接向客户端发送更新。在 MCP 的上下文中,SSE 用于让远程服务器实时地将数据(如工具响应)流式传输到您的 CrewAI 应用程序。
关键概念
- 远程服务器:SSE 适用于远程托管的 MCP 服务器。
- 单向流:通常,SSE 是从服务器到客户端的单向通信通道。
MCPServerAdapter 配置:对于 SSE,您需要提供服务器的 URL 并指定传输类型。
通过 SSE 连接
您可以使用两种主要方法来连接基于 SSE 的 MCP 服务器并管理连接生命周期。
1. 完全托管连接(推荐)
推荐使用 Python 上下文管理器(with 语句)。它会自动处理与 SSE MCP 服务器的连接建立和关闭。
from crewai import Agent, Task, Crew, Process
from crewai_tools import MCPServerAdapter
server_params = {
"url": "https://:8000/sse", # Replace with your actual SSE server URL
"transport": "sse"
}
# Using MCPServerAdapter with a context manager
try:
with MCPServerAdapter(server_params) as tools:
print(f"Available tools from SSE MCP server: {[tool.name for tool in tools]}")
# Example: Using a tool from the SSE MCP server
sse_agent = Agent(
role="Remote Service User",
goal="Utilize a tool provided by a remote SSE MCP server.",
backstory="An AI agent that connects to external services via SSE.",
tools=tools,
reasoning=True,
verbose=True,
)
sse_task = Task(
description="Fetch real-time stock updates for 'AAPL' using an SSE tool.",
expected_output="The latest stock price for AAPL.",
agent=sse_agent,
markdown=True
)
sse_crew = Crew(
agents=[sse_agent],
tasks=[sse_task],
verbose=True,
process=Process.sequential
)
if tools: # Only kickoff if tools were loaded
result = sse_crew.kickoff() # Add inputs={'stock_symbol': 'AAPL'} if tool requires it
print("\nCrew Task Result (SSE - Managed):\n", result)
else:
print("Skipping crew kickoff as tools were not loaded (check server connection).")
except Exception as e:
print(f"Error connecting to or using SSE MCP server (Managed): {e}")
print("Ensure the SSE MCP server is running and accessible at the specified URL.")
请将 "https://:8000/sse" 替换为您的 SSE MCP 服务器的实际 URL。
2. 手动连接生命周期
如果您需要更精细的控制,可以手动管理 MCPServerAdapter 的连接生命周期。
您必须调用 mcp_server_adapter.stop() 以确保连接被关闭并释放资源。强烈建议使用 try...finally 块。
from crewai import Agent, Task, Crew, Process
from crewai_tools import MCPServerAdapter
server_params = {
"url": "https://:8000/sse", # Replace with your actual SSE server URL
"transport": "sse"
}
mcp_server_adapter = None
try:
mcp_server_adapter = MCPServerAdapter(server_params)
mcp_server_adapter.start()
tools = mcp_server_adapter.tools
print(f"Available tools (manual SSE): {[tool.name for tool in tools]}")
manual_sse_agent = Agent(
role="Remote Data Analyst",
goal="Analyze data fetched from a remote SSE MCP server using manual connection management.",
backstory="An AI skilled in handling SSE connections explicitly.",
tools=tools,
verbose=True
)
analysis_task = Task(
description="Fetch and analyze the latest user activity trends from the SSE server.",
expected_output="A summary report of user activity trends.",
agent=manual_sse_agent
)
analysis_crew = Crew(
agents=[manual_sse_agent],
tasks=[analysis_task],
verbose=True,
process=Process.sequential
)
result = analysis_crew.kickoff()
print("\nCrew Task Result (SSE - Manual):\n", result)
except Exception as e:
print(f"An error occurred during manual SSE MCP integration: {e}")
print("Ensure the SSE MCP server is running and accessible.")
finally:
if mcp_server_adapter and mcp_server_adapter.is_connected:
print("Stopping SSE MCP server connection (manual)...")
mcp_server_adapter.stop() # **Crucial: Ensure stop is called**
elif mcp_server_adapter:
print("SSE MCP server adapter was not connected. No stop needed or start failed.")
SSE 的安全注意事项
DNS 重绑定攻击:如果 MCP 服务器未得到适当保护,SSE 传输可能容易受到 DNS 重绑定攻击。这可能允许恶意网站与本地或内网的 MCP 服务器进行交互。
为了降低此风险
- MCP 服务器实现应验证传入 SSE 连接的
Origin 头部。
- 在运行用于开发的本地 SSE MCP 服务器时,仅绑定到
localhost (127.0.0.1),而不是所有网络接口 (0.0.0.0)。
- 如果 SSE 连接暴露了敏感工具或数据,则为所有连接实现适当的身份验证。
有关安全最佳实践的全面概述,请参阅我们的安全注意事项页面和官方 MCP 传输安全文档。