跳转到主要内容

使用 Flow 控制 AI 工作流

CrewAI Flow 代表了 AI 编排的更高层次——将 AI Agent Crew 的协作能力与程序化编程的精确性和灵活性相结合。虽然 Crew 在 Agent 协作方面表现出色,但 Flow 能让你精确控制 AI 系统中不同组件交互的方式和时机。 在本指南中,我们将逐步创建一个强大的 CrewAI Flow,它可以为任何主题生成全面的学习指南。本教程将演示 Flow 如何通过结合常规代码、直接调用 LLM 和基于 Crew 的处理,为你的 AI 工作流提供结构化的、事件驱动的控制。

Flow 的强大之处

Flow 使你能够:
  1. 结合不同的 AI 交互模式 - 使用 Crew 处理复杂的协作任务,使用直接调用 LLM 处理较简单的操作,使用常规代码处理程序逻辑
  2. 构建事件驱动系统 - 定义组件如何响应特定事件和数据变化
  3. 跨组件维护状态 - 在应用程序的不同部分之间共享和转换数据
  4. 与外部系统集成 - 将你的 AI 工作流与数据库、API 和用户界面无缝连接
  5. 创建复杂的执行路径 - 设计条件分支、并行处理和动态工作流

你将构建和学到什么

在本指南结束时,你将:
  1. 创建一个复杂的内容生成系统,该系统结合了用户输入、AI 规划和多 Agent 内容创建
  2. 编排系统不同组件之间的信息流
  3. 实现事件驱动架构,其中每一步都响应前一步的完成
  4. 为更复杂的 AI 应用程序奠定基础,你可以对其进行扩展和自定义
这个指南创建 Flow 演示了可应用于创建更高级应用程序的基本模式,例如:
  • 结合多个专业子系统的交互式 AI 助手
  • 具有 AI 增强转换功能的复杂数据处理管道
  • 与外部服务和 API 集成的自主 Agent
  • 具有“人在环路”(human-in-the-loop)流程的多阶段决策系统
让我们开始构建你的第一个 Flow 吧!

先决条件

开始之前,请确保你已经:
  1. 按照 安装指南 安装了 CrewAI
  2. 按照 LLM 设置指南 在环境中设置了你的 LLM API 密钥
  3. 具备 Python 的基本了解

第 1 步:创建一个新的 CrewAI Flow 项目

首先,让我们使用 CLI 创建一个新的 CrewAI Flow 项目。此命令会搭建一个包含所有必要目录和 Flow 模板文件的项目框架。
crewai create flow guide_creator_flow
cd guide_creator_flow
这将生成一个具有你的 Flow 所需基本结构的项目。
CrewAI Framework Overview

CrewAI 框架概述

第 2 步:了解项目结构

生成的项目具有以下结构。花点时间熟悉它,因为理解这个结构将有助于你将来创建更复杂的 Flow。
guide_creator_flow/
├── .gitignore
├── pyproject.toml
├── README.md
├── .env
├── main.py
├── crews/
│   └── poem_crew/
│       ├── config/
│       │   ├── agents.yaml
│       │   └── tasks.yaml
│       └── poem_crew.py
└── tools/
    └── custom_tool.py
这种结构清晰地划分了 Flow 的不同组件:
  • main.py 文件中的主要 Flow 逻辑
  • crews 目录中的专业 Crew
  • tools 目录中的自定义工具
我们将修改这个结构来创建我们的指南创建 Flow,它将编排生成综合学习指南的整个过程。

第 3 步:添加一个内容编写 Crew

我们的 Flow 将需要一个专门的 Crew 来处理内容创作过程。让我们使用 CrewAI CLI 添加一个内容编写 Crew:
crewai flow add-crew content-crew
此命令会自动为你的 Crew 创建必要的目录和模板文件。内容编写 Crew 将负责编写和审阅我们指南的各个部分,并在我们主应用程序编排的整体 Flow 中工作。

第 4 步:配置内容编写 Crew

现在,让我们修改为内容编写 Crew 生成的文件。我们将设置两个专门的 Agent——一个编写者和一个审阅者——他们将协作创建高质量的指南内容。
  1. 首先,更新 Agent 配置文件以定义我们的内容创作团队: 请记得将 llm 设置为你正在使用的提供商。
# src/guide_creator_flow/crews/content_crew/config/agents.yaml
content_writer:
  role: >
    Educational Content Writer
  goal: >
    Create engaging, informative content that thoroughly explains the assigned topic
    and provides valuable insights to the reader
  backstory: >
    You are a talented educational writer with expertise in creating clear, engaging
    content. You have a gift for explaining complex concepts in accessible language
    and organizing information in a way that helps readers build their understanding.
  llm: provider/model-id  # e.g. openai/gpt-4o, google/gemini-2.0-flash, anthropic/claude...

content_reviewer:
  role: >
    Educational Content Reviewer and Editor
  goal: >
    Ensure content is accurate, comprehensive, well-structured, and maintains
    consistency with previously written sections
  backstory: >
    You are a meticulous editor with years of experience reviewing educational
    content. You have an eye for detail, clarity, and coherence. You excel at
    improving content while maintaining the original author's voice and ensuring
    consistent quality across multiple sections.
  llm: provider/model-id  # e.g. openai/gpt-4o, google/gemini-2.0-flash, anthropic/claude...
这些 Agent 定义确立了塑造我们 AI Agent 进行内容创作方式的专业角色和视角。请注意每个 Agent 如何拥有独特的目的和专长。
  1. 接下来,更新任务配置文件以定义具体的编写和审阅任务:
# src/guide_creator_flow/crews/content_crew/config/tasks.yaml
write_section_task:
  description: >
    Write a comprehensive section on the topic: "{section_title}"

    Section description: {section_description}
    Target audience: {audience_level} level learners

    Your content should:
    1. Begin with a brief introduction to the section topic
    2. Explain all key concepts clearly with examples
    3. Include practical applications or exercises where appropriate
    4. End with a summary of key points
    5. Be approximately 500-800 words in length

    Format your content in Markdown with appropriate headings, lists, and emphasis.

    Previously written sections:
    {previous_sections}

    Make sure your content maintains consistency with previously written sections
    and builds upon concepts that have already been explained.
  expected_output: >
    A well-structured, comprehensive section in Markdown format that thoroughly
    explains the topic and is appropriate for the target audience.
  agent: content_writer

review_section_task:
  description: >
    Review and improve the following section on "{section_title}":

    {draft_content}

    Target audience: {audience_level} level learners

    Previously written sections:
    {previous_sections}

    Your review should:
    1. Fix any grammatical or spelling errors
    2. Improve clarity and readability
    3. Ensure content is comprehensive and accurate
    4. Verify consistency with previously written sections
    5. Enhance the structure and flow
    6. Add any missing key information

    Provide the improved version of the section in Markdown format.
  expected_output: >
    An improved, polished version of the section that maintains the original
    structure but enhances clarity, accuracy, and consistency.
  agent: content_reviewer
  context:
    - write_section_task
这些任务定义为我们的 Agent 提供了详细的指令,确保他们生成的内容符合我们的质量标准。请注意,审阅任务中的 context 参数创建了一个工作流,使得审阅者可以访问编写者的输出。
  1. 现在,更新 Crew 实现文件以定义我们的 Agent 和任务如何协同工作:
# src/guide_creator_flow/crews/content_crew/content_crew.py
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
from crewai.agents.agent_builder.base_agent import BaseAgent
from typing import List

@CrewBase
class ContentCrew():
    """Content writing crew"""

    agents: List[BaseAgent]
    tasks: List[Task]

    @agent
    def content_writer(self) -> Agent:
        return Agent(
            config=self.agents_config['content_writer'], # type: ignore[index]
            verbose=True
        )

    @agent
    def content_reviewer(self) -> Agent:
        return Agent(
            config=self.agents_config['content_reviewer'], # type: ignore[index]
            verbose=True
        )

    @task
    def write_section_task(self) -> Task:
        return Task(
            config=self.tasks_config['write_section_task'] # type: ignore[index]
        )

    @task
    def review_section_task(self) -> Task:
        return Task(
            config=self.tasks_config['review_section_task'], # type: ignore[index]
            context=[self.write_section_task()]
        )

    @crew
    def crew(self) -> Crew:
        """Creates the content writing crew"""
        return Crew(
            agents=self.agents,
            tasks=self.tasks,
            process=Process.sequential,
            verbose=True,
        )
这个 Crew 定义确立了我们 Agent 和任务之间的关系,建立了一个顺序流程,即内容编写者创建草稿,然后审阅者进行改进。虽然这个 Crew 可以独立运作,但在我们的 Flow 中,它将作为更大系统的一部分被编排。

第 5 步:创建 Flow

现在到了激动人心的部分——创建将编排整个指南创建过程的 Flow。在这里,我们将把常规的 Python 代码、直接调用 LLM 和我们的内容创作 Crew 结合成一个有凝聚力的系统。 我们的 Flow 将会:
  1. 获取用户输入的主题和受众水平
  2. 直接调用 LLM 创建一个结构化的指南大纲
  3. 使用内容编写 Crew 顺序处理每个部分
  4. 将所有内容组合成一份最终的综合文档
让我们在 main.py 文件中创建我们的 Flow:
#!/usr/bin/env python
import json
import os
from typing import List, Dict
from pydantic import BaseModel, Field
from crewai import LLM
from crewai.flow.flow import Flow, listen, start
from guide_creator_flow.crews.content_crew.content_crew import ContentCrew

# Define our models for structured data
class Section(BaseModel):
    title: str = Field(description="Title of the section")
    description: str = Field(description="Brief description of what the section should cover")

class GuideOutline(BaseModel):
    title: str = Field(description="Title of the guide")
    introduction: str = Field(description="Introduction to the topic")
    target_audience: str = Field(description="Description of the target audience")
    sections: List[Section] = Field(description="List of sections in the guide")
    conclusion: str = Field(description="Conclusion or summary of the guide")

# Define our flow state
class GuideCreatorState(BaseModel):
    topic: str = ""
    audience_level: str = ""
    guide_outline: GuideOutline = None
    sections_content: Dict[str, str] = {}

class GuideCreatorFlow(Flow[GuideCreatorState]):
    """Flow for creating a comprehensive guide on any topic"""

    @start()
    def get_user_input(self):
        """Get input from the user about the guide topic and audience"""
        print("\n=== Create Your Comprehensive Guide ===\n")

        # Get user input
        self.state.topic = input("What topic would you like to create a guide for? ")

        # Get audience level with validation
        while True:
            audience = input("Who is your target audience? (beginner/intermediate/advanced) ").lower()
            if audience in ["beginner", "intermediate", "advanced"]:
                self.state.audience_level = audience
                break
            print("Please enter 'beginner', 'intermediate', or 'advanced'")

        print(f"\nCreating a guide on {self.state.topic} for {self.state.audience_level} audience...\n")
        return self.state

    @listen(get_user_input)
    def create_guide_outline(self, state):
        """Create a structured outline for the guide using a direct LLM call"""
        print("Creating guide outline...")

        # Initialize the LLM
        llm = LLM(model="openai/gpt-4o-mini", response_format=GuideOutline)

        # Create the messages for the outline
        messages = [
            {"role": "system", "content": "You are a helpful assistant designed to output JSON."},
            {"role": "user", "content": f"""
            Create a detailed outline for a comprehensive guide on "{state.topic}" for {state.audience_level} level learners.

            The outline should include:
            1. A compelling title for the guide
            2. An introduction to the topic
            3. 4-6 main sections that cover the most important aspects of the topic
            4. A conclusion or summary

            For each section, provide a clear title and a brief description of what it should cover.
            """}
        ]

        # Make the LLM call with JSON response format
        response = llm.call(messages=messages)

        # Parse the JSON response
        outline_dict = json.loads(response)
        self.state.guide_outline = GuideOutline(**outline_dict)

        # Ensure output directory exists before saving
        os.makedirs("output", exist_ok=True)

        # Save the outline to a file
        with open("output/guide_outline.json", "w") as f:
            json.dump(outline_dict, f, indent=2)

        print(f"Guide outline created with {len(self.state.guide_outline.sections)} sections")
        return self.state.guide_outline

    @listen(create_guide_outline)
    def write_and_compile_guide(self, outline):
        """Write all sections and compile the guide"""
        print("Writing guide sections and compiling...")
        completed_sections = []

        # Process sections one by one to maintain context flow
        for section in outline.sections:
            print(f"Processing section: {section.title}")

            # Build context from previous sections
            previous_sections_text = ""
            if completed_sections:
                previous_sections_text = "# Previously Written Sections\n\n"
                for title in completed_sections:
                    previous_sections_text += f"## {title}\n\n"
                    previous_sections_text += self.state.sections_content.get(title, "") + "\n\n"
            else:
                previous_sections_text = "No previous sections written yet."

            # Run the content crew for this section
            result = ContentCrew().crew().kickoff(inputs={
                "section_title": section.title,
                "section_description": section.description,
                "audience_level": self.state.audience_level,
                "previous_sections": previous_sections_text,
                "draft_content": ""
            })

            # Store the content
            self.state.sections_content[section.title] = result.raw
            completed_sections.append(section.title)
            print(f"Section completed: {section.title}")

        # Compile the final guide
        guide_content = f"# {outline.title}\n\n"
        guide_content += f"## Introduction\n\n{outline.introduction}\n\n"

        # Add each section in order
        for section in outline.sections:
            section_content = self.state.sections_content.get(section.title, "")
            guide_content += f"\n\n{section_content}\n\n"

        # Add conclusion
        guide_content += f"## Conclusion\n\n{outline.conclusion}\n\n"

        # Save the guide
        with open("output/complete_guide.md", "w") as f:
            f.write(guide_content)

        print("\nComplete guide compiled and saved to output/complete_guide.md")
        return "Guide creation completed successfully"

def kickoff():
    """Run the guide creator flow"""
    GuideCreatorFlow().kickoff()
    print("\n=== Flow Complete ===")
    print("Your comprehensive guide is ready in the output directory.")
    print("Open output/complete_guide.md to view it.")

def plot():
    """Generate a visualization of the flow"""
    flow = GuideCreatorFlow()
    flow.plot("guide_creator_flow")
    print("Flow visualization saved to guide_creator_flow.html")

if __name__ == "__main__":
    kickoff()
让我们分析一下这个 Flow 中发生了什么:
  1. 我们为结构化数据定义了 Pydantic 模型,确保了类型安全和清晰的数据表示
  2. 我们创建了一个状态类,用于在 Flow 的不同步骤之间维护数据
  3. 我们实现了三个主要的 Flow 步骤:
    • 使用 @start() 装饰器获取用户输入
    • 直接调用 LLM 创建指南大纲
    • 使用我们的内容 Crew 处理各个部分
  4. 我们使用 @listen() 装饰器在步骤之间建立事件驱动的关系
这就是 Flow 的强大之处——将不同类型的处理(用户交互、直接 LLM 调用、基于 Crew 的任务)组合成一个连贯的、事件驱动的系统。

第 6 步:设置你的环境变量

在你的项目根目录下创建一个 .env 文件,并填入你的 API 密钥。有关配置提供商的详细信息,请参阅 LLM 设置指南
.env
OPENAI_API_KEY=your_openai_api_key
# or
GEMINI_API_KEY=your_gemini_api_key
# or
ANTHROPIC_API_KEY=your_anthropic_api_key

第 7 步:安装依赖项

安装所需的依赖项:
crewai install

第 8 步:运行你的 Flow

现在是时候看看你的 Flow 的实际效果了!使用 CrewAI CLI 运行它:
crewai flow kickoff
当你运行此命令时,你会看到你的 Flow 活跃起来:
  1. 它会提示你输入主题和受众水平
  2. 它将为你的指南创建一个结构化大纲
  3. 它将处理每个部分,内容编写者和审阅者将对每个部分进行协作
  4. 最后,它会将所有内容编译成一份综合指南
这展示了 Flow 编排涉及多个组件(包括 AI 和非 AI 组件)的复杂流程的能力。

第 9 步:可视化你的 Flow

Flow 的一个强大功能是能够将其结构可视化:
crewai flow plot
这将创建一个 HTML 文件,显示你的 Flow 结构,包括不同步骤之间的关系以及它们之间流动的数据。这种可视化对于理解和调试复杂的 Flow 非常有价值。

第 10 步:检查输出

Flow 完成后,你会在 output 目录中找到两个文件:
  1. guide_outline.json:包含指南的结构化大纲
  2. complete_guide.md:包含所有部分的综合指南
花点时间查看这些文件,欣赏你所构建的成果——一个结合了用户输入、直接 AI 交互和协作 Agent 工作,以生成复杂、高质量输出的系统。

无限可能:超越你的第一个 Flow

你在本指南中学到的知识为创建更复杂的 AI 系统奠定了基础。以下是一些可以扩展这个基本 Flow 的方法:

增强用户交互

你可以创建更具交互性的 Flow,例如:
  • 用于输入和输出的 Web 界面
  • 实时进度更新
  • 交互式反馈和优化循环
  • 多阶段用户交互

添加更多处理步骤

你可以通过增加额外的步骤来扩展你的 Flow,例如:
  • 在大纲创建前进行研究
  • 为插图生成图片
  • 为技术指南生成代码片段
  • 最终的质量保证和事实核查

创建更复杂的 Flow

你可以实现更复杂的 Flow 模式:
  • 根据用户偏好或内容类型进行条件分支
  • 并行处理独立的部分
  • 带反馈的迭代优化循环
  • 与外部 API 和服务集成

应用于不同领域

相同的模式可以应用于创建以下领域的 Flow:
  • 交互式故事叙述:根据用户输入创建个性化故事
  • 商业智能:处理数据、生成见解并创建报告
  • 产品开发:促进构思、设计和规划
  • 教育系统:创建个性化学习体验

展示的关键特性

这个指南创建 Flow 演示了 CrewAI 的几个强大功能:
  1. 用户交互:Flow 直接从用户那里收集输入
  2. 直接调用 LLM:使用 LLM 类进行高效、单一目的的 AI 交互
  3. 使用 Pydantic 实现结构化数据:使用 Pydantic 模型确保类型安全
  4. 带上下文的顺序处理:按顺序编写章节,并提供前面章节作为上下文
  5. 多 Agent Crew:利用专门的 Agent(编写者和审阅者)进行内容创作
  6. 状态管理:在流程的不同步骤之间维护状态
  7. 事件驱动架构:使用 @listen 装饰器响应事件

理解 Flow 结构

让我们分解 Flow 的关键组件,以帮助你了解如何构建自己的 Flow:

1. 直接调用 LLM

当你需要简单、结构化的响应时,Flow 允许你直接调用语言模型:
llm = LLM(
    model="model-id-here",  # gpt-4o, gemini-2.0-flash, anthropic/claude...
    response_format=GuideOutline
)
response = llm.call(messages=messages)
当你需要一个特定的、结构化的输出时,这比使用 Crew 更高效。

2. 事件驱动架构

Flow 使用装饰器在组件之间建立关系:
@start()
def get_user_input(self):
    # First step in the flow
    # ...

@listen(get_user_input)
def create_guide_outline(self, state):
    # This runs when get_user_input completes
    # ...
这为你的应用程序创建了一个清晰、声明式的结构。

3. 状态管理

Flow 在步骤之间维护状态,使得共享数据变得容易:
class GuideCreatorState(BaseModel):
    topic: str = ""
    audience_level: str = ""
    guide_outline: GuideOutline = None
    sections_content: Dict[str, str] = {}
这提供了一种类型安全的方式来跟踪和转换整个 Flow 中的数据。

4. Crew 集成

Flow 可以与 Crew 无缝集成,以处理复杂的协作任务:
result = ContentCrew().crew().kickoff(inputs={
    "section_title": section.title,
    # ...
})
这使你能够为应用程序的每个部分使用正确的工具——对于简单任务使用直接调用 LLM,对于复杂协作使用 Crew。

后续步骤

现在你已经构建了你的第一个 Flow,你可以:
  1. 尝试更复杂的 Flow 结构和模式
  2. 尝试使用 @router() 在你的 Flow 中创建条件分支
  3. 探索 and_or_ 函数以实现更复杂的并行执行
  4. 将你的 Flow 连接到外部 API、数据库或用户界面
  5. 在单个 Flow 中组合多个专业 Crew
恭喜!你已成功构建了你的第一个 CrewAI Flow,它结合了常规代码、直接 LLM 调用和基于 Crew 的处理来创建一份综合指南。这些基础技能使你能够创建日益复杂的 AI 应用程序,通过结合程序控制和协作智能来解决复杂的多阶段问题。