跳转到主要内容

概述

CrewAI 提供了一个强大的事件系统,让您可以监听并响应在您的团队(Crew)执行过程中发生的各种事件。此功能使您能够构建自定义集成、监控解决方案、日志系统或任何其他需要根据 CrewAI 内部事件触发的功能。

工作原理

CrewAI 使用事件总线架构在整个执行生命周期中发出事件。该事件系统建立在以下组件之上:
  1. CrewAIEventsBus:一个管理事件注册和发出的单例事件总线
  2. BaseEvent:系统中所有事件的基类
  3. BaseEventListener:用于创建自定义事件监听器的抽象基类
当 CrewAI 中发生特定操作时(例如,一个 Crew 开始执行、一个 Agent 完成任务或一个工具被使用),系统会发出相应的事件。您可以为这些事件注册处理程序,以便在它们发生时执行自定义代码。
CrewAI AMP 提供了一个内置的提示词追踪(Prompt Tracing)功能,该功能利用事件系统来跟踪、存储和可视化所有提示词、完成结果以及相关的元数据。这为您的智能体操作提供了强大的调试能力和透明度。提示词追踪仪表盘通过提示词追踪,您可以:
  • 查看发送到您 LLM 的所有提示词的完整历史记录
  • 跟踪 token 使用量和成本
  • 调试智能体推理失败
  • 与您的团队共享提示词序列
  • 比较不同的提示词策略
  • 导出追踪记录以进行合规和审计

创建自定义事件监听器

要创建自定义事件监听器,您需要:
  1. 创建一个继承自 BaseEventListener 的类
  2. 实现 setup_listeners 方法
  3. 为您感兴趣的事件注册处理程序
  4. 在适当的文件中创建您的监听器实例
下面是一个自定义事件监听器类的简单示例:
from crewai.events import (
    CrewKickoffStartedEvent,
    CrewKickoffCompletedEvent,
    AgentExecutionCompletedEvent,
)
from crewai.events import BaseEventListener

class MyCustomListener(BaseEventListener):
    def __init__(self):
        super().__init__()

    def setup_listeners(self, crewai_event_bus):
        @crewai_event_bus.on(CrewKickoffStartedEvent)
        def on_crew_started(source, event):
            print(f"Crew '{event.crew_name}' has started execution!")

        @crewai_event_bus.on(CrewKickoffCompletedEvent)
        def on_crew_completed(source, event):
            print(f"Crew '{event.crew_name}' has completed execution!")
            print(f"Output: {event.output}")

        @crewai_event_bus.on(AgentExecutionCompletedEvent)
        def on_agent_execution_completed(source, event):
            print(f"Agent '{event.agent.role}' completed task")
            print(f"Output: {event.output}")

正确注册您的监听器

仅仅定义您的监听器类是不够的。您需要创建它的一个实例,并确保它在您的应用程序中被导入。这能确保:
  1. 事件处理程序已在事件总线上注册
  2. 监听器实例保留在内存中(不被垃圾回收)
  3. 当事件被发出时,监听器处于活动状态

选项 1:在您的 Crew 或 Flow 实现中导入和实例化

最重要的事情是在定义和执行您的 Crew 或 Flow 的文件中创建监听器的实例

对于基于 Crew 的应用程序

在您的 Crew 实现文件的顶部创建并导入您的监听器
# In your crew.py file
from crewai import Agent, Crew, Task
from my_listeners import MyCustomListener

# Create an instance of your listener
my_listener = MyCustomListener()

class MyCustomCrew:
    # Your crew implementation...

    def crew(self):
        return Crew(
            agents=[...],
            tasks=[...],
            # ...
        )

对于基于 Flow 的应用程序

在您的 Flow 实现文件的顶部创建并导入您的监听器
# In your main.py or flow.py file
from crewai.flow import Flow, listen, start
from my_listeners import MyCustomListener

# Create an instance of your listener
my_listener = MyCustomListener()

class MyCustomFlow(Flow):
    # Your flow implementation...

    @start()
    def first_step(self):
        # ...
这可以确保在执行您的 Crew 或 Flow 时,您的监听器已加载并处于活动状态。

选项 2:为您的监听器创建一个包

对于更结构化的方法,特别是当您有多个监听器时:
  1. 为您的监听器创建一个包
my_project/
  ├── listeners/
  │   ├── __init__.py
  │   ├── my_custom_listener.py
  │   └── another_listener.py
  1. my_custom_listener.py 中,定义您的监听器类并创建一个实例
# my_custom_listener.py
from crewai.events import BaseEventListener
# ... import events ...

class MyCustomListener(BaseEventListener):
    # ... implementation ...

# Create an instance of your listener
my_custom_listener = MyCustomListener()
  1. __init__.py 中,导入监听器实例以确保它们被加载
# __init__.py
from .my_custom_listener import my_custom_listener
from .another_listener import another_listener

# Optionally export them if you need to access them elsewhere
__all__ = ['my_custom_listener', 'another_listener']
  1. 在您的 Crew 或 Flow 文件中导入您的监听器包
# In your crew.py or flow.py file
import my_project.listeners  # This loads all your listeners

class MyCustomCrew:
    # Your crew implementation...
这就是第三方事件监听器在 CrewAI 代码库中注册的方式。

可用的事件类型

CrewAI 提供了广泛的事件供您监听:

Crew 事件

  • CrewKickoffStartedEvent:当一个 Crew 开始执行时发出
  • CrewKickoffCompletedEvent:当一个 Crew 完成执行时发出
  • CrewKickoffFailedEvent:当一个 Crew 未能完成执行时发出
  • CrewTestStartedEvent:当一个 Crew 开始测试时发出
  • CrewTestCompletedEvent:当一个 Crew 完成测试时发出
  • CrewTestFailedEvent:当一个 Crew 未能完成测试时发出
  • CrewTrainStartedEvent:当一个 Crew 开始训练时发出
  • CrewTrainCompletedEvent:当一个 Crew 完成训练时发出
  • CrewTrainFailedEvent:当一个 Crew 未能完成训练时发出

Agent 事件

  • AgentExecutionStartedEvent:当一个 Agent 开始执行任务时发出
  • AgentExecutionCompletedEvent:当一个 Agent 完成执行任务时发出
  • AgentExecutionErrorEvent:当一个 Agent 在执行过程中遇到错误时发出

Task 事件

  • TaskStartedEvent:当一个任务开始执行时发出
  • TaskCompletedEvent:当一个任务完成执行时发出
  • TaskFailedEvent:当一个任务未能完成执行时发出
  • TaskEvaluationEvent:当一个任务被评估时发出

Tool 使用事件

  • ToolUsageStartedEvent:当一个工具开始执行时发出
  • ToolUsageFinishedEvent:当一个工具执行完成时发出
  • ToolUsageErrorEvent:当一个工具执行遇到错误时发出
  • ToolValidateInputErrorEvent:当一个工具输入验证遇到错误时发出
  • ToolExecutionErrorEvent:当一个工具执行遇到错误时发出
  • ToolSelectionErrorEvent:当选择工具时发生错误时发出

知识事件

  • KnowledgeRetrievalStartedEvent:当知识检索开始时发出
  • KnowledgeRetrievalCompletedEvent:当知识检索完成时发出
  • KnowledgeQueryStartedEvent:当知识查询开始时发出
  • KnowledgeQueryCompletedEvent:当知识查询完成时发出
  • KnowledgeQueryFailedEvent:当知识查询失败时发出
  • KnowledgeSearchQueryFailedEvent:当知识搜索查询失败时发出

LLM 防护栏事件

  • LLMGuardrailStartedEvent:当防护栏验证开始时发出。包含有关正在应用的防护栏和重试次数的详细信息。
  • LLMGuardrailCompletedEvent:当防护栏验证完成时发出。包含有关验证成功/失败、结果以及任何错误消息的详细信息。

Flow 事件

  • FlowCreatedEvent:当一个 Flow 被创建时发出
  • FlowStartedEvent:当一个 Flow 开始执行时发出
  • FlowFinishedEvent:当一个 Flow 完成执行时发出
  • FlowPlotEvent:当一个 Flow 被绘制时发出
  • MethodExecutionStartedEvent:当一个 Flow 方法开始执行时发出
  • MethodExecutionFinishedEvent:当一个 Flow 方法完成执行时发出
  • MethodExecutionFailedEvent:当一个 Flow 方法未能完成执行时发出

LLM 事件

  • LLMCallStartedEvent:当 LLM 调用开始时发出
  • LLMCallCompletedEvent:当 LLM 调用完成时发出
  • LLMCallFailedEvent:当 LLM 调用失败时发出
  • LLMStreamChunkEvent:在流式 LLM 响应期间,每接收到一个数据块时发出

Memory 事件

  • MemoryQueryStartedEvent:当内存查询开始时发出。包含查询、限制和可选的分数阈值。
  • MemoryQueryCompletedEvent:当内存查询成功完成时发出。包含查询、结果、限制、分数阈值和查询执行时间。
  • MemoryQueryFailedEvent:当内存查询失败时发出。包含查询、限制、分数阈值和错误消息。
  • MemorySaveStartedEvent:当内存保存操作开始时发出。包含要保存的值、元数据和可选的智能体角色。
  • MemorySaveCompletedEvent:当内存保存操作成功完成时发出。包含已保存的值、元数据、智能体角色和保存执行时间。
  • MemorySaveFailedEvent:当内存保存操作失败时发出。包含值、元数据、智能体角色和错误消息。
  • MemoryRetrievalStartedEvent:当任务提示的内存检索开始时发出。包含可选的任务 ID。
  • MemoryRetrievalCompletedEvent:当任务提示的内存检索成功完成时发出。包含任务 ID、内存内容和检索执行时间。

事件处理程序结构

每个事件处理程序接收两个参数:
  1. source:发出事件的对象
  2. event:事件实例,包含事件特定的数据
事件对象的结构取决于事件类型,但所有事件都继承自 BaseEvent 并包括:
  • timestamp:事件发出的时间
  • type:事件类型的字符串标识符
附加字段因事件类型而异。例如,CrewKickoffCompletedEvent 包括 crew_nameoutput 字段。

高级用法:作用域处理程序

对于临时事件处理(对于测试或特定操作很有用),您可以使用 scoped_handlers 上下文管理器:
from crewai.events import crewai_event_bus, CrewKickoffStartedEvent

with crewai_event_bus.scoped_handlers():
    @crewai_event_bus.on(CrewKickoffStartedEvent)
    def temp_handler(source, event):
        print("This handler only exists within this context")

    # Do something that emits events

# Outside the context, the temporary handler is removed

用例

事件监听器可用于多种目的:
  1. 日志记录和监控:跟踪您的 Crew 的执行并记录重要事件
  2. 分析:收集关于您的 Crew 性能和行为的数据
  3. 调试:设置临时监听器以调试特定问题
  4. 集成:将 CrewAI 与外部系统连接,如监控平台、数据库或通知服务
  5. 自定义行为:根据特定事件触发自定义操作

最佳实践

  1. 保持处理程序轻量:事件处理程序应保持轻量,避免阻塞操作
  2. 错误处理:在事件处理程序中包含适当的错误处理,以防止异常影响主执行流程
  3. 清理:如果您的监听器分配了资源,请确保它们被正确清理
  4. 选择性监听:只监听您实际需要处理的事件
  5. 测试:独立测试您的事件监听器,以确保它们按预期工作
通过利用 CrewAI 的事件系统,您可以扩展其功能并将其无缝地与您现有的基础设施集成。