Introduction
CrewAI Flows support streaming output, allowing you to receive real-time updates while a flow executes. This lets you build responsive applications that display results incrementally, provide live progress updates, and deliver a better experience for long-running flows.

How Flow Streaming Works
When streaming is enabled on a flow, CrewAI captures and streams the output of any Crew or LLM call inside the flow. As execution progresses, the stream delivers content, task context, and agent information as structured chunks.

Enabling Streaming
To enable streaming, set the stream attribute to True on your Flow class:
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task

class ResearchFlow(Flow):
    stream = True  # Enable streaming for the entire flow

    @start()
    def initialize(self):
        return {"topic": "AI trends"}

    @listen(initialize)
    def research_topic(self, data):
        researcher = Agent(
            role="Research Analyst",
            goal="Research topics thoroughly",
            backstory="Expert researcher with analytical skills",
        )
        task = Task(
            description="Research {topic} and provide insights",
            expected_output="Detailed research findings",
            agent=researcher,
        )
        crew = Crew(
            agents=[researcher],
            tasks=[task],
        )
        return crew.kickoff(inputs=data)
Synchronous Streaming
When you call kickoff() on a flow with streaming enabled, it returns a FlowStreamingOutput object that you can iterate over:
flow = ResearchFlow()

# Start streaming execution
streaming = flow.kickoff()

# Iterate over chunks as they arrive
for chunk in streaming:
    print(chunk.content, end="", flush=True)

# Access the final result after streaming completes
result = streaming.result
print(f"\n\nFinal output: {result}")
Streaming Chunk Information
Each chunk provides context about where in the flow it originated:
streaming = flow.kickoff()

for chunk in streaming:
    print(f"Agent: {chunk.agent_role}")
    print(f"Task: {chunk.task_name}")
    print(f"Content: {chunk.content}")
    print(f"Type: {chunk.chunk_type}")  # TEXT or TOOL_CALL
Accessing Streaming Properties
The FlowStreamingOutput object provides useful properties and methods:
streaming = flow.kickoff()

# Iterate and collect chunks
for chunk in streaming:
    print(chunk.content, end="", flush=True)

# After iteration completes
print(f"\nCompleted: {streaming.is_completed}")
print(f"Full text: {streaming.get_full_text()}")
print(f"Total chunks: {len(streaming.chunks)}")
print(f"Final result: {streaming.result}")
Asynchronous Streaming
For asynchronous applications, use kickoff_async() and iterate asynchronously:
import asyncio

async def stream_flow():
    flow = ResearchFlow()

    # Start async streaming
    streaming = await flow.kickoff_async()

    # Async iteration over chunks
    async for chunk in streaming:
        print(chunk.content, end="", flush=True)

    # Access final result
    result = streaming.result
    print(f"\n\nFinal output: {result}")

asyncio.run(stream_flow())
Streaming Across Multi-Step Flows
Streaming works seamlessly across multiple flow steps, including flows that run more than one Crew:
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task

class MultiStepFlow(Flow):
    stream = True

    @start()
    def research_phase(self):
        """First crew: Research the topic."""
        researcher = Agent(
            role="Research Analyst",
            goal="Gather comprehensive information",
            backstory="Expert at finding relevant information",
        )
        task = Task(
            description="Research AI developments in healthcare",
            expected_output="Research findings on AI in healthcare",
            agent=researcher,
        )
        crew = Crew(agents=[researcher], tasks=[task])
        result = crew.kickoff()
        self.state["research"] = result.raw
        return result.raw

    @listen(research_phase)
    def analysis_phase(self, research_data):
        """Second crew: Analyze the research."""
        analyst = Agent(
            role="Data Analyst",
            goal="Analyze information and extract insights",
            backstory="Expert at identifying patterns and trends",
        )
        task = Task(
            description="Analyze this research: {research}",
            expected_output="Key insights and trends",
            agent=analyst,
        )
        crew = Crew(agents=[analyst], tasks=[task])
        return crew.kickoff(inputs={"research": research_data})

# Stream across both phases
flow = MultiStepFlow()
streaming = flow.kickoff()

current_step = ""
for chunk in streaming:
    # Track which flow step is executing
    if chunk.task_name != current_step:
        current_step = chunk.task_name
        print(f"\n\n=== {chunk.task_name} ===\n")
    print(chunk.content, end="", flush=True)

result = streaming.result
print(f"\n\nFinal analysis: {result}")
Practical Example: Progress Dashboard
Here is a complete example that builds a progress dashboard on top of streaming:
import asyncio
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task
from crewai.types.streaming import StreamChunkType

class ResearchPipeline(Flow):
    stream = True

    @start()
    def gather_data(self):
        researcher = Agent(
            role="Data Gatherer",
            goal="Collect relevant information",
            backstory="Skilled at finding quality sources",
        )
        task = Task(
            description="Gather data on renewable energy trends",
            expected_output="Collection of relevant data points",
            agent=researcher,
        )
        crew = Crew(agents=[researcher], tasks=[task])
        result = crew.kickoff()
        self.state["data"] = result.raw
        return result.raw

    @listen(gather_data)
    def analyze_data(self, data):
        analyst = Agent(
            role="Data Analyst",
            goal="Extract meaningful insights",
            backstory="Expert at data analysis",
        )
        task = Task(
            description="Analyze: {data}",
            expected_output="Key insights and trends",
            agent=analyst,
        )
        crew = Crew(agents=[analyst], tasks=[task])
        return crew.kickoff(inputs={"data": data})

async def run_with_dashboard():
    flow = ResearchPipeline()

    print("=" * 60)
    print("RESEARCH PIPELINE DASHBOARD")
    print("=" * 60)

    streaming = await flow.kickoff_async()

    current_agent = ""
    current_task = ""
    chunk_count = 0

    async for chunk in streaming:
        chunk_count += 1

        # Display phase transitions
        if chunk.task_name != current_task:
            current_task = chunk.task_name
            current_agent = chunk.agent_role
            print(f"\n\n📋 Phase: {current_task}")
            print(f"👤 Agent: {current_agent}")
            print("-" * 60)

        # Display text output
        if chunk.chunk_type == StreamChunkType.TEXT:
            print(chunk.content, end="", flush=True)

        # Display tool usage
        elif chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
            print(f"\n🔧 Tool: {chunk.tool_call.tool_name}")

    # Show completion summary
    result = streaming.result
    print(f"\n\n{'='*60}")
    print("PIPELINE COMPLETE")
    print(f"{'='*60}")
    print(f"Total chunks: {chunk_count}")
    print(f"Final output length: {len(str(result))} characters")

asyncio.run(run_with_dashboard())
Streaming with State Management
Streaming works naturally with flow state management:
from pydantic import BaseModel
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task

class AnalysisState(BaseModel):
    topic: str = ""
    research: str = ""
    insights: str = ""

class StatefulStreamingFlow(Flow[AnalysisState]):
    stream = True

    @start()
    def research(self):
        # State is available during streaming
        topic = self.state.topic
        print(f"Researching: {topic}")

        researcher = Agent(
            role="Researcher",
            goal="Research topics thoroughly",
            backstory="Expert researcher",
        )
        task = Task(
            description=f"Research {topic}",
            expected_output="Research findings",
            agent=researcher,
        )
        crew = Crew(agents=[researcher], tasks=[task])
        result = crew.kickoff()
        self.state.research = result.raw
        return result.raw

    @listen(research)
    def analyze(self, research):
        # Access updated state
        print(f"Analyzing {len(self.state.research)} chars of research")

        analyst = Agent(
            role="Analyst",
            goal="Extract insights",
            backstory="Expert analyst",
        )
        task = Task(
            description="Analyze: {research}",
            expected_output="Key insights",
            agent=analyst,
        )
        crew = Crew(agents=[analyst], tasks=[task])
        result = crew.kickoff(inputs={"research": research})
        self.state.insights = result.raw
        return result.raw

# Run with streaming
flow = StatefulStreamingFlow()
streaming = flow.kickoff(inputs={"topic": "quantum computing"})

for chunk in streaming:
    print(chunk.content, end="", flush=True)

result = streaming.result
print(f"\n\nFinal state:")
print(f"Topic: {flow.state.topic}")
print(f"Research length: {len(flow.state.research)}")
print(f"Insights length: {len(flow.state.insights)}")
Use Cases
Flow streaming is especially valuable for:
- Multi-stage flows: Show progress across research, analysis, and synthesis phases
- Complex pipelines: Provide visibility into long-running data processing flows
- Interactive applications: Build responsive UIs that display intermediate results (see the sketch after this list)
- Monitoring and debugging: Observe flow execution and Crew interactions in real time
- Progress tracking: Show users which flow stage is currently executing
- Live dashboards: Create monitoring interfaces for production flows
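To make the interactive-application case concrete, here is a hedged sketch that relays chunk text into an asyncio.Queue that a UI layer (for example a WebSocket or server-sent-events handler) could drain. The queue-based relay and the helper names relay_flow_chunks and fake_ui_consumer are assumptions of this sketch, not part of the CrewAI API; it relies only on kickoff_async(), async chunk iteration, chunk.content, and streaming.result as documented above:

import asyncio

async def relay_flow_chunks(flow, queue: asyncio.Queue):
    # Producer: forward each chunk's text to the queue as it arrives
    streaming = await flow.kickoff_async()
    async for chunk in streaming:
        await queue.put(chunk.content)
    await queue.put(None)  # Sentinel: streaming finished
    return streaming.result

async def fake_ui_consumer(queue: asyncio.Queue):
    # Consumer: stands in for a WebSocket/SSE handler pushing updates to a client
    while (piece := await queue.get()) is not None:
        print(piece, end="", flush=True)

async def main():
    queue: asyncio.Queue = asyncio.Queue()
    producer = asyncio.create_task(relay_flow_chunks(ResearchFlow(), queue))
    consumer = asyncio.create_task(fake_ui_consumer(queue))
    result = await producer
    await consumer
    print(f"\n\nFinal output: {result}")

asyncio.run(main())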
Streaming Chunk Types
As with Crew streaming, flow chunks come in different types.

TEXT Chunks
Standard text content from LLM responses:
from crewai.types.streaming import StreamChunkType

for chunk in streaming:
    if chunk.chunk_type == StreamChunkType.TEXT:
        print(chunk.content, end="", flush=True)
TOOL_CALL Chunks
Information about tool calls made within the flow:
for chunk in streaming:
    if chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
        print(f"\nTool: {chunk.tool_call.tool_name}")
        print(f"Args: {chunk.tool_call.arguments}")
Error Handling
Handle errors gracefully during streaming:
flow = ResearchFlow()
streaming = flow.kickoff()

try:
    for chunk in streaming:
        print(chunk.content, end="", flush=True)

    result = streaming.result
    print(f"\nSuccess! Result: {result}")
except Exception as e:
    print(f"\nError during flow execution: {e}")

    if streaming.is_completed:
        print("Streaming completed but flow encountered an error")
Important Notes
- Streaming automatically enables LLM streaming for any Crew used within the flow
- You must iterate through all chunks before you can access the .result property (see the sketch after this list)
- Streaming works with both structured and unstructured flow state
- Flow streaming captures the output of all Crew and LLM calls in the flow
- Each chunk includes context about which agent and task produced it
- Streaming adds minimal overhead to flow execution
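As a reference for the iterate-before-result note above, here is a minimal sketch of the consume-then-read pattern. The helper name consume_streaming is illustrative only; the sketch relies solely on the documented FlowStreamingOutput members (iteration, get_full_text(), and result):

def consume_streaming(streaming, on_chunk):
    # Drain every chunk first; .result is only read after iteration completes
    for chunk in streaming:
        on_chunk(chunk.content)

    # Safe to read now that the stream has been fully consumed
    return streaming.get_full_text(), streaming.result

flow = ResearchFlow()
full_text, result = consume_streaming(
    flow.kickoff(),
    on_chunk=lambda text: print(text, end="", flush=True),
)
print(f"\n\nCollected {len(full_text)} characters; final result: {result}")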
Combining with Flow Visualization
You can combine streaming with flow visualization to get a complete picture:
# Generate flow visualization
flow = ResearchFlow()
flow.plot("research_flow")  # Creates HTML visualization

# Run with streaming
streaming = flow.kickoff()
for chunk in streaming:
    print(chunk.content, end="", flush=True)

result = streaming.result
print(f"\nFlow complete! View structure at: research_flow.html")
