跳转到主要内容

SnowflakeSearchTool

描述

SnowflakeSearchTool 旨在连接到 Snowflake 数据仓库并执行 SQL 查询,具有连接池、重试逻辑和异步执行等高级功能。此工具允许 CrewAI 智能体与 Snowflake 数据库交互,非常适合需要访问存储在 Snowflake 中的企业数据的数据分析、报告和商业智能任务。

安装

要使用此工具,您需要安装所需的依赖项
uv add cryptography snowflake-connector-python snowflake-sqlalchemy
或者
uv sync --extra snowflake

开始步骤

要有效使用 SnowflakeSearchTool,请遵循以下步骤:
  1. 安装依赖项:使用上述命令之一安装所需的软件包。
  2. 配置 Snowflake 连接:使用您的 Snowflake 凭据创建一个 SnowflakeConfig 对象。
  3. 初始化工具:使用必要的配置创建工具的实例。
  4. 执行查询:使用该工具对您的 Snowflake 数据库运行 SQL 查询。

示例

以下示例演示了如何使用 SnowflakeSearchTool 从 Snowflake 数据库查询数据:
代码
from crewai import Agent, Task, Crew
from crewai_tools import SnowflakeSearchTool, SnowflakeConfig

# Create Snowflake configuration
config = SnowflakeConfig(
    account="your_account",
    user="your_username",
    password="your_password",
    warehouse="COMPUTE_WH",
    database="your_database",
    snowflake_schema="your_schema"
)

# Initialize the tool
snowflake_tool = SnowflakeSearchTool(config=config)

# Define an agent that uses the tool
data_analyst_agent = Agent(
    role="Data Analyst",
    goal="Analyze data from Snowflake database",
    backstory="An expert data analyst who can extract insights from enterprise data.",
    tools=[snowflake_tool],
    verbose=True,
)

# Example task to query sales data
query_task = Task(
    description="Query the sales data for the last quarter and summarize the top 5 products by revenue.",
    expected_output="A summary of the top 5 products by revenue for the last quarter.",
    agent=data_analyst_agent,
)

# Create and run the crew
crew = Crew(agents=[data_analyst_agent], 
            tasks=[query_task])
result = crew.kickoff()
您还可以使用其他参数自定义该工具:
代码
# Initialize the tool with custom parameters
snowflake_tool = SnowflakeSearchTool(
    config=config,
    pool_size=10,
    max_retries=5,
    retry_delay=2.0,
    enable_caching=True
)

参数

SnowflakeConfig 参数

SnowflakeConfig 类接受以下参数:
  • account: 必需。Snowflake 账户标识符。
  • user: 必需。Snowflake 用户名。
  • password: 可选*。Snowflake 密码。
  • private_key_path: 可选*。私钥文件路径(密码的替代方案)。
  • warehouse: 必需。Snowflake 仓库名称。
  • database: 必需。默认数据库。
  • snowflake_schema: 必需。默认 schema。
  • role: 可选。Snowflake 角色。
  • session_parameters: 可选。以字典形式表示的自定义会话参数。
*必须提供 passwordprivate_key_path 中的一个。

SnowflakeSearchTool 参数

SnowflakeSearchTool 在初始化时接受以下参数:
  • config: 必需。一个包含连接详细信息的 SnowflakeConfig 对象。
  • pool_size: 可选。连接池中的连接数。默认为 5。
  • max_retries: 可选。失败查询的最大重试次数。默认为 3。
  • retry_delay: 可选。重试之间的延迟(秒)。默认为 1.0。
  • enable_caching: 可选。是否启用查询结果缓存。默认为 True。

用法

使用 SnowflakeSearchTool 时,您需要提供以下参数:
  • query: 必需。要执行的 SQL 查询。
  • database: 可选。覆盖配置中指定的默认数据库。
  • snowflake_schema: 可选。覆盖配置中指定的默认 schema。
  • timeout: 可选。查询超时时间(秒)。默认为 300。
该工具将以字典列表的形式返回查询结果,其中每个字典代表一行,列名为键。
代码
# Example of using the tool with an agent
data_analyst = Agent(
    role="Data Analyst",
    goal="Analyze sales data from Snowflake",
    backstory="An expert data analyst with experience in SQL and data visualization.",
    tools=[snowflake_tool],
    verbose=True
)

# The agent will use the tool with parameters like:
# query="SELECT product_name, SUM(revenue) as total_revenue FROM sales GROUP BY product_name ORDER BY total_revenue DESC LIMIT 5"
# timeout=600

# Create a task for the agent
analysis_task = Task(
    description="Query the sales database and identify the top 5 products by revenue for the last quarter.",
    expected_output="A detailed analysis of the top 5 products by revenue.",
    agent=data_analyst
)

# Run the task
crew = Crew(
    agents=[data_analyst], 
    tasks=[analysis_task]
)
result = crew.kickoff()

高级功能

连接池

SnowflakeSearchTool 实现了连接池,通过重用数据库连接来提高性能。您可以使用 pool_size 参数控制连接池的大小。

自动重试

该工具会自动使用指数退避策略重试失败的查询。您可以使用 max_retriesretry_delay 参数配置重试行为。

查询结果缓存

为了提高重复查询的性能,该工具可以缓存查询结果。此功能默认启用,但可以通过设置 enable_caching=False 禁用。

密钥对认证

除了密码认证,该工具还支持密钥对认证以增强安全性:
代码
config = SnowflakeConfig(
    account="your_account",
    user="your_username",
    private_key_path="/path/to/your/private/key.p8",
    warehouse="COMPUTE_WH",
    database="your_database",
    snowflake_schema="your_schema"
)

错误处理

SnowflakeSearchTool 包含针对常见 Snowflake 问题的全面错误处理:
  • 连接失败
  • 查询超时
  • 认证错误
  • 数据库和 schema 错误
发生错误时,该工具将尝试重试操作(如果已配置)并提供详细的错误信息。

结论

SnowflakeSearchTool 提供了一种将 Snowflake 数据仓库与 CrewAI 智能体集成的强大方式。凭借连接池、自动重试和查询缓存等功能,它能够高效、可靠地访问企业数据。该工具特别适用于需要访问存储在 Snowflake 中的结构化数据的数据分析、报告和商业智能任务。