跳转到主要内容

简介

CrewAI 工作流支持流式输出,允许您在工作流执行时接收实时更新。此功能使您能够构建响应式应用程序,以增量方式显示结果、提供实时进度更新,并为长时间运行的工作流创建更好的用户体验。

工作流流式传输的工作原理

当工作流启用流式传输时,CrewAI 会捕获并流式传输工作流中任何 Crew 或 LLM 调用的输出。流会随着执行的进行,以结构化块的形式传递内容、任务上下文和代理信息。

启用流式传输

要启用流式传输,请在您的 Flow 类上将 stream 属性设置为 True
代码
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task

class ResearchFlow(Flow):
    stream = True  # Enable streaming for the entire flow

    @start()
    def initialize(self):
        return {"topic": "AI trends"}

    @listen(initialize)
    def research_topic(self, data):
        researcher = Agent(
            role="Research Analyst",
            goal="Research topics thoroughly",
            backstory="Expert researcher with analytical skills",
        )

        task = Task(
            description="Research {topic} and provide insights",
            expected_output="Detailed research findings",
            agent=researcher,
        )

        crew = Crew(
            agents=[researcher],
            tasks=[task],
        )

        return crew.kickoff(inputs=data)

同步流式传输

当您在启用流式传输的工作流上调用 kickoff() 时,它会返回一个您可以迭代的 FlowStreamingOutput 对象
代码
flow = ResearchFlow()

# Start streaming execution
streaming = flow.kickoff()

# Iterate over chunks as they arrive
for chunk in streaming:
    print(chunk.content, end="", flush=True)

# Access the final result after streaming completes
result = streaming.result
print(f"\n\nFinal output: {result}")

流式传输块信息

每个块都提供了其在工作流中起源的上下文
代码
streaming = flow.kickoff()

for chunk in streaming:
    print(f"Agent: {chunk.agent_role}")
    print(f"Task: {chunk.task_name}")
    print(f"Content: {chunk.content}")
    print(f"Type: {chunk.chunk_type}")  # TEXT or TOOL_CALL

访问流式传输属性

FlowStreamingOutput 对象提供了有用的属性和方法
代码
streaming = flow.kickoff()

# Iterate and collect chunks
for chunk in streaming:
    print(chunk.content, end="", flush=True)

# After iteration completes
print(f"\nCompleted: {streaming.is_completed}")
print(f"Full text: {streaming.get_full_text()}")
print(f"Total chunks: {len(streaming.chunks)}")
print(f"Final result: {streaming.result}")

异步流式传输

对于异步应用程序,请使用 kickoff_async() 进行异步迭代
代码
import asyncio

async def stream_flow():
    flow = ResearchFlow()

    # Start async streaming
    streaming = await flow.kickoff_async()

    # Async iteration over chunks
    async for chunk in streaming:
        print(chunk.content, end="", flush=True)

    # Access final result
    result = streaming.result
    print(f"\n\nFinal output: {result}")

asyncio.run(stream_flow())

多步工作流的流式传输

流式传输在多个工作流步骤中无缝运行,包括执行多个 Crew 的工作流
代码
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task

class MultiStepFlow(Flow):
    stream = True

    @start()
    def research_phase(self):
        """First crew: Research the topic."""
        researcher = Agent(
            role="Research Analyst",
            goal="Gather comprehensive information",
            backstory="Expert at finding relevant information",
        )

        task = Task(
            description="Research AI developments in healthcare",
            expected_output="Research findings on AI in healthcare",
            agent=researcher,
        )

        crew = Crew(agents=[researcher], tasks=[task])
        result = crew.kickoff()

        self.state["research"] = result.raw
        return result.raw

    @listen(research_phase)
    def analysis_phase(self, research_data):
        """Second crew: Analyze the research."""
        analyst = Agent(
            role="Data Analyst",
            goal="Analyze information and extract insights",
            backstory="Expert at identifying patterns and trends",
        )

        task = Task(
            description="Analyze this research: {research}",
            expected_output="Key insights and trends",
            agent=analyst,
        )

        crew = Crew(agents=[analyst], tasks=[task])
        return crew.kickoff(inputs={"research": research_data})


# Stream across both phases
flow = MultiStepFlow()
streaming = flow.kickoff()

current_step = ""
for chunk in streaming:
    # Track which flow step is executing
    if chunk.task_name != current_step:
        current_step = chunk.task_name
        print(f"\n\n=== {chunk.task_name} ===\n")

    print(chunk.content, end="", flush=True)

result = streaming.result
print(f"\n\nFinal analysis: {result}")

实用示例:进度仪表板

以下是构建带流式传输功能的进度仪表板的完整示例
代码
import asyncio
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task
from crewai.types.streaming import StreamChunkType

class ResearchPipeline(Flow):
    stream = True

    @start()
    def gather_data(self):
        researcher = Agent(
            role="Data Gatherer",
            goal="Collect relevant information",
            backstory="Skilled at finding quality sources",
        )

        task = Task(
            description="Gather data on renewable energy trends",
            expected_output="Collection of relevant data points",
            agent=researcher,
        )

        crew = Crew(agents=[researcher], tasks=[task])
        result = crew.kickoff()
        self.state["data"] = result.raw
        return result.raw

    @listen(gather_data)
    def analyze_data(self, data):
        analyst = Agent(
            role="Data Analyst",
            goal="Extract meaningful insights",
            backstory="Expert at data analysis",
        )

        task = Task(
            description="Analyze: {data}",
            expected_output="Key insights and trends",
            agent=analyst,
        )

        crew = Crew(agents=[analyst], tasks=[task])
        return crew.kickoff(inputs={"data": data})


async def run_with_dashboard():
    flow = ResearchPipeline()

    print("="*60)
    print("RESEARCH PIPELINE DASHBOARD")
    print("="*60)

    streaming = await flow.kickoff_async()

    current_agent = ""
    current_task = ""
    chunk_count = 0

    async for chunk in streaming:
        chunk_count += 1

        # Display phase transitions
        if chunk.task_name != current_task:
            current_task = chunk.task_name
            current_agent = chunk.agent_role
            print(f"\n\n📋 Phase: {current_task}")
            print(f"👤 Agent: {current_agent}")
            print("-" * 60)

        # Display text output
        if chunk.chunk_type == StreamChunkType.TEXT:
            print(chunk.content, end="", flush=True)

        # Display tool usage
        elif chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
            print(f"\n🔧 Tool: {chunk.tool_call.tool_name}")

    # Show completion summary
    result = streaming.result
    print(f"\n\n{'='*60}")
    print("PIPELINE COMPLETE")
    print(f"{'='*60}")
    print(f"Total chunks: {chunk_count}")
    print(f"Final output length: {len(str(result))} characters")

asyncio.run(run_with_dashboard())

带状态管理器的流式传输

流式传输与工作流状态管理器自然协同工作
代码
from pydantic import BaseModel

class AnalysisState(BaseModel):
    topic: str = ""
    research: str = ""
    insights: str = ""

class StatefulStreamingFlow(Flow[AnalysisState]):
    stream = True

    @start()
    def research(self):
        # State is available during streaming
        topic = self.state.topic
        print(f"Researching: {topic}")

        researcher = Agent(
            role="Researcher",
            goal="Research topics thoroughly",
            backstory="Expert researcher",
        )

        task = Task(
            description=f"Research {topic}",
            expected_output="Research findings",
            agent=researcher,
        )

        crew = Crew(agents=[researcher], tasks=[task])
        result = crew.kickoff()

        self.state.research = result.raw
        return result.raw

    @listen(research)
    def analyze(self, research):
        # Access updated state
        print(f"Analyzing {len(self.state.research)} chars of research")

        analyst = Agent(
            role="Analyst",
            goal="Extract insights",
            backstory="Expert analyst",
        )

        task = Task(
            description="Analyze: {research}",
            expected_output="Key insights",
            agent=analyst,
        )

        crew = Crew(agents=[analyst], tasks=[task])
        result = crew.kickoff(inputs={"research": research})

        self.state.insights = result.raw
        return result.raw


# Run with streaming
flow = StatefulStreamingFlow()
streaming = flow.kickoff(inputs={"topic": "quantum computing"})

for chunk in streaming:
    print(chunk.content, end="", flush=True)

result = streaming.result
print(f"\n\nFinal state:")
print(f"Topic: {flow.state.topic}")
print(f"Research length: {len(flow.state.research)}")
print(f"Insights length: {len(flow.state.insights)}")

用例

工作流流式传输对于以下场景特别有价值:
  • 多阶段工作流:显示研究、分析和综合阶段的进度
  • 复杂管道:提供长时间运行数据处理流的可见性
  • 交互式应用程序:构建显示中间结果的响应式 UI
  • 监控和调试:实时观察工作流执行和 Crew 交互
  • 进度跟踪:向用户显示当前正在执行的工作流阶段
  • 实时仪表板:为生产工作流创建监控界面

流式传输块类型

与 Crew 流式传输类似,工作流块可以是不同类型的

TEXT 块

来自 LLM 响应的标准文本内容
代码
for chunk in streaming:
    if chunk.chunk_type == StreamChunkType.TEXT:
        print(chunk.content, end="", flush=True)

TOOL_CALL 块

关于工作流中工具调用的信息
代码
for chunk in streaming:
    if chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
        print(f"\nTool: {chunk.tool_call.tool_name}")
        print(f"Args: {chunk.tool_call.arguments}")

错误处理

在流式传输期间优雅地处理错误
代码
flow = ResearchFlow()
streaming = flow.kickoff()

try:
    for chunk in streaming:
        print(chunk.content, end="", flush=True)

    result = streaming.result
    print(f"\nSuccess! Result: {result}")

except Exception as e:
    print(f"\nError during flow execution: {e}")
    if streaming.is_completed:
        print("Streaming completed but flow encountered an error")

重要说明

  • 流式传输会自动为工作流中使用的任何 Crew 启用 LLM 流式传输
  • 您必须迭代所有块,然后才能访问 .result 属性
  • 流式传输适用于结构化和非结构化工作流状态
  • 工作流流式传输捕获工作流中所有 Crew 和 LLM 调用的输出
  • 每个块都包含有关哪个代理和任务生成它的上下文
  • 流式传输对工作流执行的开销最小

结合工作流可视化

您可以将流式传输与工作流可视化相结合,以提供完整的视图
代码
# Generate flow visualization
flow = ResearchFlow()
flow.plot("research_flow")  # Creates HTML visualization

# Run with streaming
streaming = flow.kickoff()
for chunk in streaming:
    print(chunk.content, end="", flush=True)

result = streaming.result
print(f"\nFlow complete! View structure at: research_flow.html")
通过利用工作流流式传输,您可以构建复杂、响应迅速的应用程序,为用户提供复杂多阶段工作流的实时可见性,使您的 AI 自动化更加透明和引人入胜。