kafka-logger
The kafka-logger plugin pushes request and response logs as JSON objects to an Apache Kafka cluster in batches, and supports customizing the log format.
Examples
The following examples demonstrate how you can configure the kafka-logger plugin in different scenarios.
To follow along with the examples, start a sample Kafka cluster with the following Docker compose file:
docker-compose.yml
services:
zookeeper-server1:
image: bitnami/zookeeper:3.6.0
environment:
ALLOW_ANONYMOUS_LOGIN: "yes"
restart: unless-stopped
ports:
- "2181:2181"
networks:
kafka_net:
zookeeper-server2:
image: bitnami/zookeeper:3.6.0
environment:
ALLOW_ANONYMOUS_LOGIN: "yes"
restart: unless-stopped
ports:
- "12181:2181"
networks:
kafka_net:
kafka-server1:
image: bitnami/kafka:2.8.1
container_name: notkafka
environment:
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper-server1:2181
ALLOW_PLAINTEXT_LISTENER: "yes"
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092
restart: unless-stopped
ports:
- "9092:9092"
depends_on:
- zookeeper-server1
- zookeeper-server2
networks:
kafka_net:
networks:
kafka_net:
Start the containers:
docker compose up -d
Start a consumer that waits for messages on the configured Kafka topic:
docker exec -it notkafka kafka-console-consumer.sh --bootstrap-server kafka-server1:9092 --topic test2 --from-beginning
Open a new terminal session to complete the following APISIX-related steps.
Log in different meta log formats
The following example demonstrates how you can enable the kafka-logger plugin on a route, which logs client requests to the route and pushes logs to Kafka. You will also learn the difference between the default and origin meta log formats.
Create a route with kafka-logger configured as follows:
curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \
-H "X-API-KEY: ${ADMIN_API_KEY}" \
-d '{
"id": "kafka-logger-route",
"uri": "/get",
"plugins": {
"kafka-logger": {
"meta_format": "default",
"brokers": [
{
"host": "127.0.0.1",
"port": 9092
}
],
"kafka_topic": "test2",
"key": "key1",
"batch_max_size": 1
}
},
"upstream": {
"nodes": {
"httpbin.org:80": 1
},
"type": "roundrobin"
}
}'
❶ meta_format: set to the default log format.
❷ batch_max_size: set to 1 to send each log entry immediately.
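Conceptually, the plugin buffers log entries and flushes a batch to Kafka once the buffer reaches batch_max_size (it also flushes on a timer, which is omitted here). The following Python sketch illustrates this size-triggered batching; it is an illustration of the idea, not the plugin's actual Lua implementation, and the names used are hypothetical:

```python
from typing import Callable, List


class BatchLogger:
    """Buffer log entries and flush when batch_max_size is reached.

    Illustrative sketch of size-triggered batching only; the real
    plugin also flushes buffered entries on a timeout.
    """

    def __init__(self, batch_max_size: int, send: Callable[[List[dict]], None]):
        self.batch_max_size = batch_max_size
        self.send = send          # e.g. a call into a Kafka producer
        self.buffer: List[dict] = []

    def log(self, entry: dict) -> None:
        self.buffer.append(entry)
        if len(self.buffer) >= self.batch_max_size:
            self.send(self.buffer)
            self.buffer = []


# With batch_max_size=1, every entry is sent immediately:
sent = []
logger = BatchLogger(batch_max_size=1, send=lambda batch: sent.append(list(batch)))
logger.log({"route_id": "kafka-logger-route"})
print(len(sent))  # 1 batch sent right away
```

A larger batch_max_size trades delivery latency for fewer, bigger pushes to Kafka; 1 is convenient for demos because each request shows up in the consumer immediately.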
Send a request to the route to generate a log entry:
curl -i "http://127.0.0.1:9080/get"
You should receive an HTTP/1.1 200 OK response.
You should see a log entry similar to the following in the Kafka topic:
{
"latency": 411.00001335144,
"request": {
"querystring": {},
"headers": {
"host": "127.0.0.1:9080",
"user-agent": "curl/7.74.0",
"accept": "*/*"
},
"method": "GET",
"size": 83,
"uri": "/get",
"url": "http://127.0.0.1:9080/get"
},
"response": {
"headers": {
"content-length": "233",
"access-control-allow-credentials": "true",
"content-type": "text/html; charset=utf-8",
"connection": "close",
"access-control-allow-origin": "*",
"date": "Fri, 10 Nov 2023 06:02:44 GMT",
"server": "APISIX/3.8.0"
},
"status": 200,
"size": 475
},
"route_id": "kafka-logger-route",
"client_ip": "127.0.0.1",
"server": {
"hostname": "debian-apisix",
"version": "3.8.0"
},
"apisix_latency": 18.00001335144,
"service_id": "",
"upstream_latency": 393,
"start_time": 1699596164550,
"upstream": "54.90.18.68:80"
}
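In the sample entry above, the total latency is the gateway-side latency plus the upstream latency: apisix_latency + upstream_latency = latency, all in milliseconds. A quick Python check against the sample values:

```python
import json

# Fields copied from the sample log entry above
entry = json.loads("""{
  "latency": 411.00001335144,
  "apisix_latency": 18.00001335144,
  "upstream_latency": 393
}""")

# Total latency breaks down into the APISIX-side and upstream components
total = entry["apisix_latency"] + entry["upstream_latency"]
assert abs(total - entry["latency"]) < 1e-6
print("latency breakdown adds up")
```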
Update the meta log format to origin:
curl "http://127.0.0.1:9180/apisix/admin/routes/kafka-logger-route" -X PATCH \
-H "X-API-KEY: ${ADMIN_API_KEY}" \
-d '{
"plugins": {
"kafka-logger": {
"meta_format": "origin"
}
}
}'
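The PATCH body above only carries meta_format; the Admin API merges it into the existing route, so brokers, kafka_topic, and the other plugin fields are preserved. A small Python sketch of this merge behavior (an illustration of the semantics, not APISIX's actual implementation):

```python
def deep_merge(existing: dict, patch: dict) -> dict:
    """Recursively merge patch into existing, illustrating PATCH semantics."""
    merged = dict(existing)
    for key, value in patch.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


existing = {
    "kafka-logger": {
        "meta_format": "default",
        "brokers": [{"host": "127.0.0.1", "port": 9092}],
        "kafka_topic": "test2",
        "key": "key1",
        "batch_max_size": 1,
    }
}
patch = {"kafka-logger": {"meta_format": "origin"}}

merged = deep_merge(existing, patch)
print(merged["kafka-logger"]["meta_format"])   # origin
print(merged["kafka-logger"]["kafka_topic"])   # test2 (preserved)
```

This is why a PATCH is preferable to a PUT here: a PUT would replace the whole route and require re-sending the full plugin configuration.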
Send another request to the route to generate a new log entry:
curl -i "http://127.0.0.1:9080/get"
You should receive an HTTP/1.1 200 OK response.
You should see a log entry similar to the following in the Kafka topic:
GET /get HTTP/1.1
host: 127.0.0.1:9080
user-agent: curl/7.74.0
accept: */*
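With meta_format set to origin, each entry is the raw request line and headers rather than a JSON object, so downstream consumers must parse HTTP text themselves. A minimal Python sketch parsing an entry shaped like the one above:

```python
# Raw log entry as produced by meta_format: origin
raw = (
    "GET /get HTTP/1.1\r\n"
    "host: 127.0.0.1:9080\r\n"
    "user-agent: curl/7.74.0\r\n"
    "accept: */*\r\n"
)

# First line is the request line; the remaining lines are headers
request_line, *header_lines = raw.strip().splitlines()
method, path, protocol = request_line.split(" ")
headers = dict(line.split(": ", 1) for line in header_lines)

print(method, path)     # GET /get
print(headers["host"])  # 127.0.0.1:9080
```

Use origin when a downstream system already ingests raw HTTP traffic; otherwise the default JSON format is easier to process.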