
Introduction

CrewAI can stream real-time output during crew execution, letting you display results as they are generated instead of waiting for the entire process to finish. This is especially useful for building interactive applications, providing user feedback, and monitoring long-running processes.

How Streaming Works

With streaming enabled, CrewAI captures LLM responses and tool calls in real time and packages them into structured chunks that include context about which task and agent are executing. You can iterate over these chunks as they arrive and access the final result once execution completes.
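Conceptually, each chunk bundles a fragment of streamed content with its execution context. A minimal sketch of that shape in plain Python (the field names mirror the chunk attributes shown later in this page, but this dataclass is an illustration, not CrewAI's actual class):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class StreamChunkSketch:
    # Illustrative stand-in for a CrewAI stream chunk, not the real class.
    task_name: str      # which task produced this chunk
    task_index: int     # position of the task within the crew
    agent_role: str     # role of the agent executing the task
    content: str        # the streamed text fragment
    chunk_type: str     # "TEXT" or "TOOL_CALL"
    tool_call: Optional[dict] = None  # populated only for TOOL_CALL chunks

chunk = StreamChunkSketch(
    task_name="research",
    task_index=0,
    agent_role="Research Analyst",
    content="AI adoption is accelerating...",
    chunk_type="TEXT",
)
print(f"[{chunk.agent_role}] {chunk.content}")
```

A consumer can branch on `chunk_type` to decide whether to render text or surface a tool-call notification, which is the pattern the examples below follow.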

Enabling Streaming

To enable streaming, set the stream parameter to True when creating your crew:
from crewai import Agent, Crew, Task

# Create your agents and tasks
researcher = Agent(
    role="Research Analyst",
    goal="Gather comprehensive information on topics",
    backstory="You are an experienced researcher with excellent analytical skills.",
)

task = Task(
    description="Research the latest developments in AI",
    expected_output="A detailed report on recent AI advancements",
    agent=researcher,
)

# Enable streaming
crew = Crew(
    agents=[researcher],
    tasks=[task],
    stream=True  # Enable streaming output
)

Synchronous Streaming

Calling kickoff() on a crew with streaming enabled returns a CrewStreamingOutput object that you can iterate over to receive chunks as they arrive:
# Start streaming execution
streaming = crew.kickoff(inputs={"topic": "artificial intelligence"})

# Iterate over chunks as they arrive
for chunk in streaming:
    print(chunk.content, end="", flush=True)

# Access the final result after streaming completes
result = streaming.result
print(f"\n\nFinal output: {result.raw}")

Stream Chunk Information

Each chunk carries rich execution context:
streaming = crew.kickoff(inputs={"topic": "AI"})

for chunk in streaming:
    print(f"Task: {chunk.task_name} (index {chunk.task_index})")
    print(f"Agent: {chunk.agent_role}")
    print(f"Content: {chunk.content}")
    print(f"Type: {chunk.chunk_type}")  # TEXT or TOOL_CALL
    if chunk.tool_call:
        print(f"Tool: {chunk.tool_call.tool_name}")
        print(f"Arguments: {chunk.tool_call.arguments}")

Accessing Streaming Results

The CrewStreamingOutput object exposes several useful properties:
streaming = crew.kickoff(inputs={"topic": "AI"})

# Iterate and collect chunks
for chunk in streaming:
    print(chunk.content, end="", flush=True)

# After iteration completes
print(f"\nCompleted: {streaming.is_completed}")
print(f"Full text: {streaming.get_full_text()}")
print(f"All chunks: {len(streaming.chunks)}")
print(f"Final result: {streaming.result.raw}")

Asynchronous Streaming

For async applications, use kickoff_async() with async iteration:
import asyncio

async def stream_crew():
    crew = Crew(
        agents=[researcher],
        tasks=[task],
        stream=True
    )

    # Start async streaming
    streaming = await crew.kickoff_async(inputs={"topic": "AI"})

    # Async iteration over chunks
    async for chunk in streaming:
        print(chunk.content, end="", flush=True)

    # Access final result
    result = streaming.result
    print(f"\n\nFinal output: {result.raw}")

asyncio.run(stream_crew())

Streaming with kickoff_for_each

When executing a crew over multiple inputs with kickoff_for_each(), streaming behaves differently depending on whether you use the synchronous or asynchronous variant.

Synchronous kickoff_for_each

With the synchronous kickoff_for_each(), you get a list of CrewStreamingOutput objects, one per input:
crew = Crew(
    agents=[researcher],
    tasks=[task],
    stream=True
)

inputs_list = [
    {"topic": "AI in healthcare"},
    {"topic": "AI in finance"}
]

# Returns list of streaming outputs
streaming_outputs = crew.kickoff_for_each(inputs=inputs_list)

# Iterate over each streaming output
for i, streaming in enumerate(streaming_outputs):
    print(f"\n=== Input {i + 1} ===")
    for chunk in streaming:
        print(chunk.content, end="", flush=True)

    result = streaming.result
    print(f"\n\nResult {i + 1}: {result.raw}")

Asynchronous kickoff_for_each_async

With the asynchronous kickoff_for_each_async(), you get a single CrewStreamingOutput that yields chunks from all crews concurrently:
import asyncio

async def stream_multiple_crews():
    crew = Crew(
        agents=[researcher],
        tasks=[task],
        stream=True
    )

    inputs_list = [
        {"topic": "AI in healthcare"},
        {"topic": "AI in finance"}
    ]

    # Returns single streaming output for all crews
    streaming = await crew.kickoff_for_each_async(inputs=inputs_list)

    # Chunks from all crews arrive as they're generated
    async for chunk in streaming:
        print(f"[{chunk.task_name}] {chunk.content}", end="", flush=True)

    # Access all results
    results = streaming.results  # List of CrewOutput objects
    for i, result in enumerate(results):
        print(f"\n\nResult {i + 1}: {result.raw}")

asyncio.run(stream_multiple_crews())
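The merged-stream behavior above can be sketched with plain asyncio (no CrewAI involved): several producers feed one queue, and a single async iterator yields items in arrival order. The `fake_crew` producer below is a made-up stand-in for one crew run, not part of any real API:

```python
import asyncio

async def fake_crew(topic: str, delay: float):
    # Stand-in for one crew run emitting chunks over time.
    for i in range(2):
        await asyncio.sleep(delay)
        yield f"[{topic}] chunk {i}"

async def merge(*generators):
    # Pump every generator into one queue; yield items as they arrive,
    # so chunks from concurrent producers interleave in real time.
    queue: asyncio.Queue = asyncio.Queue()
    sentinel = object()

    async def pump(gen):
        async for item in gen:
            await queue.put(item)
        await queue.put(sentinel)  # signal this producer is done

    tasks = [asyncio.create_task(pump(g)) for g in generators]
    finished = 0
    while finished < len(tasks):
        item = await queue.get()
        if item is sentinel:
            finished += 1
        else:
            yield item

async def main():
    received = []
    async for chunk in merge(fake_crew("healthcare", 0.01),
                             fake_crew("finance", 0.015)):
        received.append(chunk)
    return received

chunks = asyncio.run(main())
print(chunks)
```

The arrival order depends on producer timing, which is why chunks from different inputs can interleave arbitrarily in the combined stream.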

Stream Chunk Types

Chunks come in different types, indicated by the chunk_type field.

TEXT Chunks

Standard text content from LLM responses:
from crewai.types.streaming import StreamChunkType

for chunk in streaming:
    if chunk.chunk_type == StreamChunkType.TEXT:
        print(chunk.content, end="", flush=True)

TOOL_CALL Chunks

Information about tool calls as they happen:
from crewai.types.streaming import StreamChunkType

for chunk in streaming:
    if chunk.chunk_type == StreamChunkType.TOOL_CALL:
        print(f"\nCalling tool: {chunk.tool_call.tool_name}")
        print(f"Arguments: {chunk.tool_call.arguments}")

Practical Example: Building a UI with Streaming

Here is a complete example showing how to build an interactive application with streaming:
import asyncio
from crewai import Agent, Crew, Task
from crewai.types.streaming import StreamChunkType

async def interactive_research():
    # Create crew with streaming enabled
    researcher = Agent(
        role="Research Analyst",
        goal="Provide detailed analysis on any topic",
        backstory="You are an expert researcher with broad knowledge.",
    )

    task = Task(
        description="Research and analyze: {topic}",
        expected_output="A comprehensive analysis with key insights",
        agent=researcher,
    )

    crew = Crew(
        agents=[researcher],
        tasks=[task],
        stream=True,
        verbose=False
    )

    # Get user input
    topic = input("Enter a topic to research: ")

    print(f"\n{'='*60}")
    print(f"Researching: {topic}")
    print(f"{'='*60}\n")

    # Start streaming execution
    streaming = await crew.kickoff_async(inputs={"topic": topic})

    current_task = ""
    async for chunk in streaming:
        # Show task transitions
        if chunk.task_name != current_task:
            current_task = chunk.task_name
            print(f"\n[{chunk.agent_role}] Working on: {chunk.task_name}")
            print("-" * 60)

        # Display text chunks
        if chunk.chunk_type == StreamChunkType.TEXT:
            print(chunk.content, end="", flush=True)

        # Display tool calls
        elif chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
            print(f"\n🔧 Using tool: {chunk.tool_call.tool_name}")

    # Show final result
    result = streaming.result
    print(f"\n\n{'='*60}")
    print("Analysis Complete!")
    print(f"{'='*60}")
    print(f"\nToken Usage: {result.token_usage}")

asyncio.run(interactive_research())

Use Cases

Streaming is particularly valuable for:
  • Interactive applications: give users real-time feedback while agents work
  • Long-running tasks: show progress during research, analysis, or content generation
  • Debugging and monitoring: observe agent behavior and decision-making in real time
  • User experience: reduce perceived latency by showing incremental results
  • Live dashboards: build monitoring interfaces that display crew execution status

Important Notes

  • Streaming automatically enables LLM streaming for all agents in the crew
  • You must iterate over all chunks before accessing the .result property
  • With kickoff_for_each_async() and streaming, use .results (plural) to get all outputs
  • Streaming adds minimal overhead and can actually improve perceived performance
  • Every chunk includes full context (task, agent, chunk type) for building rich UIs
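The iterate-before-.result rule follows naturally from how a streaming output is assembled: the final state is built up as chunks pass through the iterator, so it only exists once the stream is drained. A toy illustration in plain Python (this class is a simplified stand-in, not CrewAI's CrewStreamingOutput):

```python
class ToyStreamingOutput:
    # Simplified stand-in showing why the result needs full iteration.
    def __init__(self, source_chunks):
        self._source = source_chunks
        self.chunks = []          # collected as iteration proceeds
        self.is_completed = False

    def __iter__(self):
        for chunk in self._source:
            self.chunks.append(chunk)
            yield chunk
        self.is_completed = True  # set only after every chunk is consumed

    def get_full_text(self):
        return "".join(self.chunks)

streaming = ToyStreamingOutput(["The future ", "of AI ", "is streaming."])
for chunk in streaming:
    pass  # a real UI would render each chunk here

print(streaming.is_completed)     # True
print(streaming.get_full_text())  # The future of AI is streaming.
```

Accessing the accumulated text before the loop finishes would return only a prefix, which is the analogue of reading `.result` too early.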

Error Handling

Handle errors that occur during streaming execution:
streaming = crew.kickoff(inputs={"topic": "AI"})

try:
    for chunk in streaming:
        print(chunk.content, end="", flush=True)

    result = streaming.result
    print(f"\nSuccess: {result.raw}")

except Exception as e:
    print(f"\nError during streaming: {e}")
    if streaming.is_completed:
        print("Streaming completed but an error occurred")
By leveraging streaming, you can build more responsive and interactive applications with CrewAI, giving users real-time visibility into agent execution and results.