将请求日志加载到 Snowflake
AISIX 可以通过可观测性导出器将请求遥测写入对象存储。完成导出器配置后,如果财务、商业智能或审计团队需要在 Snowflake 中查询网关用量、成本、延迟、缓存结果和安全护栏结果,可以使用本指南。
网关会将批量 NDJSON 对象写入你拥有的 bucket。Snowpipe 会将这些对象导入 Snowflake 表,SQL 视图再把每条请求记录转换为可查询的列。
准备工作
请先准备以下内容:
- 一个 Admin 和代理监听器都可用的自托管 AISIX 网关。
- 网关
config.yaml中的 Admin Key。 - 一个可以发送 Chat Completions 请求的模型别名和调用方 API Key。
- 一个用于暂存请求遥测的 Amazon S3 bucket 或 Azure Blob container。
- 一个 Snowflake 账户,以及可创建 storage integrations、notification integrations、stages、pipes、tables 和 views 的角色。
本指南使用 acme-aisix-events 作为 S3 bucket 名称,使用 ai-gateway 作为对象前缀。请替换为你自己环境中的值。
遥测导入流程
object_store 导出器是对象存储暂存路径。AISIX 不会在请求链路上直接把请求遥测推送到 Snowflake。
每个受支持的请求完成后,AISIX 会发出请求遥测。导出器会将 gzip 压缩的 NDJSON 对象写入 Amazon S3 或 Azure Blob Storage。Snowpipe 将每条记录加载到 Snowflake 落地表,SQL 视图则暴露请求、成本、延迟、缓存和安全护栏字段用于分析。
导出器投递默认以元数据为主,包括 request ID、请求的模型别名、解析后的模型 ID、状态、token 计数、成本、 延迟、缓存状态和安全护栏结果等字段。除非受支持的导出器显式配置内容采集,否则不会包含 prompt 或响应文本。
请为每个环境使用独立的 bucket prefix。AISIX 会在已配置前缀下按日期和小时分区对象,但对象路径不会自动为你添加环境段。
创建对象存储导出器
请先创建导出器,并确认文件能够落到对象存储,再连接 Snowflake。这样可以把网关侧问题和数据仓库导入问题拆开排查。
Amazon S3
创建一个将请求遥测写入 S3 的导出器:
curl -sS -X POST "http://127.0.0.1:3001/admin/v1/observability_exporters" \
-H "Authorization: Bearer YOUR_ADMIN_KEY" \
-H "Content-Type: application/json" \
-d '{
"name": "snowflake-staging",
"kind": "object_store",
"provider": "s3",
"bucket": "acme-aisix-events",
"prefix": "ai-gateway",
"region": "us-east-1",
"credential_ref": "acme_s3_prod"
}'
在网关进程环境中设置对应凭证。后缀来自 credential_ref 的值并转换为大写。如果在网关启动后新增或修改这些变量,请在测试投递前重启网关。
export OBJSTORE_CRED_ACME_S3_PROD_AWS_ACCESS_KEY_ID="YOUR_AWS_ACCESS_KEY_ID"
export OBJSTORE_CRED_ACME_S3_PROD_AWS_SECRET_ACCESS_KEY="YOUR_AWS_SECRET_ACCESS_KEY"
生成几次请求,让导出器有遥测数据可以 flush:
for i in 1 2 3; do
curl -sS -X POST "http://127.0.0.1:3000/v1/chat/completions" \
-H "Authorization: Bearer YOUR_CALLER_API_KEY" \
-H "Content-Type: application/json" \
-d '{"model":"gpt-4o-prod","messages":[{"role":"user","content":"ping"}]}' > /dev/null
done
sleep 8
列出暂存对象:
aws s3 ls "s3://acme-aisix-events/ai-gateway/" --recursive
bucket 中应该会在配置的前缀、日期分区和小时分区下出现一个或多个 .ndjson.gz 对象:
2026-06-09 14:03:11 742 ai-gateway/dt=2026-06-09/hh=14/9f86d081884c7d659a2feaa0c55ad015.ndjson.gz
配置 Snowpipe 前,先检查一个对象:
aws s3 cp "s3://acme-aisix-events/ai-gateway/dt=2026-06-09/hh=14/OBJECT.ndjson.gz" - | gunzip
输出中每一行都是一条请求遥测记录。下面是格式化后的一条示例记录:
{
"schema_version": "1.0",
"request_id": "742c6f5e-7b97-4bb1-9f5f-8cb42b4c93e1",
"occurred_at": "2026-06-09T14:03:10Z",
"requested_model": "gpt-4o-prod",
"model_id": "b7c8e4f2-2e4d-4776-a8d7-09b4eb0cb2b1",
"prompt_tokens": 8,
"completion_tokens": 12,
"latency_ms": 612,
"status_code": 200,
"cost_usd": 0.00021,
"cache_status": "miss",
"guardrail_blocked": false
}
Azure Blob
创建一个将请求遥测写入 Azure Blob 的导出器:
curl -sS -X POST "http://127.0.0.1:3001/admin/v1/observability_exporters" \
-H "Authorization: Bearer YOUR_ADMIN_KEY" \
-H "Content-Type: application/json" \
-d '{
"name": "snowflake-staging",
"kind": "object_store",
"provider": "azure_blob",
"bucket": "ai-gateway",
"prefix": "ai-gateway",
"credential_ref": "acme_az_prod"
}'
在网关进程环境中设置对应凭证。如果在网关启动后新增或修改这些变量,请在测试投递前重启网关。
export OBJSTORE_CRED_ACME_AZ_PROD_AZURE_ACCOUNT="YOUR_STORAGE_ACCOUNT"
export OBJSTORE_CRED_ACME_AZ_PROD_AZURE_ACCESS_KEY="YOUR_STORAGE_ACCESS_KEY"
生成流量并列出暂存 blob:
for i in 1 2 3; do
curl -sS -X POST "http://127.0.0.1:3000/v1/chat/completions" \
-H "Authorization: Bearer YOUR_CALLER_API_KEY" \
-H "Content-Type: application/json" \
-d '{"model":"gpt-4o-prod","messages":[{"role":"user","content":"ping"}]}' > /dev/null
done
sleep 8
az storage blob list \
--account-name "YOUR_STORAGE_ACCOUNT" \
--account-key "YOUR_STORAGE_ACCESS_KEY" \
--container-name "ai-gateway" \
--prefix "ai-gateway/" \
--query "[].name" \
-o tsv
container 中应该会在相同的前缀、日期分区和小时分区布局下出现 gzip 压缩的 NDJSON 对象。
创建 Snowflake 落地表
创建一个包含 VARIANT 列的落地表,让 Snowflake 可以导入每条 NDJSON 记录且不丢失字段:
CREATE DATABASE IF NOT EXISTS aisix;
CREATE SCHEMA IF NOT EXISTS aisix.gateway;
USE SCHEMA aisix.gateway;
CREATE TABLE IF NOT EXISTS gateway_events (
record VARIANT,
source_file STRING,
loaded_at TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
);
当 pipe file format 使用 COMPRESSION = AUTO 时,Snowflake 会自动识别 gzip。
将 Snowflake 连接到 S3
对于 S3,Snowflake 通过 storage integration 读取 bucket,并通过与 pipe 关联的 SQS queue 接收对象创建事件。
创建 storage integration:
CREATE STORAGE INTEGRATION aisix_s3_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789012:role/aisix-snowflake-read'
STORAGE_ALLOWED_LOCATIONS = ('s3://acme-aisix-events/ai-gateway/');
DESC INTEGRATION aisix_s3_int;
使用 DESC INTEGRATION 返回的 STORAGE_AWS_IAM_USER_ARN 和 STORAGE_AWS_EXTERNAL_ID 配置 IAM role trust policy。为该 role 授予列出 bucket 和读取配置前缀下对象的权限。
创建 stage 和 pipe:
CREATE STAGE aisix_stage
URL = 's3://acme-aisix-events/ai-gateway/'
STORAGE_INTEGRATION = aisix_s3_int;
CREATE PIPE aisix_events_pipe
AUTO_INGEST = TRUE
AS
COPY INTO gateway_events (record, source_file)
FROM (SELECT $1, METADATA$FILENAME FROM @aisix_stage)
FILE_FORMAT = (TYPE = JSON COMPRESSION = AUTO);
SHOW PIPES;
使用 SHOW PIPES 返回的 notification_channel 值,为对象创建事件添加 S3 event notification:
aws s3api put-bucket-notification-configuration \
--bucket "acme-aisix-events" \
--notification-configuration '{
"QueueConfigurations": [{
"QueueArn": "arn:aws:sqs:us-east-1:NNNN:sf-snowpipe-example",
"Events": ["s3:ObjectCreated:*"],
"Filter": {"Key": {"FilterRules": [{"Name": "prefix", "Value": "ai-gateway/"}]}}
}]
}'
将 Snowflake 连接到 Azure Blob
对于 Azure Blob,Snowflake 通过 storage integration 读取 container,并通过 storage queue 接收对象创建事件。
创建 storage queue 和 Event Grid subscription:
az storage queue create \
--name "aisix-snowpipe" \
--account-name "YOUR_STORAGE_ACCOUNT"
az eventgrid event-subscription create \
--source-resource-id "/subscriptions/YOUR_SUBSCRIPTION_ID/resourceGroups/YOUR_RESOURCE_GROUP/providers/Microsoft.Storage/storageAccounts/YOUR_STORAGE_ACCOUNT" \
--name "aisix-snowpipe-sub" \
--endpoint-type storagequeue \
--endpoint "/subscriptions/YOUR_SUBSCRIPTION_ID/resourceGroups/YOUR_RESOURCE_GROUP/providers/Microsoft.Storage/storageAccounts/YOUR_STORAGE_ACCOUNT/queueServices/default/queues/aisix-snowpipe" \
--advanced-filter data.api stringin CopyBlob PutBlob PutBlockList FlushWithClose
创建 notification integration:
CREATE NOTIFICATION INTEGRATION aisix_az_notif
ENABLED = TRUE
TYPE = QUEUE
NOTIFICATION_PROVIDER = AZURE_STORAGE_QUEUE
AZURE_STORAGE_QUEUE_PRIMARY_URI = 'https://YOUR_STORAGE_ACCOUNT.queue.core.windows.net/aisix-snowpipe'
AZURE_TENANT_ID = 'YOUR_TENANT_ID';
DESC NOTIFICATION INTEGRATION aisix_az_notif;
为 DESC NOTIFICATION INTEGRATION 返回的 Snowflake service principal 授予访问 storage queue 的权限。
创建 storage integration、stage 和 pipe:
CREATE STORAGE INTEGRATION aisix_az_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'AZURE'
ENABLED = TRUE
AZURE_TENANT_ID = 'YOUR_TENANT_ID'
STORAGE_ALLOWED_LOCATIONS = ('azure://YOUR_STORAGE_ACCOUNT.blob.core.windows.net/ai-gateway/ai-gateway/');
DESC INTEGRATION aisix_az_int;
CREATE STAGE aisix_stage
URL = 'azure://YOUR_STORAGE_ACCOUNT.blob.core.windows.net/ai-gateway/ai-gateway/'
STORAGE_INTEGRATION = aisix_az_int;
CREATE PIPE aisix_events_pipe
AUTO_INGEST = TRUE
INTEGRATION = 'AISIX_AZ_NOTIF'
AS
COPY INTO gateway_events (record, source_file)
FROM (SELECT $1, METADATA$FILENAME FROM @aisix_stage)
FILE_FORMAT = (TYPE = JSON COMPRESSION = AUTO);
为 DESC INTEGRATION 返回的 Snowflake service principal 授予读取 blob container 的权限。
查询网关请求
继续通过 AISIX 发送流量后,检查 Snowpipe 是否正在接收文件:
SELECT SYSTEM$PIPE_STATUS('aisix_events_pipe');
确认数据已写入,并为原始记录创建视图:
SELECT COUNT(*) FROM gateway_events;
CREATE OR REPLACE VIEW gateway_requests AS
SELECT
record:request_id::string AS request_id,
record:occurred_at::timestamp_tz AS occurred_at,
record:requested_model::string AS requested_model,
record:model_id::string AS model_id,
record:status_code::number AS status_code,
record:prompt_tokens::number AS prompt_tokens,
record:completion_tokens::number AS completion_tokens,
record:cost_usd::float AS cost_usd,
record:latency_ms::number AS latency_ms,
record:cache_status::string AS cache_status,
record:guardrail_blocked::boolean AS guardrail_blocked,
record:finish_reason::string AS finish_reason,
source_file
FROM gateway_events;
查询最近请求:
SELECT requested_model, status_code, prompt_tokens, completion_tokens, cost_usd, cache_status
FROM gateway_requests
ORDER BY occurred_at DESC
LIMIT 10;
运行一个简单的成本和 token 汇总:
SELECT
requested_model,
COUNT(*) AS requests,
SUM(prompt_tokens + completion_tokens) AS total_tokens,
ROUND(SUM(cost_usd), 5) AS total_cost_usd
FROM gateway_requests
GROUP BY requested_model
ORDER BY total_cost_usd DESC;
清理
测试结束后删除 Snowflake 对象:
DROP PIPE IF EXISTS aisix_events_pipe;
DROP STAGE IF EXISTS aisix_stage;
DROP VIEW IF EXISTS gateway_requests;
DROP TABLE IF EXISTS gateway_events;
DROP STORAGE INTEGRATION IF EXISTS aisix_s3_int;
DROP STORAGE INTEGRATION IF EXISTS aisix_az_int;
DROP NOTIFICATION INTEGRATION IF EXISTS aisix_az_notif;
移除已创建的 bucket notification 或 Event Grid subscription,然后删除导出器:
curl -sS -X DELETE "http://127.0.0.1:3001/admin/v1/observability_exporters/YOUR_EXPORTER_ID" \
-H "Authorization: Bearer YOUR_ADMIN_KEY"
下一步
你已经将 AISIX 请求遥测加载到 Snowflake。当你需要把导出的用量事件与运行时指标、访问日志和响应头关联起来时,请继续阅读指标与日志。