SnowflakeSearchTool

描述

SnowflakeSearchTool 旨在连接到 Snowflake 数据仓库,并执行具有连接池、重试逻辑和异步执行等高级功能的 SQL 查询。此工具允许 CrewAI 智能体与 Snowflake 数据库交互,非常适合需要访问存储在 Snowflake 中的企业数据的数据分析、报告和商业智能任务。

安装

要使用此工具,您需要安装所需的依赖项

uv add cryptography snowflake-connector-python snowflake-sqlalchemy

或者

uv sync --extra snowflake

开始步骤

要有效使用 SnowflakeSearchTool,请遵循以下步骤

  1. 安装依赖项:使用上面列出的命令之一安装所需的软件包。
  2. 配置 Snowflake 连接:使用您的 Snowflake 凭据创建一个 SnowflakeConfig 对象。
  3. 初始化工具:使用必要的配置创建工具实例。
  4. 执行查询:使用工具对您的 Snowflake 数据库运行 SQL 查询。

示例

以下示例演示如何使用 SnowflakeSearchTool 从 Snowflake 数据库查询数据

代码
from crewai import Agent, Task, Crew
from crewai_tools import SnowflakeSearchTool, SnowflakeConfig

# Create Snowflake configuration
config = SnowflakeConfig(
    account="your_account",
    user="your_username",
    password="your_password",
    warehouse="COMPUTE_WH",
    database="your_database",
    snowflake_schema="your_schema"
)

# Initialize the tool
snowflake_tool = SnowflakeSearchTool(config=config)

# Define an agent that uses the tool
data_analyst_agent = Agent(
    role="Data Analyst",
    goal="Analyze data from Snowflake database",
    backstory="An expert data analyst who can extract insights from enterprise data.",
    tools=[snowflake_tool],
    verbose=True,
)

# Example task to query sales data
query_task = Task(
    description="Query the sales data for the last quarter and summarize the top 5 products by revenue.",
    expected_output="A summary of the top 5 products by revenue for the last quarter.",
    agent=data_analyst_agent,
)

# Create and run the crew
crew = Crew(agents=[data_analyst_agent], 
            tasks=[query_task])
result = crew.kickoff()

您还可以使用其他参数来自定义工具

代码
# Initialize the tool with custom parameters
snowflake_tool = SnowflakeSearchTool(
    config=config,
    pool_size=10,
    max_retries=5,
    retry_delay=2.0,
    enable_caching=True
)

参数

SnowflakeConfig 参数

SnowflakeConfig 类接受以下参数

  • account:必需。Snowflake 账户标识符。
  • user:必需。Snowflake 用户名。
  • password:可选*。Snowflake 密码。
  • private_key_path:可选*。私钥文件路径(密码的替代方案)。
  • warehouse:必需。Snowflake 仓库名称。
  • database:必需。默认数据库。
  • snowflake_schema:必需。默认模式。
  • role:可选。Snowflake 角色。
  • session_parameters:可选。自定义会话参数,字典格式。

*必须提供 passwordprivate_key_path 之一。

SnowflakeSearchTool 参数

SnowflakeSearchTool 在初始化时接受以下参数

  • config:必需。包含连接详细信息的 SnowflakeConfig 对象。
  • pool_size:可选。连接池中的连接数。默认值为 5。
  • max_retries:可选。失败查询的最大重试次数。默认值为 3。
  • retry_delay:可选。重试之间的延迟(秒)。默认值为 1.0。
  • enable_caching:可选。是否启用查询结果缓存。默认值为 True。

用法

使用 SnowflakeSearchTool 时,您需要提供以下参数

  • query:必需。要执行的 SQL 查询。
  • database:可选。覆盖配置中指定的默认数据库。
  • snowflake_schema:可选。覆盖配置中指定的默认模式。
  • timeout:可选。查询超时时间(秒)。默认值为 300。

工具将把查询结果作为字典列表返回,其中每个字典表示一行,以列名作为键。

代码
# Example of using the tool with an agent
data_analyst = Agent(
    role="Data Analyst",
    goal="Analyze sales data from Snowflake",
    backstory="An expert data analyst with experience in SQL and data visualization.",
    tools=[snowflake_tool],
    verbose=True
)

# The agent will use the tool with parameters like:
# query="SELECT product_name, SUM(revenue) as total_revenue FROM sales GROUP BY product_name ORDER BY total_revenue DESC LIMIT 5"
# timeout=600

# Create a task for the agent
analysis_task = Task(
    description="Query the sales database and identify the top 5 products by revenue for the last quarter.",
    expected_output="A detailed analysis of the top 5 products by revenue.",
    agent=data_analyst
)

# Run the task
crew = Crew(
    agents=[data_analyst], 
    tasks=[analysis_task]
)
result = crew.kickoff()

高级功能

连接池

SnowflakeSearchTool 实现了连接池以通过重用数据库连接来提高性能。您可以使用 pool_size 参数控制连接池的大小。

自动重试

工具会自动以指数退避的方式重试失败的查询。您可以使用 max_retriesretry_delay 参数配置重试行为。

查询结果缓存

为了提高重复查询的性能,工具可以缓存查询结果。此功能默认启用,但可以通过设置 enable_caching=False 来禁用。

密钥对认证

除了密码认证,工具还支持密钥对认证以增强安全性

代码
config = SnowflakeConfig(
    account="your_account",
    user="your_username",
    private_key_path="/path/to/your/private/key.p8",
    warehouse="COMPUTE_WH",
    database="your_database",
    snowflake_schema="your_schema"
)

错误处理

SnowflakeSearchTool 包含针对常见 Snowflake 问题的全面错误处理

  • 连接失败
  • 查询超时
  • 认证错误
  • 数据库和模式错误

当发生错误时,工具将尝试重试操作(如果已配置)并提供详细的错误信息。

结论

SnowflakeSearchTool 提供了一种强大的方式,将 Snowflake 数据仓库与 CrewAI 智能体集成。凭借连接池、自动重试和查询缓存等功能,它能够高效可靠地访问企业数据。此工具特别适用于需要访问存储在 Snowflake 中的结构化数据的数据分析、报告和商业智能任务。