事件监听器

CrewAI 提供了一个强大的事件系统,允许您监听 Crew 执行期间发生的各种事件并对其做出反应。此功能使您能够构建自定义集成、监控解决方案、日志系统或任何其他需要根据 CrewAI 内部事件触发的功能。

工作原理

CrewAI 使用事件总线架构在整个执行生命周期中发出事件。该事件系统构建在以下组件之上

  1. CrewAIEventsBus:管理事件注册和发出的单例事件总线
  2. BaseEvent:系统中所有事件的基类
  3. BaseEventListener:用于创建自定义事件监听器的抽象基类

当 CrewAI 中发生特定操作(例如 Crew 开始执行、智能体完成任务或工具被使用)时,系统会发出相应的事件。您可以注册这些事件的处理程序,以便在事件发生时执行自定义代码。

CrewAI 企业版提供了一个内置的 Prompt Tracing(提示跟踪)功能,该功能利用事件系统来跟踪、存储和可视化所有提示、完成情况和相关的元数据。这提供了强大的调试功能和对您的智能体操作的透明度。

使用 Prompt Tracing,您可以

  • 查看发送到您的 LLM 的所有提示的完整历史记录
  • 跟踪 token 使用量和成本
  • 调试智能体推理失败
  • 与您的团队共享提示序列
  • 比较不同的提示策略
  • 导出跟踪记录以满足合规性和审计要求

创建自定义事件监听器

要创建自定义事件监听器,您需要

  1. 创建一个继承自 BaseEventListener 的类
  2. 实现 setup_listeners 方法
  3. 注册您感兴趣的事件的处理程序
  4. 在适当的文件中创建您的监听器实例

这是一个自定义事件监听器类的简单示例

from crewai.utilities.events import (
    CrewKickoffStartedEvent,
    CrewKickoffCompletedEvent,
    AgentExecutionCompletedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener

class MyCustomListener(BaseEventListener):
    def __init__(self):
        super().__init__()

    def setup_listeners(self, crewai_event_bus):
        @crewai_event_bus.on(CrewKickoffStartedEvent)
        def on_crew_started(source, event):
            print(f"Crew '{event.crew_name}' has started execution!")

        @crewai_event_bus.on(CrewKickoffCompletedEvent)
        def on_crew_completed(source, event):
            print(f"Crew '{event.crew_name}' has completed execution!")
            print(f"Output: {event.output}")

        @crewai_event_bus.on(AgentExecutionCompletedEvent)
        def on_agent_execution_completed(source, event):
            print(f"Agent '{event.agent.role}' completed task")
            print(f"Output: {event.output}")

正确注册您的监听器

仅仅定义您的监听器类是不够的。您需要创建它的一个实例并确保它在您的应用程序中被导入。这确保了

  1. 事件处理程序已向事件总线注册
  2. 监听器实例保留在内存中(未被垃圾回收)
  3. 当事件发出时监听器处于活动状态

选项 1:在您的 Crew 或 Flow 实现中导入和实例化

最重要的事情是在定义和执行 Crew 或 Flow 的文件中创建监听器实例

对于基于 Crew 的应用程序

在 Crew 实现文件的顶部创建和导入您的监听器

# In your crew.py file
from crewai import Agent, Crew, Task
from my_listeners import MyCustomListener

# Create an instance of your listener
my_listener = MyCustomListener()

class MyCustomCrew:
    # Your crew implementation...

    def crew(self):
        return Crew(
            agents=[...],
            tasks=[...],
            # ...
        )

对于基于 Flow 的应用程序

在 Flow 实现文件的顶部创建和导入您的监听器

# In your main.py or flow.py file
from crewai.flow import Flow, listen, start
from my_listeners import MyCustomListener

# Create an instance of your listener
my_listener = MyCustomListener()

class MyCustomFlow(Flow):
    # Your flow implementation...

    @start()
    def first_step(self):
        # ...

这确保了当您的 Crew 或 Flow 执行时,您的监听器已被加载并处于活动状态。

选项 2:为您的监听器创建一个包

对于更结构化的方法,特别是当您有多个监听器时

  1. 为您的监听器创建一个包
my_project/
  ├── listeners/
  │   ├── __init__.py
  │   ├── my_custom_listener.py
  │   └── another_listener.py
  1. my_custom_listener.py 中,定义您的监听器类并创建一个实例
# my_custom_listener.py
from crewai.utilities.events.base_event_listener import BaseEventListener
# ... import events ...

class MyCustomListener(BaseEventListener):
    # ... implementation ...

# Create an instance of your listener
my_custom_listener = MyCustomListener()
  1. __init__.py 中,导入监听器实例以确保它们被加载
# __init__.py
from .my_custom_listener import my_custom_listener
from .another_listener import another_listener

# Optionally export them if you need to access them elsewhere
__all__ = ['my_custom_listener', 'another_listener']
  1. 在您的 Crew 或 Flow 文件中导入您的监听器包
# In your crew.py or flow.py file
import my_project.listeners  # This loads all your listeners

class MyCustomCrew:
    # Your crew implementation...

CrewAI 内置的 agentops_listener 正是通过这种方式注册的。在 CrewAI 代码库中,您会发现

# src/crewai/utilities/events/third_party/__init__.py
from .agentops_listener import agentops_listener

这确保了当 crewai.utilities.events 包被导入时,agentops_listener 被加载。

可用事件类型

CrewAI 提供了各种可供您监听的事件

Crew 事件

  • CrewKickoffStartedEvent:当 Crew 开始执行时发出
  • CrewKickoffCompletedEvent:当 Crew 完成执行时发出
  • CrewKickoffFailedEvent:当 Crew 未能完成执行时发出
  • CrewTestStartedEvent:当 Crew 开始测试时发出
  • CrewTestCompletedEvent:当 Crew 完成测试时发出
  • CrewTestFailedEvent:当 Crew 未能完成测试时发出
  • CrewTrainStartedEvent:当 Crew 开始训练时发出
  • CrewTrainCompletedEvent:当 Crew 完成训练时发出
  • CrewTrainFailedEvent:当 Crew 未能完成训练时发出

智能体事件

  • AgentExecutionStartedEvent:当智能体开始执行任务时发出
  • AgentExecutionCompletedEvent:当智能体完成执行任务时发出
  • AgentExecutionErrorEvent:当智能体在执行过程中遇到错误时发出

任务事件

  • TaskStartedEvent:当任务开始执行时发出
  • TaskCompletedEvent:当任务完成执行时发出
  • TaskFailedEvent:当任务未能完成执行时发出
  • TaskEvaluationEvent:当任务被评估时发出

工具使用事件

  • ToolUsageStartedEvent:当工具执行开始时发出
  • ToolUsageFinishedEvent:当工具执行完成时发出
  • ToolUsageErrorEvent:当工具执行遇到错误时发出
  • ToolValidateInputErrorEvent:当工具输入验证遇到错误时发出
  • ToolExecutionErrorEvent:当工具执行遇到错误时发出
  • ToolSelectionErrorEvent:当选择工具时发生错误时发出

Flow 事件

  • FlowCreatedEvent:当 Flow 被创建时发出
  • FlowStartedEvent:当 Flow 开始执行时发出
  • FlowFinishedEvent:当 Flow 完成执行时发出
  • FlowPlotEvent:当 Flow 被绘制时发出
  • MethodExecutionStartedEvent:当 Flow 方法开始执行时发出
  • MethodExecutionFinishedEvent:当 Flow 方法完成执行时发出
  • MethodExecutionFailedEvent:当 Flow 方法未能完成执行时发出

LLM 事件

  • LLMCallStartedEvent:当 LLM 调用开始时发出
  • LLMCallCompletedEvent:当 LLM 调用完成时发出
  • LLMCallFailedEvent:当 LLM 调用失败时发出
  • LLMStreamChunkEvent:在流式传输 LLM 响应期间,为接收到的每个数据块发出

事件处理程序结构

每个事件处理程序接收两个参数

  1. source:发出事件的对象
  2. event:事件实例,包含事件特定数据

事件对象的结构取决于事件类型,但所有事件都继承自 BaseEvent 并包含

  • timestamp:事件发出的时间
  • type:事件类型的字符串标识符

附加字段因事件类型而异。例如,CrewKickoffCompletedEvent 包含 crew_nameoutput 字段。

实际示例:与 AgentOps 的集成

CrewAI 包含一个与 AgentOps 集成的第三方示例,AgentOps 是一个用于 AI 智能体的监控和可观测性平台。其实现方式如下

from typing import Optional

from crewai.utilities.events import (
    CrewKickoffCompletedEvent,
    ToolUsageErrorEvent,
    ToolUsageStartedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.crew_events import CrewKickoffStartedEvent
from crewai.utilities.events.task_events import TaskEvaluationEvent

try:
    import agentops
    AGENTOPS_INSTALLED = True
except ImportError:
    AGENTOPS_INSTALLED = False

class AgentOpsListener(BaseEventListener):
    tool_event: Optional["agentops.ToolEvent"] = None
    session: Optional["agentops.Session"] = None

    def __init__(self):
        super().__init__()

    def setup_listeners(self, crewai_event_bus):
        if not AGENTOPS_INSTALLED:
            return

        @crewai_event_bus.on(CrewKickoffStartedEvent)
        def on_crew_kickoff_started(source, event: CrewKickoffStartedEvent):
            self.session = agentops.init()
            for agent in source.agents:
                if self.session:
                    self.session.create_agent(
                        name=agent.role,
                        agent_id=str(agent.id),
                    )

        @crewai_event_bus.on(CrewKickoffCompletedEvent)
        def on_crew_kickoff_completed(source, event: CrewKickoffCompletedEvent):
            if self.session:
                self.session.end_session(
                    end_state="Success",
                    end_state_reason="Finished Execution",
                )

        @crewai_event_bus.on(ToolUsageStartedEvent)
        def on_tool_usage_started(source, event: ToolUsageStartedEvent):
            self.tool_event = agentops.ToolEvent(name=event.tool_name)
            if self.session:
                self.session.record(self.tool_event)

        @crewai_event_bus.on(ToolUsageErrorEvent)
        def on_tool_usage_error(source, event: ToolUsageErrorEvent):
            agentops.ErrorEvent(exception=event.error, trigger_event=self.tool_event)

当 Crew 启动时,此监听器初始化一个 AgentOps 会话,向 AgentOps 注册智能体,跟踪工具使用情况,并在 Crew 完成时结束会话。

AgentOps 监听器通过在 src/crewai/utilities/events/third_party/__init__.py 中的导入注册到 CrewAI 的事件系统中

from .agentops_listener import agentops_listener

这确保了当 crewai.utilities.events 包被导入时,agentops_listener 被加载。

高级用法:限定范围的处理程序

对于临时事件处理(在测试或特定操作中很有用),您可以使用 scoped_handlers 上下文管理器

from crewai.utilities.events import crewai_event_bus, CrewKickoffStartedEvent

with crewai_event_bus.scoped_handlers():
    @crewai_event_bus.on(CrewKickoffStartedEvent)
    def temp_handler(source, event):
        print("This handler only exists within this context")

    # Do something that emits events

# Outside the context, the temporary handler is removed

用例

事件监听器可用于多种目的

  1. 日志记录和监控:跟踪 Crew 的执行并记录重要事件
  2. 分析:收集有关 Crew 性能和行为的数据
  3. 调试:设置临时监听器以调试特定问题
  4. 集成:将 CrewAI 与外部系统(如监控平台、数据库或通知服务)连接
  5. 自定义行为:根据特定事件触发自定义操作

最佳实践

  1. 保持处理程序轻量级:事件处理程序应轻量级,避免阻塞操作
  2. 错误处理:在事件处理程序中包含适当的错误处理,以防止异常影响主执行
  3. 清理:如果您的监听器分配了资源,请确保它们得到妥善清理
  4. 选择性监听:只监听您实际需要处理的事件
  5. 测试:独立测试您的事件监听器,以确保它们按预期运行

通过利用 CrewAI 的事件系统,您可以扩展其功能并将其无缝集成到您现有的基础设施中。