跳转到主要内容

概述

CrewAI 提供了一个强大的事件系统,允许您监听并响应在您的 Crew 执行过程中发生的各种事件。此功能使您能够构建自定义集成、监控解决方案、日志系统或任何其他需要根据 CrewAI 内部事件触发的功能。

工作原理

CrewAI 使用事件总线架构在整个执行生命周期中发出事件。事件系统建立在以下组件之上:
  1. CrewAIEventsBus:一个管理事件注册和发出的单例事件总线。
  2. BaseEvent:系统中所有事件的基类。
  3. BaseEventListener:用于创建自定义事件监听器的抽象基类。
当 CrewAI 中发生特定操作(例如 Crew 开始执行、Agent 完成任务或工具被使用)时,系统会发出相应的事件。您可以为这些事件注册处理程序,以便在它们发生时执行自定义代码。
CrewAI AOP 提供了一个内置的提示跟踪功能,该功能利用事件系统来跟踪、存储和可视化所有提示、完成以及相关的元数据。这为您的 Agent 操作提供了强大的调试功能和透明度。提示跟踪仪表板通过提示跟踪,您可以:
  • 查看发送到您的 LLM 的所有提示的完整历史记录
  • 跟踪令牌使用和成本
  • 调试 Agent 推理失败
  • 与您的团队共享提示序列
  • 比较不同的提示策略
  • 导出跟踪以进行合规性和审计

创建自定义事件监听器

要创建自定义事件监听器,您需要:
  1. 创建一个继承自 BaseEventListener 的类
  2. 实现 setup_listeners 方法
  3. 为感兴趣的事件注册处理程序
  4. 在适当的文件中创建您的监听器实例
这是一个自定义事件监听器类的简单示例:
from crewai.events import (
    CrewKickoffStartedEvent,
    CrewKickoffCompletedEvent,
    AgentExecutionCompletedEvent,
)
from crewai.events import BaseEventListener

class MyCustomListener(BaseEventListener):
    def __init__(self):
        super().__init__()

    def setup_listeners(self, crewai_event_bus):
        @crewai_event_bus.on(CrewKickoffStartedEvent)
        def on_crew_started(source, event):
            print(f"Crew '{event.crew_name}' has started execution!")

        @crewai_event_bus.on(CrewKickoffCompletedEvent)
        def on_crew_completed(source, event):
            print(f"Crew '{event.crew_name}' has completed execution!")
            print(f"Output: {event.output}")

        @crewai_event_bus.on(AgentExecutionCompletedEvent)
        def on_agent_execution_completed(source, event):
            print(f"Agent '{event.agent.role}' completed task")
            print(f"Output: {event.output}")

正确注册您的监听器

仅仅定义您的监听器类是不够的。您需要创建它的实例并确保它被导入到您的应用程序中。这确保了:
  1. 事件处理程序已向事件总线注册
  2. 监听器实例保留在内存中(未被垃圾回收)
  3. 当事件发出时,监听器处于活动状态

选项 1:在您的 Crew 或 Flow 实现中导入和实例化

最重要的是在定义和执行您的 Crew 或 Flow 的文件中创建您的监听器实例

对于基于 Crew 的应用程序

在您的 Crew 实现文件的顶部创建并导入您的监听器
# In your crew.py file
from crewai import Agent, Crew, Task
from my_listeners import MyCustomListener

# Create an instance of your listener
my_listener = MyCustomListener()

class MyCustomCrew:
    # Your crew implementation...

    def crew(self):
        return Crew(
            agents=[...],
            tasks=[...],
            # ...
        )

对于基于 Flow 的应用程序

在您的 Flow 实现文件的顶部创建并导入您的监听器
# In your main.py or flow.py file
from crewai.flow import Flow, listen, start
from my_listeners import MyCustomListener

# Create an instance of your listener
my_listener = MyCustomListener()

class MyCustomFlow(Flow):
    # Your flow implementation...

    @start()
    def first_step(self):
        # ...
这确保了当您的 Crew 或 Flow 执行时,您的监听器已加载并处于活动状态。

选项 2:为您的监听器创建包

对于更结构化的方法,特别是如果您有多个监听器:
  1. 为您的监听器创建一个包
my_project/
  ├── listeners/
  │   ├── __init__.py
  │   ├── my_custom_listener.py
  │   └── another_listener.py
  1. my_custom_listener.py 中,定义您的监听器类并创建实例
# my_custom_listener.py
from crewai.events import BaseEventListener
# ... import events ...

class MyCustomListener(BaseEventListener):
    # ... implementation ...

# Create an instance of your listener
my_custom_listener = MyCustomListener()
  1. __init__.py 中,导入监听器实例以确保它们被加载
# __init__.py
from .my_custom_listener import my_custom_listener
from .another_listener import another_listener

# Optionally export them if you need to access them elsewhere
__all__ = ['my_custom_listener', 'another_listener']
  1. 在您的 Crew 或 Flow 文件中导入您的监听器包
# In your crew.py or flow.py file
import my_project.listeners  # This loads all your listeners

class MyCustomCrew:
    # Your crew implementation...
这是 CrewAI 代码库中第三方事件监听器的注册方式。

可用事件类型

CrewAI 提供各种事件供您监听:

Crew 事件

  • CrewKickoffStartedEvent:当 Crew 开始执行时发出
  • CrewKickoffCompletedEvent:当 Crew 完成执行时发出
  • CrewKickoffFailedEvent:当 Crew 未能完成执行时发出
  • CrewTestStartedEvent:当 Crew 开始测试时发出
  • CrewTestCompletedEvent:当 Crew 完成测试时发出
  • CrewTestFailedEvent:当 Crew 未能完成测试时发出
  • CrewTrainStartedEvent:当 Crew 开始训练时发出
  • CrewTrainCompletedEvent:当 Crew 完成训练时发出
  • CrewTrainFailedEvent:当 Crew 未能完成训练时发出

Agent 事件

  • AgentExecutionStartedEvent:当 Agent 开始执行任务时发出
  • AgentExecutionCompletedEvent:当 Agent 完成执行任务时发出
  • AgentExecutionErrorEvent:当 Agent 在执行过程中遇到错误时发出

任务事件

  • TaskStartedEvent:当任务开始执行时发出
  • TaskCompletedEvent:当任务完成执行时发出
  • TaskFailedEvent:当任务未能完成执行时发出
  • TaskEvaluationEvent:当任务被评估时发出

工具使用事件

  • ToolUsageStartedEvent:当工具执行开始时发出
  • ToolUsageFinishedEvent:当工具执行完成时发出
  • ToolUsageErrorEvent:当工具执行遇到错误时发出
  • ToolValidateInputErrorEvent:当工具输入验证遇到错误时发出
  • ToolExecutionErrorEvent:当工具执行遇到错误时发出
  • ToolSelectionErrorEvent:当选择工具时出现错误时发出

知识事件

  • KnowledgeRetrievalStartedEvent:当知识检索开始时发出
  • KnowledgeRetrievalCompletedEvent:当知识检索完成时发出
  • KnowledgeQueryStartedEvent:当知识查询开始时发出
  • KnowledgeQueryCompletedEvent:当知识查询完成时发出
  • KnowledgeQueryFailedEvent:当知识查询失败时发出
  • KnowledgeSearchQueryFailedEvent:当知识搜索查询失败时发出

LLM 护栏事件

  • LLMGuardrailStartedEvent:当护栏验证开始时发出。包含有关正在应用的护栏和重试计数的详细信息。
  • LLMGuardrailCompletedEvent:当护栏验证完成时发出。包含有关验证成功/失败、结果以及任何错误消息的详细信息。

流程事件

  • FlowCreatedEvent:当 Flow 被创建时发出
  • FlowStartedEvent:当 Flow 开始执行时发出
  • FlowFinishedEvent:当 Flow 完成执行时发出
  • FlowPlotEvent:当 Flow 被绘制时发出
  • MethodExecutionStartedEvent:当 Flow 方法开始执行时发出
  • MethodExecutionFinishedEvent:当 Flow 方法完成执行时发出
  • MethodExecutionFailedEvent:当 Flow 方法未能完成执行时发出

LLM 事件

  • LLMCallStartedEvent:当 LLM 调用开始时发出
  • LLMCallCompletedEvent:当 LLM 调用完成时发出
  • LLMCallFailedEvent:当 LLM 调用失败时发出
  • LLMStreamChunkEvent:在流式传输 LLM 响应期间收到每个块时发出

内存事件

  • MemoryQueryStartedEvent:当内存查询开始时发出。包含查询、限制和可选的分数阈值。
  • MemoryQueryCompletedEvent:当内存查询成功完成时发出。包含查询、结果、限制、分数阈值和查询执行时间。
  • MemoryQueryFailedEvent:当内存查询失败时发出。包含查询、限制、分数阈值和错误消息。
  • MemorySaveStartedEvent:当内存保存操作开始时发出。包含要保存的值、元数据和可选的 Agent 角色。
  • MemorySaveCompletedEvent:当内存保存操作成功完成时发出。包含保存的值、元数据、Agent 角色和保存执行时间。
  • MemorySaveFailedEvent:当内存保存操作失败时发出。包含值、元数据、Agent 角色和错误消息。
  • MemoryRetrievalStartedEvent:当任务提示的内存检索开始时发出。包含可选的任务 ID。
  • MemoryRetrievalCompletedEvent:当任务提示的内存检索成功完成时发出。包含任务 ID、内存内容和检索执行时间。

事件处理程序结构

每个事件处理程序接收两个参数:
  1. source:发出事件的对象
  2. event:事件实例,包含事件特定的数据
事件对象的结构取决于事件类型,但所有事件都继承自 BaseEvent 并包含:
  • timestamp:事件发出的时间
  • type:事件类型的字符串标识符
附加字段因事件类型而异。例如,CrewKickoffCompletedEvent 包含 crew_nameoutput 字段。

高级用法:范围处理程序

对于临时事件处理(对于测试或特定操作很有用),您可以使用 scoped_handlers 上下文管理器:
from crewai.events import crewai_event_bus, CrewKickoffStartedEvent

with crewai_event_bus.scoped_handlers():
    @crewai_event_bus.on(CrewKickoffStartedEvent)
    def temp_handler(source, event):
        print("This handler only exists within this context")

    # Do something that emits events

# Outside the context, the temporary handler is removed

用例

事件监听器可用于多种目的:
  1. 日志和监控:跟踪 Crew 的执行并记录重要事件
  2. 分析:收集关于 Crew 性能和行为的数据
  3. 调试:设置临时监听器以调试特定问题
  4. 集成:将 CrewAI 与外部系统连接,例如监控平台、数据库或通知服务
  5. 自定义行为:根据特定事件触发自定义操作

最佳实践

  1. 保持处理程序轻量:事件处理程序应轻量化并避免阻塞操作
  2. 错误处理:在事件处理程序中包含适当的错误处理,以防止异常影响主执行
  3. 清理:如果您的监听器分配了资源,请确保它们得到正确清理
  4. 选择性监听:只监听您实际需要处理的事件
  5. 测试:单独测试您的事件监听器,以确保它们按预期运行
通过利用 CrewAI 的事件系统,您可以扩展其功能并将其与您现有的基础设施无缝集成。