指南
- 策略
- 智能体
- 团队
- 流程
- 高级
工具
- AI Mind 工具
- Apify Actors
- Bedrock Invoke Agent 工具
- Bedrock 知识库检索器
- Brave 搜索
- Browserbase 网页加载器
- 代码文档 RAG 搜索
- 代码解释器
- Composio 工具
- CSV RAG 搜索
- DALL-E 工具
- 目录 RAG 搜索
- 目录读取
- DOCX RAG 搜索
- EXA 搜索网页加载器
- 文件读取
- 文件写入
- Firecrawl 爬取网站
- Firecrawl 抓取网站
- Firecrawl 搜索
- Github 搜索
- Hyperbrowser 加载工具
- Linkup 搜索工具
- LlamaIndex 工具
- LangChain 工具
- Google Serper 搜索
- S3 读取工具
- S3 写入工具
- Scrapegraph 抓取工具
- 从网站抓取元素工具
- JSON RAG 搜索
- MDX RAG 搜索
- MySQL RAG 搜索
- MultiOn 工具
- NL2SQL 工具
- Patronus 评估工具
- PDF RAG 搜索
- PG RAG 搜索
- Qdrant 向量搜索工具
- RAG 工具
- 抓取网站
- Scrapfly 抓取网站工具
- Selenium 抓取器
- Snowflake 搜索工具
- Spider 抓取器
- Stagehand 工具
- TXT RAG 搜索
- 视觉工具
- Weaviate 向量搜索
- 网站 RAG 搜索
- XML RAG 搜索
- YouTube 频道 RAG 搜索
- YouTube 视频 RAG 搜索
智能体监控与可观测性
学习
遥测
学习
条件任务
了解如何在 CrewAI 启动中使用条件任务
介绍
CrewAI 中的条件任务允许基于先前任务的结果进行动态工作流调整。这项强大的功能使得团队能够根据需要做出决策并选择性地执行任务,从而增强了 AI 驱动流程的灵活性和效率。
示例用法
代码
from typing import List
from pydantic import BaseModel
from crewai import Agent, Crew
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.task import Task
from crewai_tools import SerperDevTool
# Define a condition function for the conditional task
# If false, the task will be skipped, if true, then execute the task.
def is_data_missing(output: TaskOutput) -> bool:
return len(output.pydantic.events) < 10 # this will skip this task
# Define the agents
data_fetcher_agent = Agent(
role="Data Fetcher",
goal="Fetch data online using Serper tool",
backstory="Backstory 1",
verbose=True,
tools=[SerperDevTool()]
)
data_processor_agent = Agent(
role="Data Processor",
goal="Process fetched data",
backstory="Backstory 2",
verbose=True
)
summary_generator_agent = Agent(
role="Summary Generator",
goal="Generate summary from fetched data",
backstory="Backstory 3",
verbose=True
)
class EventOutput(BaseModel):
events: List[str]
task1 = Task(
description="Fetch data about events in San Francisco using Serper tool",
expected_output="List of 10 things to do in SF this week",
agent=data_fetcher_agent,
output_pydantic=EventOutput,
)
conditional_task = ConditionalTask(
description="""
Check if data is missing. If we have less than 10 events,
fetch more events using Serper tool so that
we have a total of 10 events in SF this week..
""",
expected_output="List of 10 Things to do in SF this week",
condition=is_data_missing,
agent=data_processor_agent,
)
task3 = Task(
description="Generate summary of events in San Francisco from fetched data",
expected_output="A complete report on the customer and their customers and competitors, including their demographics, preferences, market positioning and audience engagement.",
agent=summary_generator_agent,
)
# Create a crew with the tasks
crew = Crew(
agents=[data_fetcher_agent, data_processor_agent, summary_generator_agent],
tasks=[task1, conditional_task, task3],
verbose=True,
planning=True
)
# Run the crew
result = crew.kickoff()
print("results", result)