介绍

CrewAI Flows 是一项强大的功能,旨在简化 AI 工作流的创建和管理。Flows 允许开发者有效地结合和协调编码任务与团队(Crews),为构建复杂的 AI 自动化提供了强大的框架。

Flows 允许您创建结构化的、事件驱动的工作流。它们提供了一种无缝的方式来连接多个任务、管理状态并控制 AI 应用程序中的执行流程。使用 Flows,您可以轻松设计和实现多步骤流程,充分利用 CrewAI 的所有能力。

  1. 简化工作流创建:轻松地将多个团队和任务连接起来,创建复杂的 AI 工作流。

  2. 状态管理:Flows 使得管理和共享工作流中不同任务之间的状态变得非常容易。

  3. 事件驱动架构:基于事件驱动模型构建,支持动态且响应迅速的工作流。

  4. 灵活的控制流:在工作流中实现条件逻辑、循环和分支。

入门

让我们创建一个简单的工作流,您将在一个任务中使用 OpenAI 生成一个随机城市,然后在另一个任务中使用该城市生成一个有趣的 फैक्ट。

代码

from crewai.flow.flow import Flow, listen, start
from dotenv import load_dotenv
from litellm import completion


class ExampleFlow(Flow):
    model = "gpt-4o-mini"

    @start()
    def generate_city(self):
        print("Starting flow")
        # Each flow state automatically gets a unique ID
        print(f"Flow State ID: {self.state['id']}")

        response = completion(
            model=self.model,
            messages=[
                {
                    "role": "user",
                    "content": "Return the name of a random city in the world.",
                },
            ],
        )

        random_city = response["choices"][0]["message"]["content"]
        # Store the city in our state
        self.state["city"] = random_city
        print(f"Random City: {random_city}")

        return random_city

    @listen(generate_city)
    def generate_fun_fact(self, random_city):
        response = completion(
            model=self.model,
            messages=[
                {
                    "role": "user",
                    "content": f"Tell me a fun fact about {random_city}",
                },
            ],
        )

        fun_fact = response["choices"][0]["message"]["content"]
        # Store the fun fact in our state
        self.state["fun_fact"] = fun_fact
        return fun_fact



flow = ExampleFlow()
flow.plot()
result = flow.kickoff()

print(f"Generated fun fact: {result}")

在上面的示例中,我们创建了一个简单的工作流,它使用 OpenAI 生成一个随机城市,然后生成关于该城市的一个有趣的 факты。该工作流由两个任务组成:generate_citygenerate_fun_factgenerate_city 任务是工作流的起点,而 generate_fun_fact 任务监听 generate_city 任务的输出。

每个工作流实例在其状态中都会自动获得一个唯一的标识符 (UUID),这有助于跟踪和管理工作流执行。状态还可以存储在工作流执行期间持续存在的额外数据(例如生成的城市和有趣的 факты)。

当您运行工作流时,它将

  1. 为工作流状态生成一个唯一 ID
  2. 生成一个随机城市并将其存储在状态中
  3. 生成关于该城市的一个有趣的 факты并将其存储在状态中
  4. 将结果打印到控制台

状态的唯一 ID 和存储的数据对于跟踪工作流执行和维护任务之间的上下文非常有用。

注意:请确保您已设置 .env 文件来存储您的 OPENAI_API_KEY。此密钥对于向 OpenAI API 进行身份验证是必需的。

@start()

@start() 装饰器用于将方法标记为工作流的起点。当工作流启动时,所有使用 @start() 装饰器标记的方法都会并行执行。一个工作流中可以有多个起始方法,它们都会在工作流启动时执行。

@listen()

@listen() 装饰器用于将方法标记为监听工作流中另一个任务的输出。使用 @listen() 装饰器标记的方法将在指定任务发出输出时执行。该方法可以将其监听的任务的输出作为参数访问。

用法

@listen() 装饰器可以通过几种方式使用:

  1. 按名称监听方法:您可以将要监听的方法名称作为字符串传递。当该方法完成后,监听器方法将被触发。

    代码
    @listen("generate_city")
    def generate_fun_fact(self, random_city):
        # Implementation
    
  2. 直接监听方法:您可以直接传递方法本身。当该方法完成后,监听器方法将被触发。

    代码
    @listen(generate_city)
    def generate_fun_fact(self, random_city):
        # Implementation
    

工作流输出

访问和处理工作流的输出对于将您的 AI 工作流集成到更大的应用程序或系统中至关重要。CrewAI Flows 提供了直接的机制来检索最终输出、访问中间结果并管理工作流的整体状态。

检索最终输出

当您运行工作流时,最终输出由最后一个完成的方法决定。kickoff() 方法返回此最终方法的输出。

您可以这样访问最终输出:

from crewai.flow.flow import Flow, listen, start

class OutputExampleFlow(Flow):
    @start()
    def first_method(self):
        return "Output from first_method"

    @listen(first_method)
    def second_method(self, first_output):
        return f"Second method received: {first_output}"


flow = OutputExampleFlow()
flow.plot("my_flow_plot")
final_output = flow.kickoff()

print("---- Final Output ----")
print(final_output)

在此示例中,second_method 是最后一个完成的方法,因此其输出将是工作流的最终输出。kickoff() 方法将返回最终输出,然后将其打印到控制台。plot() 方法将生成 HTML 文件,这将帮助您理解工作流。

访问和更新状态

除了检索最终输出外,您还可以访问和更新工作流中的状态。状态可用于在工作流中的不同方法之间存储和共享数据。工作流运行后,您可以访问状态以检索执行期间添加或更新的任何信息。

以下是更新和访问状态的示例:

from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel

class ExampleState(BaseModel):
    counter: int = 0
    message: str = ""

class StateExampleFlow(Flow[ExampleState]):

    @start()
    def first_method(self):
        self.state.message = "Hello from first_method"
        self.state.counter += 1

    @listen(first_method)
    def second_method(self):
        self.state.message += " - updated by second_method"
        self.state.counter += 1
        return self.state.message

flow = StateExampleFlow()
flow.plot("my_flow_plot")
final_output = flow.kickoff()
print(f"Final Output: {final_output}")
print("Final State:")
print(flow.state)

在此示例中,状态由 first_methodsecond_method 都进行了更新。工作流运行后,您可以访问最终状态以查看这些方法所做的更新。

通过确保返回最终方法的输出并提供对状态的访问,CrewAI Flows 使将 AI 工作流的结果集成到更大的应用程序或系统中变得容易,同时还在整个工作流执行过程中维护和访问状态。

工作流状态管理

有效管理状态对于构建可靠且可维护的 AI 工作流至关重要。CrewAI Flows 为非结构化和结构化状态管理提供了强大的机制,允许开发人员选择最适合其应用程序需求的方法。

非结构化状态管理

在非结构化状态管理中,所有状态都存储在 Flow 类的 state 属性中。这种方法提供了灵活性,使开发人员无需定义严格的模式即可即时添加或修改状态属性。即使使用非结构化状态,CrewAI Flows 也会自动为每个状态实例生成并维护一个唯一的标识符 (UUID)。

代码
from crewai.flow.flow import Flow, listen, start

class UnstructuredExampleFlow(Flow):

    @start()
    def first_method(self):
        # The state automatically includes an 'id' field
        print(f"State ID: {self.state['id']}")
        self.state['counter'] = 0
        self.state['message'] = "Hello from structured flow"

    @listen(first_method)
    def second_method(self):
        self.state['counter'] += 1
        self.state['message'] += " - updated"

    @listen(second_method)
    def third_method(self):
        self.state['counter'] += 1
        self.state['message'] += " - updated again"

        print(f"State after third_method: {self.state}")


flow = UnstructuredExampleFlow()
flow.plot("my_flow_plot")
flow.kickoff()

注意:id 字段是自动生成的,并在整个工作流执行过程中保留。您无需手动管理或设置它,即使使用新数据更新状态,它也会得到维护。

关键点

  • 灵活性:您可以动态地向 self.state 添加属性,而无需预定义的约束。
  • 简单性:适用于状态结构最少或变化很大的简单工作流。

结构化状态管理

结构化状态管理利用预定义的模式来确保整个工作流的一致性和类型安全。通过使用 Pydantic 的 BaseModel 等模型,开发人员可以定义状态的确切形状,从而在开发环境中实现更好的验证和自动完成。

CrewAI Flows 中的每个状态都会自动获得一个唯一的标识符 (UUID),以帮助跟踪和管理状态实例。此 ID 由工作流系统自动生成和管理。

代码
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel


class ExampleState(BaseModel):
    # Note: 'id' field is automatically added to all states
    counter: int = 0
    message: str = ""


class StructuredExampleFlow(Flow[ExampleState]):

    @start()
    def first_method(self):
        # Access the auto-generated ID if needed
        print(f"State ID: {self.state.id}")
        self.state.message = "Hello from structured flow"

    @listen(first_method)
    def second_method(self):
        self.state.counter += 1
        self.state.message += " - updated"

    @listen(second_method)
    def third_method(self):
        self.state.counter += 1
        self.state.message += " - updated again"

        print(f"State after third_method: {self.state}")


flow = StructuredExampleFlow()
flow.kickoff()

关键点

  • 定义的模式:ExampleState 清晰地概述了状态结构,增强了代码的可读性和可维护性。
  • 类型安全:利用 Pydantic 确保状态属性符合指定的类型,减少运行时错误。
  • 自动完成:IDE 可以根据定义的状态模型提供更好的自动完成和错误检查。

选择非结构化与结构化状态管理

  • 在以下情况使用非结构化状态管理:

    • 工作流状态简单或高度动态。
    • 灵活性优先于严格的状态定义。
    • 需要快速原型设计,而无需定义模式的开销。
  • 在以下情况使用结构化状态管理:

    • 工作流需要一个明确定义且一致的状态结构。
    • 类型安全和验证对于应用程序的可靠性很重要。
    • 您希望利用 IDE 功能(如自动完成和类型检查)来改善开发人员体验。

通过提供非结构化和结构化状态管理选项,CrewAI Flows 使开发人员能够构建既灵活又强大的 AI 工作流,满足广泛的应用需求。

工作流持久性

@persist 装饰器在 CrewAI Flows 中启用自动状态持久化,允许您在重启或不同的工作流执行中保持工作流状态。此装饰器可以应用于类级别或方法级别,提供管理状态持久性的灵活性。

类级别持久性

当应用于类级别时,@persist 装饰器会自动持久化所有工作流方法状态:

@persist  # Using SQLiteFlowPersistence by default
class MyFlow(Flow[MyState]):
    @start()
    def initialize_flow(self):
        # This method will automatically have its state persisted
        self.state.counter = 1
        print("Initialized flow. State ID:", self.state.id)

    @listen(initialize_flow)
    def next_step(self):
        # The state (including self.state.id) is automatically reloaded
        self.state.counter += 1
        print("Flow state is persisted. Counter:", self.state.counter)

方法级别持久性

为了更精细的控制,您可以将 @persist 应用于特定方法:

class AnotherFlow(Flow[dict]):
    @persist  # Persists only this method's state
    @start()
    def begin(self):
        if "runs" not in self.state:
            self.state["runs"] = 0
        self.state["runs"] += 1
        print("Method-level persisted runs:", self.state["runs"])

工作原理

  1. 唯一状态标识

    • 每个工作流状态自动获得一个唯一的 UUID
    • ID 在状态更新和方法调用中保留
    • 支持结构化 (Pydantic BaseModel) 和非结构化 (dictionary) 状态
  2. 默认 SQLite 后端

    • SQLiteFlowPersistence 是默认存储后端
    • 状态自动保存到本地 SQLite 数据库
    • 强大的错误处理确保在数据库操作失败时提供清晰的消息
  3. 错误处理

    • 针对数据库操作的全面错误消息
    • 保存和加载期间的自动状态验证
    • 持久化操作遇到问题时提供清晰的反馈

重要注意事项

  • 状态类型:支持结构化 (Pydantic BaseModel) 和非结构化 (dictionary) 状态
  • 自动 ID:如果不存在,将自动添加 id 字段
  • 状态恢复:失败或重启的工作流可以自动重新加载其先前的状态
  • 自定义实现:您可以提供自己的 FlowPersistence 实现以满足专门的存储需求

技术优势

  1. 通过低级访问实现精确控制

    • 直接访问持久化操作,适用于高级用例
    • 通过方法级持久化装饰器实现精细控制
    • 内置状态检查和调试功能
    • 完全可见状态更改和持久化操作
  2. 增强的可靠性

    • 系统故障或重启后的自动状态恢复
    • 基于事务的状态更新,确保数据完整性
    • 全面的错误处理,带有清晰的错误消息
    • 状态保存和加载操作期间的稳健验证
  3. 可扩展架构

    • 通过 FlowPersistence 接口实现可定制的持久化后端
    • 支持 SQLite 以外的专用存储解决方案
    • 兼容结构化 (Pydantic) 和非结构化 (dict) 状态
    • 与现有 CrewAI 工作流模式无缝集成

持久化系统的架构强调技术精度和定制选项,使开发人员能够在利用内置可靠性功能的同时保持对状态管理的完全控制。

工作流控制

条件逻辑:or (或)

Flows 中的 or_ 函数允许您监听多个方法,并在指定的任何方法发出输出时触发监听器方法。

from crewai.flow.flow import Flow, listen, or_, start

class OrExampleFlow(Flow):

    @start()
    def start_method(self):
        return "Hello from the start method"

    @listen(start_method)
    def second_method(self):
        return "Hello from the second method"

    @listen(or_(start_method, second_method))
    def logger(self, result):
        print(f"Logger: {result}")



flow = OrExampleFlow()
flow.plot("my_flow_plot")
flow.kickoff()

当您运行此工作流时,logger 方法将由 start_methodsecond_method 的输出触发。or_ 函数用于监听多个方法,并在指定的任何方法发出输出时触发监听器方法。

条件逻辑:and (与)

Flows 中的 and_ 函数允许您监听多个方法,并且仅在所有指定方法都发出输出时触发监听器方法。

from crewai.flow.flow import Flow, and_, listen, start

class AndExampleFlow(Flow):

    @start()
    def start_method(self):
        self.state["greeting"] = "Hello from the start method"

    @listen(start_method)
    def second_method(self):
        self.state["joke"] = "What do computers eat? Microchips."

    @listen(and_(start_method, second_method))
    def logger(self):
        print("---- Logger ----")
        print(self.state)

flow = AndExampleFlow()
flow.plot()
flow.kickoff()

当您运行此工作流时,logger 方法仅在 start_methodsecond_method 都发出输出时才会被触发。and_ 函数用于监听多个方法,并且仅在所有指定方法都发出输出时触发监听器方法。

路由器

Flows 中的 @router() 装饰器允许您根据方法的输出定义条件路由逻辑。您可以根据方法的输出指定不同的路由,从而动态控制执行流程。

import random
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel

class ExampleState(BaseModel):
    success_flag: bool = False

class RouterFlow(Flow[ExampleState]):

    @start()
    def start_method(self):
        print("Starting the structured flow")
        random_boolean = random.choice([True, False])
        self.state.success_flag = random_boolean

    @router(start_method)
    def second_method(self):
        if self.state.success_flag:
            return "success"
        else:
            return "failed"

    @listen("success")
    def third_method(self):
        print("Third method running")

    @listen("failed")
    def fourth_method(self):
        print("Fourth method running")


flow = RouterFlow()
flow.plot("my_flow_plot")
flow.kickoff()

在上面的示例中,start_method 生成一个随机布尔值并将其设置在状态中。second_method 使用 @router() 装饰器根据布尔值定义条件路由逻辑。如果布尔值为 True,方法返回 "success";如果为 False,方法返回 "failed"third_methodfourth_method 监听 second_method 的输出,并根据返回的值执行。

当您运行此工作流时,输出将根据 start_method 生成的随机布尔值而改变。

将代理添加到工作流

代理可以无缝集成到您的工作流中,当您需要更简单、更专注的任务执行时,代理提供了完整的团队的轻量级替代方案。以下是如何在工作流中使用代理执行市场研究的示例:

import asyncio
from typing import Any, Dict, List

from crewai_tools import SerperDevTool
from pydantic import BaseModel, Field

from crewai.agent import Agent
from crewai.flow.flow import Flow, listen, start


# Define a structured output format
class MarketAnalysis(BaseModel):
    key_trends: List[str] = Field(description="List of identified market trends")
    market_size: str = Field(description="Estimated market size")
    competitors: List[str] = Field(description="Major competitors in the space")


# Define flow state
class MarketResearchState(BaseModel):
    product: str = ""
    analysis: MarketAnalysis | None = None


# Create a flow class
class MarketResearchFlow(Flow[MarketResearchState]):
    @start()
    def initialize_research(self) -> Dict[str, Any]:
        print(f"Starting market research for {self.state.product}")
        return {"product": self.state.product}

    @listen(initialize_research)
    async def analyze_market(self) -> Dict[str, Any]:
        # Create an Agent for market research
        analyst = Agent(
            role="Market Research Analyst",
            goal=f"Analyze the market for {self.state.product}",
            backstory="You are an experienced market analyst with expertise in "
            "identifying market trends and opportunities.",
            tools=[SerperDevTool()],
            verbose=True,
        )

        # Define the research query
        query = f"""
        Research the market for {self.state.product}. Include:
        1. Key market trends
        2. Market size
        3. Major competitors

        Format your response according to the specified structure.
        """

        # Execute the analysis with structured output format
        result = await analyst.kickoff_async(query, response_format=MarketAnalysis)
        if result.pydantic:
            print("result", result.pydantic)
        else:
            print("result", result)

        # Return the analysis to update the state
        return {"analysis": result.pydantic}

    @listen(analyze_market)
    def present_results(self, analysis) -> None:
        print("\nMarket Analysis Results")
        print("=====================")

        if isinstance(analysis, dict):
            # If we got a dict with 'analysis' key, extract the actual analysis object
            market_analysis = analysis.get("analysis")
        else:
            market_analysis = analysis

        if market_analysis and isinstance(market_analysis, MarketAnalysis):
            print("\nKey Market Trends:")
            for trend in market_analysis.key_trends:
                print(f"- {trend}")

            print(f"\nMarket Size: {market_analysis.market_size}")

            print("\nMajor Competitors:")
            for competitor in market_analysis.competitors:
                print(f"- {competitor}")
        else:
            print("No structured analysis data available.")
            print("Raw analysis:", analysis)


# Usage example
async def run_flow():
    flow = MarketResearchFlow()
    flow.plot("MarketResearchFlowPlot")
    result = await flow.kickoff_async(inputs={"product": "AI-powered chatbots"})
    return result


# Run the flow
if __name__ == "__main__":
    asyncio.run(run_flow())

此示例演示了在工作流中使用代理的几个关键功能:

  1. 结构化输出:使用 Pydantic 模型定义预期的输出格式 (MarketAnalysis),确保整个工作流中的类型安全和结构化数据。

  2. 状态管理:工作流状态 (MarketResearchState) 维护步骤之间的上下文,并存储输入和输出。

  3. 工具集成:代理可以使用工具(如 WebsiteSearchTool)来增强其能力。

将团队添加到工作流

在 CrewAI 中创建包含多个团队的工作流非常简单。

您可以通过运行以下命令生成一个新的 CrewAI 项目,其中包含创建包含多个团队的工作流所需的所有脚手架:

crewai create flow name_of_flow

此命令将生成一个具有必要文件夹结构的新 CrewAI 项目。生成的项目包含一个名为 poem_crew 的预建团队,该团队已可工作。您可以通过复制、粘贴和编辑此团队来创建其他团队,将其用作模板。

文件夹结构

运行 crewai create flow name_of_flow 命令后,您将看到类似于以下的文件夹结构:

目录/文件描述
name_of_flow/工作流的根目录。
├── crews/包含特定团队的目录。
│ └── poem_crew/“poem_crew” 的目录,包含其配置和脚本。
│ ├── config/“poem_crew” 的配置文件目录。
│ │ ├── agents.yaml定义“poem_crew”代理的 YAML 文件。
│ │ └── tasks.yaml定义“poem_crew”任务的 YAML 文件。
│ ├── poem_crew.py“poem_crew” 功能的脚本。
├── tools/工作流中使用的附加工具的目录。
│ └── custom_tool.py自定义工具实现。
├── main.py运行工作流的主脚本。
├── README.md项目描述和说明。
├── pyproject.toml项目依赖项和设置的配置文件。
└── .gitignore指定在版本控制中忽略的文件和目录。

构建您的团队

crews 文件夹中,您可以定义多个团队。每个团队都有自己的文件夹,包含配置文件和团队定义文件。例如,poem_crew 文件夹包含:

  • config/agents.yaml:定义团队的代理。
  • config/tasks.yaml:定义团队的任务。
  • poem_crew.py:包含团队定义,包括代理、任务和团队本身。

您可以复制、粘贴和编辑 poem_crew 来创建其他团队。

main.py 中连接团队

main.py 文件是您创建工作流并将团队连接在一起的地方。您可以使用 Flow 类以及 @start@listen 装饰器来定义工作流,指定执行流程。

以下是如何在 main.py 文件中连接 poem_crew 的示例:

代码
#!/usr/bin/env python
from random import randint

from pydantic import BaseModel
from crewai.flow.flow import Flow, listen, start
from .crews.poem_crew.poem_crew import PoemCrew

class PoemState(BaseModel):
    sentence_count: int = 1
    poem: str = ""

class PoemFlow(Flow[PoemState]):

    @start()
    def generate_sentence_count(self):
        print("Generating sentence count")
        self.state.sentence_count = randint(1, 5)

    @listen(generate_sentence_count)
    def generate_poem(self):
        print("Generating poem")
        result = PoemCrew().crew().kickoff(inputs={"sentence_count": self.state.sentence_count})

        print("Poem generated", result.raw)
        self.state.poem = result.raw

    @listen(generate_poem)
    def save_poem(self):
        print("Saving poem")
        with open("poem.txt", "w") as f:
            f.write(self.state.poem)

def kickoff():
    poem_flow = PoemFlow()
    poem_flow.kickoff()


def plot():
    poem_flow = PoemFlow()
    poem_flow.plot("PoemFlowPlot")

if __name__ == "__main__":
    kickoff()
    plot()

在此示例中,PoemFlow 类定义了一个工作流,该工作流生成句子计数,使用 PoemCrew 生成诗歌,然后将诗歌保存到文件中。通过调用 kickoff() 方法启动工作流。PoemFlowPlot 将由 plot() 方法生成。

运行工作流

(可选)在运行工作流之前,您可以通过运行以下命令安装依赖项:

crewai install

安装所有依赖项后,您需要通过运行以下命令激活虚拟环境:

source .venv/bin/activate

激活虚拟环境后,您可以通过执行以下命令之一来运行工作流:

crewai flow kickoff

uv run kickoff

工作流将执行,您应该在控制台中看到输出。

绘制工作流图

可视化您的 AI 工作流可以提供关于工作流结构和执行路径的宝贵见解。CrewAI 提供了一个强大的可视化工具,允许您生成工作流的交互式图,从而更容易理解和优化您的 AI 工作流。

什么是图?

CrewAI 中的图是您的 AI 工作流的图形表示。它们显示各种任务、它们之间的连接以及数据流。这种可视化有助于理解操作顺序、识别瓶颈,并确保工作流逻辑与您的期望一致。

如何生成图

CrewAI 提供了两种方便的方法来生成工作流的图:

选项 1:使用 plot() 方法

如果您直接使用工作流实例,可以通过在工作流对象上调用 plot() 方法来生成图。此方法将创建一个包含工作流交互式图的 HTML 文件。

代码
# Assuming you have a flow instance
flow.plot("my_flow_plot")

这将在您当前目录中生成一个名为 my_flow_plot.html 的文件。您可以在网页浏览器中打开此文件以查看交互式图。

选项 2:使用命令行

如果您正在结构化的 CrewAI 项目中工作,可以使用命令行生成图。这对于您想要可视化整个工作流设置的大型项目特别有用。

crewai flow plot

此命令将生成一个包含工作流图的 HTML 文件,类似于 plot() 方法。该文件将保存在您的项目目录中,您可以在网页浏览器中打开它来探索工作流。

理解图

生成的图将显示代表工作流中任务的节点,有向边指示执行流程。该图是交互式的,允许您放大和缩小,并悬停在节点上查看更多详细信息。

通过可视化您的工作流,您可以更清晰地了解工作流的结构,使其更容易调试、优化并向他人沟通您的 AI 流程。

结论

绘制工作流图是 CrewAI 的一项强大功能,可以增强您设计和管理复杂 AI 工作流的能力。无论您选择使用 plot() 方法还是命令行,生成图都将为您提供工作流的可视化表示,有助于开发和演示。

下一步

如果您有兴趣探索更多工作流示例,我们的示例仓库中有很多推荐。这里有四个具体的工作流示例,每个都展示了独特的用例,以帮助您将当前问题类型与特定示例进行匹配:

  1. 电子邮件自动回复工作流:此示例演示了一个无限循环,其中一个后台作业持续运行以自动化电子邮件回复。对于需要重复执行且无需手动干预的任务来说,这是一个很好的用例。查看示例

  2. 潜在客户评分工作流:此工作流展示了如何添加人工反馈并通过路由器处理不同的条件分支。这是如何将动态决策和人工监督纳入工作流的绝佳示例。查看示例

  3. 写书工作流:此示例擅长将多个团队连接在一起,其中一个团队的输出被另一个团队使用。具体来说,一个团队概述整本书,另一个团队根据大纲生成章节。最终,所有内容连接起来,生成一本完整的书。此工作流非常适合需要不同任务之间协调的复杂多步骤过程。查看示例

  4. 会议助手工作流:此工作流演示了如何广播一个事件以触发多个后续操作。例如,会议完成后,工作流可以更新 Trello 看板、发送 Slack 消息并保存结果。这是处理单个事件产生的多个结果的绝佳示例,使其成为全面任务管理和通知系统的理想选择。查看示例

通过探索这些示例,您可以深入了解如何将 CrewAI Flows 用于各种用例,从自动化重复任务到管理具有动态决策和人工反馈的复杂多步骤过程。

此外,请查看下面关于如何在 CrewAI 中使用工作流的 YouTube 视频!

运行工作流

有两种运行工作流的方式:

使用工作流 API

您可以通过创建工作流类的实例并调用 kickoff() 方法来以编程方式运行工作流:

flow = ExampleFlow()
result = flow.kickoff()

使用命令行接口 (CLI)

从 0.103.0 版本开始,您可以使用 crewai run 命令运行工作流:

crewai run

此命令会自动检测您的项目是否是工作流(基于 pyproject.toml 中的 type = "flow" 设置),并相应地运行它。这是从命令行运行工作流的推荐方式。

为了向后兼容,您还可以使用:

crewai flow kickoff

然而,crewai run 命令现在是首选方法,因为它适用于团队和工作流。