跳转到主要内容

简介

CrewAI 提供了异步启动 Crew 的能力,允许您以非阻塞方式启动 Crew 执行。当您想要并发运行多个 Crew 或在 Crew 执行期间执行其他任务时,此功能特别有用。

异步 Crew 执行

要异步启动 Crew,请使用 kickoff_async() 方法。此方法在单独的线程中启动 Crew 执行,允许主线程继续执行其他任务。

方法签名

代码
def kickoff_async(self, inputs: dict) -> CrewOutput:

参数

  • inputs (dict):一个字典,包含任务所需的输入数据。

返回值

  • CrewOutput:一个对象,表示 Crew 执行的结果。

潜在用例

  • 并行内容生成:异步启动多个独立的 Crew,每个 Crew 负责生成不同主题的内容。例如,一个 Crew 可能研究并撰写关于人工智能趋势的文章,而另一个 Crew 生成关于新产品发布的社交媒体帖子。每个 Crew 独立运行,从而有效扩展内容生产。
  • 并发市场调研任务:异步启动多个 Crew 以并行进行市场调研。一个 Crew 可能分析行业趋势,另一个检查竞争对手策略,还有一个评估消费者情绪。每个 Crew 独立完成其任务,从而实现更快、更全面的洞察。
  • 独立旅行规划模块:执行独立的 Crew 以独立规划旅行的不同方面。一个 Crew 可能处理航班选项,另一个处理住宿,第三个规划活动。每个 Crew 异步工作,从而可以同时独立规划旅行的各个组成部分,以加快结果速度。

示例:单次异步 Crew 执行

这是一个如何使用 asyncio 异步启动 Crew 并等待结果的示例
代码
import asyncio
from crewai import Crew, Agent, Task

# Create an agent with code execution enabled
coding_agent = Agent(
    role="Python Data Analyst",
    goal="Analyze data and provide insights using Python",
    backstory="You are an experienced data analyst with strong Python skills.",
    allow_code_execution=True
)

# Create a task that requires code execution
data_analysis_task = Task(
    description="Analyze the given dataset and calculate the average age of participants. Ages: {ages}",
    agent=coding_agent,
    expected_output="The average age of the participants."
)

# Create a crew and add the task
analysis_crew = Crew(
    agents=[coding_agent],
    tasks=[data_analysis_task]
)

# Async function to kickoff the crew asynchronously
async def async_crew_execution():
    result = await analysis_crew.kickoff_async(inputs={"ages": [25, 30, 35, 40, 45]})
    print("Crew Result:", result)

# Run the async function
asyncio.run(async_crew_execution())

示例:多次异步 Crew 执行

在此示例中,我们将展示如何异步启动多个 Crew,并使用 asyncio.gather() 等待所有 Crew 完成
代码
import asyncio
from crewai import Crew, Agent, Task

# Create an agent with code execution enabled
coding_agent = Agent(
    role="Python Data Analyst",
    goal="Analyze data and provide insights using Python",
    backstory="You are an experienced data analyst with strong Python skills.",
    allow_code_execution=True
)

# Create tasks that require code execution
task_1 = Task(
    description="Analyze the first dataset and calculate the average age of participants. Ages: {ages}",
    agent=coding_agent,
    expected_output="The average age of the participants."
)

task_2 = Task(
    description="Analyze the second dataset and calculate the average age of participants. Ages: {ages}",
    agent=coding_agent,
    expected_output="The average age of the participants."
)

# Create two crews and add tasks
crew_1 = Crew(agents=[coding_agent], tasks=[task_1])
crew_2 = Crew(agents=[coding_agent], tasks=[task_2])

# Async function to kickoff multiple crews asynchronously and wait for all to finish
async def async_multiple_crews():
    # Create coroutines for concurrent execution
    result_1 = crew_1.kickoff_async(inputs={"ages": [25, 30, 35, 40, 45]})
    result_2 = crew_2.kickoff_async(inputs={"ages": [20, 22, 24, 28, 30]})

    # Wait for both crews to finish
    results = await asyncio.gather(result_1, result_2)

    for i, result in enumerate(results, 1):
        print(f"Crew {i} Result:", result)

# Run the async function
asyncio.run(async_multiple_crews())