跳转到主要内容

概述

CrewAI Flows 是一项强大的功能,旨在简化 AI 工作流的创建和管理。Flows 允许开发人员高效地组合和协调编码任务和团队 (Crews),为构建复杂的 AI 自动化提供了强大的框架。 Flows 允许您创建结构化的、事件驱动的工作流。它们提供了一种无缝的方式来连接多个任务、管理状态以及控制 AI 应用程序中的执行流程。借助 Flows,您可以轻松设计和实现多步骤流程,充分利用 CrewAI 的全部潜力。
  1. 简化的工作流创建:轻松将多个团队 (Crews) 和任务串联起来,创建复杂的 AI 工作流。
  2. 状态管理:Flows 使得在工作流中不同任务之间管理和共享状态变得异常简单。
  3. 事件驱动架构:建立在事件驱动模型之上,实现动态和响应式的工作流。
  4. 灵活的控制流:在工作流中实现条件逻辑、循环和分支。

开始入门

让我们创建一个简单的 Flow,您将使用 OpenAI 在一个任务中生成一个随机城市,然后在另一个任务中使用该城市生成一个有趣的事实。
代码

from crewai.flow.flow import Flow, listen, start
from dotenv import load_dotenv
from litellm import completion


class ExampleFlow(Flow):
    model = "gpt-4o-mini"

    @start()
    def generate_city(self):
        print("Starting flow")
        # Each flow state automatically gets a unique ID
        print(f"Flow State ID: {self.state['id']}")

        response = completion(
            model=self.model,
            messages=[
                {
                    "role": "user",
                    "content": "Return the name of a random city in the world.",
                },
            ],
        )

        random_city = response["choices"][0]["message"]["content"]
        # Store the city in our state
        self.state["city"] = random_city
        print(f"Random City: {random_city}")

        return random_city

    @listen(generate_city)
    def generate_fun_fact(self, random_city):
        response = completion(
            model=self.model,
            messages=[
                {
                    "role": "user",
                    "content": f"Tell me a fun fact about {random_city}",
                },
            ],
        )

        fun_fact = response["choices"][0]["message"]["content"]
        # Store the fun fact in our state
        self.state["fun_fact"] = fun_fact
        return fun_fact



flow = ExampleFlow()
flow.plot()
result = flow.kickoff()

print(f"Generated fun fact: {result}")
工作流可视化图片 在上面的示例中,我们创建了一个简单的 Flow,它使用 OpenAI 生成一个随机城市,然后生成关于该城市的一个有趣事实。该 Flow 由两个任务组成:generate_citygenerate_fun_factgenerate_city 任务是 Flow 的起点,而 generate_fun_fact 任务监听 generate_city 任务的输出。 每个 Flow 实例都会在其状态中自动接收一个唯一标识符 (UUID),这有助于跟踪和管理 Flow 的执行。状态还可以存储其他数据(如生成的城市和有趣事实),这些数据在 Flow 的整个执行过程中保持不变。 当您运行该 Flow 时,它将:
  1. 为 Flow 状态生成一个唯一 ID
  2. 生成一个随机城市并将其存储在状态中
  3. 生成关于该城市的一个有趣事实并将其存储在状态中
  4. 将结果打印到控制台
状态的唯一 ID 和存储的数据对于跟踪 Flow 执行和在任务之间维持上下文非常有用。 注意:确保您已设置好 .env 文件来存储您的 OPENAI_API_KEY。此密钥是向 OpenAI API 发出请求进行身份验证所必需的。

@start()

@start() 装饰器标记一个 Flow 的入口点。您可以
  • 声明多个无条件的启动点:@start()
  • 根据先前的方法或路由器标签来控制启动:@start("method_or_label")
  • 提供一个可调用的条件来控制启动的时机
当 Flow 开始或恢复时,所有满足条件的 @start() 方法都将执行(通常是并行执行)。

@listen()

@listen() 装饰器用于将一个方法标记为 Flow 中另一个任务输出的监听器。当指定的任务发出输出时,用 @listen() 装饰的方法将被执行。该方法可以访问它所监听任务的输出作为参数。

用法

@listen() 装饰器有多种使用方式
  1. 按名称监听方法:您可以将要监听的方法名称作为字符串传递。当该方法完成时,监听器方法将被触发。
    代码
    @listen("generate_city")
    def generate_fun_fact(self, random_city):
        # Implementation
    
  2. 直接监听方法:您可以直接传递方法本身。当该方法完成时,监听器方法将被触发。
    代码
    @listen(generate_city)
    def generate_fun_fact(self, random_city):
        # Implementation
    

工作流输出

访问和处理 Flow 的输出对于将您的 AI 工作流集成到更大的应用程序或系统中至关重要。CrewAI Flows 提供了直接的机制来检索最终输出、访问中间结果以及管理 Flow 的整体状态。

检索最终输出

当您运行一个 Flow 时,最终输出由最后一个完成的方法决定。kickoff() 方法返回这个最终方法的输出。 以下是您如何访问最终输出的方法:
from crewai.flow.flow import Flow, listen, start

class OutputExampleFlow(Flow):
    @start()
    def first_method(self):
        return "Output from first_method"

    @listen(first_method)
    def second_method(self, first_output):
        return f"Second method received: {first_output}"


flow = OutputExampleFlow()
flow.plot("my_flow_plot")
final_output = flow.kickoff()

print("---- Final Output ----")
print(final_output)
工作流可视化图片 在此示例中,second_method 是最后一个完成的方法,因此其输出将是 Flow 的最终输出。kickoff() 方法将返回最终输出,然后将其打印到控制台。plot() 方法将生成 HTML 文件,帮助您理解工作流。

访问和更新状态

除了检索最终输出,您还可以在 Flow 内部访问和更新状态。状态可用于在 Flow 中的不同方法之间存储和共享数据。在 Flow 运行后,您可以访问状态以检索在执行期间添加或更新的任何信息。 以下是更新和访问状态的示例:
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel

class ExampleState(BaseModel):
    counter: int = 0
    message: str = ""

class StateExampleFlow(Flow[ExampleState]):

    @start()
    def first_method(self):
        self.state.message = "Hello from first_method"
        self.state.counter += 1

    @listen(first_method)
    def second_method(self):
        self.state.message += " - updated by second_method"
        self.state.counter += 1
        return self.state.message

flow = StateExampleFlow()
flow.plot("my_flow_plot")
final_output = flow.kickoff()
print(f"Final Output: {final_output}")
print("Final State:")
print(flow.state)
工作流可视化图片 在此示例中,状态由 first_methodsecond_method 共同更新。在 Flow 运行后,您可以访问最终状态以查看这些方法所做的更新。 通过确保返回最终方法的输出并提供对状态的访问,CrewAI Flows 使得将 AI 工作流的结果轻松集成到更大的应用程序或系统中,同时在 Flow 的整个执行过程中维护和访问状态。

工作流状态管理

有效管理状态对于构建可靠且可维护的 AI 工作流至关重要。CrewAI Flows 为非结构化和结构化状态管理提供了强大的机制,允许开发人员选择最适合其应用程序需求的方法。

非结构化状态管理

在非结构化状态管理中,所有状态都存储在 Flow 类的 state 属性中。这种方法提供了灵活性,使开发人员能够动态添加或修改状态属性,而无需定义严格的模式。即使使用非结构化状态,CrewAI Flows 也会自动为每个状态实例生成和维护一个唯一标识符 (UUID)。
代码
from crewai.flow.flow import Flow, listen, start

class UnstructuredExampleFlow(Flow):

    @start()
    def first_method(self):
        # The state automatically includes an 'id' field
        print(f"State ID: {self.state['id']}")
        self.state['counter'] = 0
        self.state['message'] = "Hello from structured flow"

    @listen(first_method)
    def second_method(self):
        self.state['counter'] += 1
        self.state['message'] += " - updated"

    @listen(second_method)
    def third_method(self):
        self.state['counter'] += 1
        self.state['message'] += " - updated again"

        print(f"State after third_method: {self.state}")


flow = UnstructuredExampleFlow()
flow.plot("my_flow_plot")
flow.kickoff()
工作流可视化图片 注意: id 字段是自动生成并在整个 Flow 执行过程中保留的。您无需手动管理或设置它,即使在用新数据更新状态时它也会被维护。 关键点:
  • 灵活性: 您可以动态地向 self.state 添加属性,而无需预定义的约束。
  • 简易性: 非常适合状态结构简单或变化显著的直接工作流。

结构化状态管理

结构化状态管理利用预定义的模式来确保整个工作流的一致性和类型安全。通过使用像 Pydantic 的 BaseModel 这样的模型,开发人员可以定义状态的确切形态,从而在开发环境中实现更好的验证和自动补全。 CrewAI Flows 中的每个状态都会自动接收一个唯一标识符 (UUID),以帮助跟踪和管理状态实例。此 ID 由 Flow 系统自动生成和管理。
代码
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel


class ExampleState(BaseModel):
    # Note: 'id' field is automatically added to all states
    counter: int = 0
    message: str = ""


class StructuredExampleFlow(Flow[ExampleState]):

    @start()
    def first_method(self):
        # Access the auto-generated ID if needed
        print(f"State ID: {self.state.id}")
        self.state.message = "Hello from structured flow"

    @listen(first_method)
    def second_method(self):
        self.state.counter += 1
        self.state.message += " - updated"

    @listen(second_method)
    def third_method(self):
        self.state.counter += 1
        self.state.message += " - updated again"

        print(f"State after third_method: {self.state}")


flow = StructuredExampleFlow()
flow.kickoff()
工作流可视化图片 关键点:
  • 已定义的模式: ExampleState 清晰地勾勒出状态结构,增强了代码的可读性和可维护性。
  • 类型安全: 利用 Pydantic 确保状态属性遵守指定的类型,减少运行时错误。
  • 自动补全: IDE 可以基于定义的状态模型提供更好的自动补全和错误检查。

在非结构化和结构化状态管理之间选择

  • 使用非结构化状态管理,当
    • 工作流的状态简单或高度动态。
    • 灵活性优先于严格的状态定义。
    • 需要快速原型开发,而无需定义模式的开销。
  • 使用结构化状态管理,当
    • 工作流需要一个定义良好且一致的状态结构。
    • 类型安全和验证对您的应用程序的可靠性很重要。
    • 您希望利用 IDE 功能(如自动补全和类型检查)来获得更好的开发体验。
通过提供非结构化和结构化状态管理选项,CrewAI Flows 使开发人员能够构建既灵活又健壮的 AI 工作流,满足广泛的应用程序需求。

工作流持久化

@persist 装饰器在 CrewAI Flows 中启用了自动状态持久化,允许您在重启或不同的工作流执行之间保持流状态。此装饰器可以应用于类级别或方法级别,为您管理状态持久化提供了灵活性。

类级别的持久化

当应用于类级别时,@persist 装饰器会自动持久化所有流方法的状态
@persist  # Using SQLiteFlowPersistence by default
class MyFlow(Flow[MyState]):
    @start()
    def initialize_flow(self):
        # This method will automatically have its state persisted
        self.state.counter = 1
        print("Initialized flow. State ID:", self.state.id)

    @listen(initialize_flow)
    def next_step(self):
        # The state (including self.state.id) is automatically reloaded
        self.state.counter += 1
        print("Flow state is persisted. Counter:", self.state.counter)

方法级别的持久化

为了更精细的控制,您可以将 @persist 应用于特定的方法
class AnotherFlow(Flow[dict]):
    @persist  # Persists only this method's state
    @start()
    def begin(self):
        if "runs" not in self.state:
            self.state["runs"] = 0
        self.state["runs"] += 1
        print("Method-level persisted runs:", self.state["runs"])

工作原理

  1. 唯一状态标识
    • 每个流状态自动接收一个唯一的 UUID
    • ID 在状态更新和方法调用中保持不变
    • 支持结构化(Pydantic BaseModel)和非结构化(字典)状态
  2. 默认 SQLite 后端
    • SQLiteFlowPersistence 是默认的存储后端
    • 状态会自动保存到本地 SQLite 数据库中
    • 强大的错误处理确保在数据库操作失败时提供清晰的消息
  3. 错误处理
    • 数据库操作的全面错误消息
    • 保存和加载期间的自动状态验证
    • 当持久化操作遇到问题时提供清晰的反馈

重要注意事项

  • 状态类型:支持结构化(Pydantic BaseModel)和非结构化(字典)状态
  • 自动ID:如果不存在,会自动添加 id 字段
  • 状态恢复:失败或重启的流可以自动重新加载其先前的状态
  • 自定义实现:您可以为专门的存储需求提供自己的 FlowPersistence 实现

技术优势

  1. 通过低级访问实现精确控制
    • 直接访问持久化操作以满足高级用例
    • 通过方法级持久化装饰器实现精细控制
    • 内置状态检查和调试功能
    • 对状态变化和持久化操作的完全可见性
  2. 增强的可靠性
    • 系统故障或重启后自动恢复状态
    • 基于事务的状态更新,确保数据完整性
    • 全面的错误处理和清晰的错误消息
    • 在状态保存和加载操作期间进行强大的验证
  3. 可扩展架构
    • 通过 FlowPersistence 接口自定义持久化后端
    • 支持 SQLite 之外的专用存储解决方案
    • 兼容结构化 (Pydantic) 和非结构化 (dict) 状态
    • 与现有 CrewAI 流模式无缝集成
持久化系统的架构强调技术精确性和定制选项,使开发人员能够在受益于内置可靠性功能的同时,保持对状态管理的完全控制。

流程控制

条件逻辑: or

Flows 中的 or_ 函数允许您监听多个方法,并在任何指定的方法发出输出时触发监听器方法。
from crewai.flow.flow import Flow, listen, or_, start

class OrExampleFlow(Flow):

    @start()
    def start_method(self):
        return "Hello from the start method"

    @listen(start_method)
    def second_method(self):
        return "Hello from the second method"

    @listen(or_(start_method, second_method))
    def logger(self, result):
        print(f"Logger: {result}")



flow = OrExampleFlow()
flow.plot("my_flow_plot")
flow.kickoff()
工作流可视化图片 当您运行此 Flow 时,logger 方法将由 start_methodsecond_method 的输出触发。or_ 函数用于监听多个方法,并在任何指定的方法发出输出时触发监听器方法。

条件逻辑:and

Flows 中的 and_ 函数允许您监听多个方法,并且仅在所有指定方法都发出输出时才触发监听器方法。
from crewai.flow.flow import Flow, and_, listen, start

class AndExampleFlow(Flow):

    @start()
    def start_method(self):
        self.state["greeting"] = "Hello from the start method"

    @listen(start_method)
    def second_method(self):
        self.state["joke"] = "What do computers eat? Microchips."

    @listen(and_(start_method, second_method))
    def logger(self):
        print("---- Logger ----")
        print(self.state)

flow = AndExampleFlow()
flow.plot()
flow.kickoff()
工作流可视化图片 当您运行此 Flow 时,logger 方法仅在 start_methodsecond_method 都发出输出时才会被触发。and_ 函数用于监听多个方法,并且仅在所有指定方法都发出输出时才触发监听器方法。

路由器

Flows 中的 @router() 装饰器允许您根据方法的输出定义条件路由逻辑。您可以根据方法的输出指定不同的路由,从而动态地控制执行流程。
import random
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel

class ExampleState(BaseModel):
    success_flag: bool = False

class RouterFlow(Flow[ExampleState]):

    @start()
    def start_method(self):
        print("Starting the structured flow")
        random_boolean = random.choice([True, False])
        self.state.success_flag = random_boolean

    @router(start_method)
    def second_method(self):
        if self.state.success_flag:
            return "success"
        else:
            return "failed"

    @listen("success")
    def third_method(self):
        print("Third method running")

    @listen("failed")
    def fourth_method(self):
        print("Fourth method running")


flow = RouterFlow()
flow.plot("my_flow_plot")
flow.kickoff()
工作流可视化图片 在上面的示例中,start_method 生成一个随机的布尔值并将其设置在状态中。second_method 使用 @router() 装饰器根据布尔值定义条件路由逻辑。如果布尔值为 True,该方法返回 "success",如果为 False,则返回 "failed"third_methodfourth_method 监听 second_method 的输出,并根据返回的值执行。 当您运行此 Flow 时,输出将根据 start_method 生成的随机布尔值而变化。

向工作流中添加智能体

智能体可以无缝集成到您的工作流中,当您需要更简单、更专注的任务执行时,它们提供了一个比完整团队更轻量级的替代方案。以下是如何在工作流中使用智能体执行市场研究的示例。
import asyncio
from typing import Any, Dict, List

from crewai_tools import SerperDevTool
from pydantic import BaseModel, Field

from crewai.agent import Agent
from crewai.flow.flow import Flow, listen, start


# Define a structured output format
class MarketAnalysis(BaseModel):
    key_trends: List[str] = Field(description="List of identified market trends")
    market_size: str = Field(description="Estimated market size")
    competitors: List[str] = Field(description="Major competitors in the space")


# Define flow state
class MarketResearchState(BaseModel):
    product: str = ""
    analysis: MarketAnalysis | None = None


# Create a flow class
class MarketResearchFlow(Flow[MarketResearchState]):
    @start()
    def initialize_research(self) -> Dict[str, Any]:
        print(f"Starting market research for {self.state.product}")
        return {"product": self.state.product}

    @listen(initialize_research)
    async def analyze_market(self) -> Dict[str, Any]:
        # Create an Agent for market research
        analyst = Agent(
            role="Market Research Analyst",
            goal=f"Analyze the market for {self.state.product}",
            backstory="You are an experienced market analyst with expertise in "
            "identifying market trends and opportunities.",
            tools=[SerperDevTool()],
            verbose=True,
        )

        # Define the research query
        query = f"""
        Research the market for {self.state.product}. Include:
        1. Key market trends
        2. Market size
        3. Major competitors

        Format your response according to the specified structure.
        """

        # Execute the analysis with structured output format
        result = await analyst.kickoff_async(query, response_format=MarketAnalysis)
        if result.pydantic:
            print("result", result.pydantic)
        else:
            print("result", result)

        # Return the analysis to update the state
        return {"analysis": result.pydantic}

    @listen(analyze_market)
    def present_results(self, analysis) -> None:
        print("\nMarket Analysis Results")
        print("=====================")

        if isinstance(analysis, dict):
            # If we got a dict with 'analysis' key, extract the actual analysis object
            market_analysis = analysis.get("analysis")
        else:
            market_analysis = analysis

        if market_analysis and isinstance(market_analysis, MarketAnalysis):
            print("\nKey Market Trends:")
            for trend in market_analysis.key_trends:
                print(f"- {trend}")

            print(f"\nMarket Size: {market_analysis.market_size}")

            print("\nMajor Competitors:")
            for competitor in market_analysis.competitors:
                print(f"- {competitor}")
        else:
            print("No structured analysis data available.")
            print("Raw analysis:", analysis)


# Usage example
async def run_flow():
    flow = MarketResearchFlow()
    flow.plot("MarketResearchFlowPlot")
    result = await flow.kickoff_async(inputs={"product": "AI-powered chatbots"})
    return result


# Run the flow
if __name__ == "__main__":
    asyncio.run(run_flow())
工作流可视化图片 这个例子展示了在工作流中使用智能体的几个关键特性:
  1. 结构化输出:使用 Pydantic 模型定义预期的输出格式(MarketAnalysis)确保了整个工作流的类型安全和结构化数据。
  2. 状态管理:工作流状态(MarketResearchState)在步骤之间维护上下文,并存储输入和输出。
  3. 工具集成:智能体可以使用工具(如 WebsiteSearchTool)来增强其能力。

将团队(Crews)添加到工作流中

在 CrewAI 中创建一个包含多个团队(Crews)的工作流非常简单。 您可以通过运行以下命令生成一个新的 CrewAI 项目,该项目包含了创建多团队工作流所需的所有脚手架:
crewai create flow name_of_flow
此命令将生成一个具有必要文件夹结构的新 CrewAI 项目。生成的项目包括一个名为 poem_crew 的预构建团队,该团队已经可以工作。您可以将此团队用作模板,通过复制、粘贴和编辑来创建其他团队。

文件夹结构

运行 `crewai create flow name_of_flow` 命令后,您将看到类似于以下的文件夹结构
目录/文件描述
name_of_flow/工作流的根目录。
├── crews/包含特定团队的目录。
│ └── poem_crew/“poem_crew” 团队的目录,包含其配置和脚本。
│ ├── config/“poem_crew” 团队的配置文件目录。
│ │ ├── agents.yaml定义 “poem_crew” 团队智能体的 YAML 文件。
│ │ └── tasks.yaml定义 “poem_crew” 团队任务的 YAML 文件。
│ ├── poem_crew.py“poem_crew” 团队功能的脚本。
├── tools/工作流中使用的附加工具目录。
│ └── custom_tool.py自定义工具的实现。
├── main.py运行工作流的主脚本。
├── README.md项目描述和说明。
├── pyproject.toml项目依赖和设置的配置文件。
└── .gitignore指定在版本控制中忽略的文件和目录。

构建您的团队

crews 文件夹中,您可以定义多个团队。每个团队都有自己的文件夹,其中包含配置文件和团队定义文件。例如,poem_crew 文件夹包含
  • config/agents.yaml:定义团队的智能体。
  • config/tasks.yaml:定义团队的任务。
  • poem_crew.py:包含团队定义,包括智能体、任务和团队本身。
您可以复制、粘贴和编辑 poem_crew 来创建其他团队。

在 `main.py` 中连接团队

`main.py` 文件是您创建工作流并将团队连接在一起的地方。您可以使用 `Flow` 类以及装饰器 `@start` 和 `@listen` 来定义工作流的执行流程。 以下是如何在 `main.py` 文件中连接 `poem_crew` 的示例:
代码
#!/usr/bin/env python
from random import randint

from pydantic import BaseModel
from crewai.flow.flow import Flow, listen, start
from .crews.poem_crew.poem_crew import PoemCrew

class PoemState(BaseModel):
    sentence_count: int = 1
    poem: str = ""

class PoemFlow(Flow[PoemState]):

    @start()
    def generate_sentence_count(self):
        print("Generating sentence count")
        self.state.sentence_count = randint(1, 5)

    @listen(generate_sentence_count)
    def generate_poem(self):
        print("Generating poem")
        result = PoemCrew().crew().kickoff(inputs={"sentence_count": self.state.sentence_count})

        print("Poem generated", result.raw)
        self.state.poem = result.raw

    @listen(generate_poem)
    def save_poem(self):
        print("Saving poem")
        with open("poem.txt", "w") as f:
            f.write(self.state.poem)

def kickoff():
    poem_flow = PoemFlow()
    poem_flow.kickoff()


def plot():
    poem_flow = PoemFlow()
    poem_flow.plot("PoemFlowPlot")

if __name__ == "__main__":
    kickoff()
    plot()
在这个示例中,`PoemFlow` 类定义了一个工作流,它生成一个句子计数,使用 `PoemCrew` 生成一首诗,然后将诗保存到文件中。通过调用 `kickoff()` 方法启动工作流。PoemFlowPlot 将由 `plot()` 方法生成。 工作流可视化图片

运行工作流

(可选)在运行工作流之前,您可以通过运行以下命令来安装依赖项
crewai install
安装完所有依赖项后,您需要通过运行以下命令来激活虚拟环境
source .venv/bin/activate
激活虚拟环境后,您可以通过执行以下命令之一来运行工作流
crewai flow kickoff
uv run kickoff
工作流将执行,您应该会在控制台中看到输出。

绘制工作流图

可视化您的 AI 工作流可以为您提供有关工作流结构和执行路径的宝贵见解。CrewAI 提供了一个强大的可视化工具,允许您生成工作流的交互式图表,从而更容易理解和优化您的 AI 工作流。

什么是图表?

CrewAI 中的图表是您 AI 工作流的图形表示。它们显示了各种任务、它们的连接以及它们之间的数据流。这种可视化有助于理解操作顺序、识别瓶颈,并确保工作流逻辑符合您的预期。

如何生成图表

CrewAI 提供了两种便捷的方法来生成您的工作流图表

选项 1:使用 `plot()` 方法

如果您直接使用一个工作流实例,可以通过在您的工作流对象上调用 `plot()` 方法来生成一个图表。该方法将创建一个包含您工作流交互式图表的 HTML 文件。
代码
# Assuming you have a flow instance
flow.plot("my_flow_plot")
这将在您当前目录下生成一个名为 `my_flow_plot.html` 的文件。您可以在网页浏览器中打开此文件以查看交互式图表。

选项 2:使用命令行

如果您在结构化的 CrewAI 项目中工作,可以使用命令行生成图表。这对于您希望可视化整个工作流设置的较大型项目尤其有用。
crewai flow plot
此命令将生成一个包含您的工作流图表的 HTML 文件,类似于 `plot()` 方法。该文件将保存在您的项目目录中,您可以在网页浏览器中打开它来探索工作流。

理解图表

生成的图表将显示代表您工作流中任务的节点,以及指示执行流程的有向边。该图表是交互式的,允许您放大和缩小,并将鼠标悬停在节点上以查看其他详细信息。 通过可视化您的工作流,您可以更清晰地了解工作流的结构,从而更容易地调试、优化并向他人传达您的 AI 流程。

结论

绘制您的工作流图是 CrewAI 的一项强大功能,它增强了您设计和管理复杂 AI 工作流的能力。无论您选择使用 `plot()` 方法还是命令行,生成图表都将为您提供工作流的可视化表示,有助于开发和演示。

后续步骤

如果您有兴趣探索更多的工作流示例,我们在我们的示例仓库中有各种推荐。这里有四个具体的工作流示例,每个都展示了独特的用例,以帮助您将当前的问题类型与特定示例匹配起来
  1. 邮件自动回复工作流:此示例演示了一个无限循环,其中后台作业持续运行以自动化邮件回复。对于需要重复执行而无需手动干预的任务,这是一个很好的用例。查看示例
  2. 潜在客户评分工作流:此工作流展示了如何添加人工参与反馈并使用路由器处理不同的条件分支。这是一个很好的例子,说明了如何将动态决策和人工监督融入您的工作流中。查看示例
  3. 写书工作流:此示例擅长将多个团队链接在一起,其中一个团队的输出被另一个团队使用。具体来说,一个团队概述整本书,另一个团队根据大纲生成章节。最终,所有内容连接起来,生成一本完整的书。此工作流非常适合需要不同任务之间协调的复杂多步骤流程。查看示例
  4. 会议助手工作流:此工作流演示了如何广播一个事件以触发多个后续操作。例如,会议结束后,工作流可以更新 Trello 板、发送 Slack 消息并保存结果。这是一个处理单一事件多个结果的好例子,非常适合全面的任务管理和通知系统。查看示例
通过探索这些示例,您可以深入了解如何利用 CrewAI Flows 来处理各种用例,从自动化重复性任务到管理具有动态决策和人工反馈的复杂多步骤流程。 另外,请观看我们在 YouTube 上关于如何在 CrewAI 中使用工作流的视频!

运行工作流

有两种方法可以运行工作流

使用工作流 API

您可以通过创建您的工作流类的实例并调用 `kickoff()` 方法来以编程方式运行工作流
flow = ExampleFlow()
result = flow.kickoff()

使用命令行界面 (CLI)

从 0.103.0 版本开始,您可以使用 `crewai run` 命令运行工作流
crewai run
此命令会自动检测您的项目是否为工作流(基于您 pyproject.toml 中的 `type = "flow"` 设置)并相应地运行它。这是从命令行运行工作流的推荐方法。 为了向后兼容,您也可以使用:
crewai flow kickoff
然而,crewai run 命令现在是首选方法,因为它对团队(crews)和工作流(flows)都有效。