跳到主要内容

kafka-logger

kafka-logger 插件将请求和响应日志作为 JSON 对象分批推送到 Apache Kafka 集群,并支持自定义日志格式。

示例

以下示例展示了如何在不同场景下配置 kafka-logger 插件。

要跟随示例操作,请使用以下 Docker compose 文件启动一个 Kafka 集群示例:

docker-compose.yml
services:
zookeeper-server1:
image: bitnami/zookeeper:3.6.0
environment:
ALLOW_ANONYMOUS_LOGIN: "yes"
restart: unless-stopped
ports:
- "2181:2181"
networks:
kafka_net:

zookeeper-server2:
image: bitnami/zookeeper:3.6.0
environment:
ALLOW_ANONYMOUS_LOGIN: "yes"
restart: unless-stopped
ports:
- "12181:12181"
networks:
kafka_net:

kafka-server1:
image: bitnami/kafka:2.8.1
container_name: notkafka
environment:
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper-server1:2181
ALLOW_PLAINTEXT_LISTENER: "yes"
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092
restart: unless-stopped
ports:
- "9092:9092"
depends_on:
- zookeeper-server1
- zookeeper-server2
networks:
kafka_net:

networks:
kafka_net:

启动容器:

docker compose up -d

在配置的 Kafka 主题中等待消息:

docker exec -it notkafka kafka-console-consumer.sh --bootstrap-server kafka-server1:9092 --topic test2 --from-beginning

打开一个新的终端会话以执行以下与 APISIX 相关的步骤。

以不同的元日志格式记录日志

以下示例演示了如何在路由上启用 kafka-logger 插件,该插件记录对路由的客户端请求并将日志推送到 Kafka。你还将了解 defaultorigin 元日志格式之间的区别。

创建如下配置 kafka-logger 的路由:

curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \
-H "X-API-KEY: ${ADMIN_API_KEY}" \
-d '{
"id": "kafka-logger-route",
"uri": "/get",
"plugins": {
"kafka-logger": {
"meta_format": "default",
"brokers": [
{
"host": "127.0.0.1",
"port": 9092
}
],
"kafka_topic": "test2",
"key": "key1",
"batch_max_size": 1
}
},
"upstream": {
"nodes": {
"httpbin.org:80": 1
},
"type": "roundrobin"
}
}'

meta_format: 设置为 default 日志格式。

batch_max_size: 设置为 1 以立即发送日志条目。

向路由发送请求以生成日志条目:

curl -i "http://127.0.0.1:9080/get"

你应该看到 HTTP/1.1 200 OK 响应。

你应该在 Kafka 主题中看到类似于以下的日志条目:

{
"latency": 411.00001335144,
"request": {
"querystring": {},
"headers": {
"host": "127.0.0.1:9080",
"user-agent": "curl/7.74.0",
"accept": "*/*"
},
"method": "GET",
"size": 83,
"uri": "/get",
"url": "http://127.0.0.1:9080/get"
},
"response": {
"headers": {
"content-length": "233",
"access-control-allow-credentials": "true",
"content-type": "text/html; charset=utf-8",
"connection": "close",
"access-control-allow-origin": "*",
"date": "Fri, 10 Nov 2023 06:02:44 GMT",
"server": "APISIX/3.8.0"
},
"status": 200,
"size": 475
},
"route_id": "kafka-logger-route",
"client_ip": "127.0.0.1",
"server": {
"hostname": "debian-apisix",
"version": "3.8.0"
},
"apisix_latency": 18.00001335144,
"service_id": "",
"upstream_latency": 393,
"start_time": 1699596164550,
"upstream": "54.90.18.68:80"
}

将元日志格式更新为 origin

curl "http://127.0.0.1:9180/apisix/admin/routes/kafka-logger-route" -X PATCH \
-H "X-API-KEY: ${ADMIN_API_KEY}" \
-d '{
"plugins": {
"kafka-logger": {
"meta_format": "origin"
}
}
}'

再次向路由发送请求以生成新的日志条目:

curl -i "http://127.0.0.1:9080/get"

你应该看到 HTTP/1.1 200 OK 响应。

你应该在 Kafka 主题中看到类似于以下的日志条目:

GET /get HTTP/1.1
host: 127.0.0.1:9080
user-agent: curl/7.74.0
accept: */*

使用插件元数据记录请求和响应头

以下示例演示了如何使用插件元数据内置变量自定义日志格式,以记录请求和响应中的特定头。

在 APISIX 中,插件元数据用于配置同一插件的所有插件实例的通用元数据字段。当一个插件在多个资源中启用并且需要对其元数据字段进行通用更新时,这非常有用。

首先,创建如下配置 kafka-logger 的路由:

curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \
-H "X-API-KEY: ${ADMIN_API_KEY}" \
-d '{
"id": "kafka-logger-route",
"uri": "/get",
"plugins": {
"kafka-logger": {
"meta_format": "default",
"brokers": [
{
"host": "127.0.0.1",
"port": 9092
}
],
"kafka_topic": "test2",
"key": "key1",
"batch_max_size": 1
}
},
"upstream": {
"nodes": {
"httpbin.org:80": 1
},
"type": "roundrobin"
}
}'

meta_format: 设置为 default 日志格式。请务必注意,如果你想使用插件元数据自定义日志格式,这是强制性的。如果 meta_format 设置为 origin,日志条目将保持 origin 格式。

batch_max_size: 设置为 1 以立即发送日志条目。

接下来,为 kafka-logger 配置插件元数据:

curl "http://127.0.0.1:9180/apisix/admin/plugin_metadata/kafka-logger" -X PUT \
-H "X-API-KEY: ${ADMIN_API_KEY}" \
-d '{
"log_format": {
"host": "$host",
"@timestamp": "$time_iso8601",
"client_ip": "$remote_addr",
"env": "$http_env",
"resp_content_type": "$sent_http_Content_Type"
}
}'

❶ 记录自定义请求头 env

❷ 记录响应头 Content-Type

向路由发送带有 env 头的请求:

curl -i "http://127.0.0.1:9080/get" -H "env: dev"

你应该在 Kafka 主题中看到类似于以下的日志条目:

{
"@timestamp": "2023-11-10T23:09:04+00:00",
"host": "127.0.0.1",
"client_ip": "127.0.0.1",
"route_id": "kafka-logger-route",
"env": "dev",
"resp_content_type":"application/json"
}

有条件地记录请求体

以下示例演示了如何有条件地记录请求体。

创建如下配置 kafka-logger 的路由:

curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \
-H "X-API-KEY: ${ADMIN_API_KEY}" \
-d '{
"id": "kafka-logger-route",
"uri": "/post",
"plugins": {
"kafka-logger": {
"brokers": [
{
"host": "127.0.0.1",
"port": 9092
}
],
"kafka_topic": "test2",
"key": "key1",
"batch_max_size": 1,
"include_req_body": true,
"include_req_body_expr": [["arg_log_body", "==", "yes"]]
}
},
"upstream": {
"nodes": {
"httpbin.org:80": 1
},
"type": "roundrobin"
}
}'

include_req_body: 设置为 true 以包含请求体。

include_req_body_expr: 仅当 URL 查询字符串 log_bodyyes 时才包含请求体。

向路由发送满足条件的带有 URL 查询字符串的请求:

curl -i "http://127.0.0.1:9080/post?log_body=yes" -X POST -d '{"env": "dev"}'

你应该看到记录的请求体:

{
...,
"method": "POST",
"body": "{\"env\": \"dev\"}",
"size": 179
}
}

向路由发送不带任何 URL 查询字符串的请求:

curl -i "http://127.0.0.1:9080/post" -X POST -d '{"env": "dev"}'

你不应在日志中观察到请求体。

信息

如果你除了将 include_req_bodyinclude_resp_body 设置为 true 之外还自定义了 log_format,则插件将不会在日志中包含这些主体。

作为解决方法,你可以在日志格式中使用 NGINX 变量 $request_body,例如:

{
"kafka-logger": {
...,
"log_format": {"body": "$request_body"}
}
}