介绍

CrewAI 提供了异步启动 Crew 的能力,允许你以非阻塞方式开始 Crew 的执行。当你想要并发运行多个 Crew 或需要在 Crew 执行时执行其他任务时,此功能特别有用。

异步 Crew 执行

要异步启动 Crew,请使用 kickoff_async() 方法。此方法在单独的线程中启动 Crew 执行,允许主线程继续执行其他任务。

方法签名

代码
def kickoff_async(self, inputs: dict) -> CrewOutput:

参数

  • inputs (dict): 包含任务所需输入数据的字典。

返回值

  • CrewOutput: 表示 Crew 执行结果的对象。

潜在用例

  • 并行内容生成:异步启动多个独立的 Crew,每个 Crew 负责生成不同主题的内容。例如,一个 Crew 可能研究并起草关于 AI 趋势的文章,而另一个 Crew 生成关于新产品发布的社交媒体帖子。每个 Crew 独立运作,从而高效地扩展内容生产。

  • 并发市场研究任务:异步启动多个 Crew 以并行进行市场研究。一个 Crew 可能分析行业趋势,另一个检查竞争对手策略,还有一个评估消费者情绪。每个 Crew 独立完成其任务,从而实现更快、更全面的洞察。

  • 独立旅行规划模块:执行独立的 Crew 以独立规划旅行的不同方面。一个 Crew 可能处理航班选项,另一个处理住宿,第三个规划活动。每个 Crew 异步工作,允许同时独立规划旅行的各个组成部分,以加快结果。

示例:单个异步 Crew 执行

以下是使用 asyncio 异步启动 Crew 并等待结果的示例

代码
import asyncio
from crewai import Crew, Agent, Task

# Create an agent with code execution enabled
coding_agent = Agent(
    role="Python Data Analyst",
    goal="Analyze data and provide insights using Python",
    backstory="You are an experienced data analyst with strong Python skills.",
    allow_code_execution=True
)

# Create a task that requires code execution
data_analysis_task = Task(
    description="Analyze the given dataset and calculate the average age of participants. Ages: {ages}",
    agent=coding_agent,
    expected_output="The average age of the participants."
)

# Create a crew and add the task
analysis_crew = Crew(
    agents=[coding_agent],
    tasks=[data_analysis_task]
)

# Async function to kickoff the crew asynchronously
async def async_crew_execution():
    result = await analysis_crew.kickoff_async(inputs={"ages": [25, 30, 35, 40, 45]})
    print("Crew Result:", result)

# Run the async function
asyncio.run(async_crew_execution())

示例:多个异步 Crew 执行

在此示例中,我们将展示如何异步启动多个 Crew 并使用 asyncio.gather() 等待所有 Crew 完成

代码
import asyncio
from crewai import Crew, Agent, Task

# Create an agent with code execution enabled
coding_agent = Agent(
    role="Python Data Analyst",
    goal="Analyze data and provide insights using Python",
    backstory="You are an experienced data analyst with strong Python skills.",
    allow_code_execution=True
)

# Create tasks that require code execution
task_1 = Task(
    description="Analyze the first dataset and calculate the average age of participants. Ages: {ages}",
    agent=coding_agent,
    expected_output="The average age of the participants."
)

task_2 = Task(
    description="Analyze the second dataset and calculate the average age of participants. Ages: {ages}",
    agent=coding_agent,
    expected_output="The average age of the participants."
)

# Create two crews and add tasks
crew_1 = Crew(agents=[coding_agent], tasks=[task_1])
crew_2 = Crew(agents=[coding_agent], tasks=[task_2])

# Async function to kickoff multiple crews asynchronously and wait for all to finish
async def async_multiple_crews():
    result_1 = crew_1.kickoff_async(inputs={"ages": [25, 30, 35, 40, 45]})
    result_2 = crew_2.kickoff_async(inputs={"ages": [20, 22, 24, 28, 30]})

    # Wait for both crews to finish
    results = await asyncio.gather(result_1, result_2)

    for i, result in enumerate(results, 1):
        print(f"Crew {i} Result:", result)

# Run the async function
asyncio.run(async_multiple_crews())