Apache Kafka 是由 Apache 管理的开源流处理平台,由 Scala 和 Java 编写,为处理实时数据提供了统一、高吞吐、低延迟的功能特性。
其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,这使它作为企业级基础设施来处理流式数据非常有价值。目前已被数千家公司用于高性能数据管道、流分析、数据集成和任务关键型应用程序等领域。
实现方式:kafka-logger
Apache APISIX 早在 1.2 版本开始就已经提供了 kafka-logger
插件的支持,其后又经过多次功能强化,目前已具备非常成熟且完善的功能。支持将 API 请求日志,甚至请求体和响应体以 JSON 格式推送至 Kafka 集群中。
使用 kafka-logger
时,用户可以发送多种数据并自定义发送的日志格式,同时还支持以批处理的方式打包发送日志或进行自动重试等功能。
如何使用
步骤1:启动 Kafka 集群
本文示例只演示了一种启动方式,其他启动方式细节可参考 Kafka 官方文档。
1# 使用 docker-compose 启动一个具有 1个 zookeeper 节点、3个 kafka 节点的集群
2# 同时还启动一个 EFAK 用于数据监控。
3version: '3'
4
5services:
6 zookeeper:
7 image: confluentinc/cp-zookeeper:6.2.1
8 hostname: zookeeper
9 ports:
10 - "2181:2181"
11 environment:
12 ZOOKEEPER_CLIENT_PORT: 2181
13 ZOOKEEPER_SERVER_ID: 1
14 ZOOKEEPER_SERVERS: zookeeper:2888:3888
15
16 kafka1:
17 image: confluentinc/cp-kafka:6.2.1
18 hostname: kafka1
19 ports:
20 - "9092:9092"
21 environment:
22 KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
23 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
24 KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
25 KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
26 KAFKA_BROKER_ID: 1
27 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
28 depends_on:
29 - zookeeper
30
31 kafka2:
32 image: confluentinc/cp-kafka:6.2.1
33 hostname: kafka2
34 ports:
35 - "9093:9093"
36 environment:
37 KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
38 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
39 KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
40 KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
41 KAFKA_BROKER_ID: 2
42 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
43 depends_on:
44 - zookeeper
45
46
47 kafka3:
48 image: confluentinc/cp-kafka:6.2.1
49 hostname: kafka3
50 ports:
51 - "9094:9094"
52 environment:
53 KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094
54 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
55 KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
56 KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
57 KAFKA_BROKER_ID: 3
58 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
59 depends_on:
60 - zookeeper
61
62 efak:
63 image: nickzurich/kafka-eagle:2.0.9
64 hostname: efak
65 ports:
66 - "8048:8048"
67 depends_on:
68 - kafka1
69 - kafka2
70 - kafka3
步骤2:创建 Topic
接下来我们创建 test Topic
用于收集日志。
步骤3:创建路由并开启插件
通过下方命令可进行路由创建与 kafka-logger
插件的开启。
1curl -XPUT 'http://127.0.0.1:9080/apisix/admin/routes/r1' \
2--header 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' \
3--header 'Content-Type: application/json' \
4--data-raw '{
5 "uri": "/*",
6 "plugins": {
7 "kafka-logger": {
8 "batch_max_size": 1,
9 "broker_list": {
10 "127.0.0.1": 9092
11 },
12 "disable": false,
13 "kafka_topic": "test",
14 "producer_type": "sync"
15 }
16 },
17 "upstream": {
18 "nodes": {
19 "httpbin.org:80": 1
20 },
21 "type": "roundrobin"
22 }
23}'
上述代码中配置了 kafka broker
地址、目标 Topic、同步的生产模式和每一批次包含的最大日志数量。这里我们可以先将 batch_max_size
设置为 1,即每产生一条日志就向 Kafka 写入一条消息。
通过上述设置,就可以实现将 /*
路径下的 API 请求日志发送至 Kafka 的功能。
步骤4:发送请求
接下来我们通过 API 发送一些请求,并记录下请求次数。
1# 向 API 发送 10 次请求
2curl http://127.0.0.1:9080/get
通过下图可以看到,有一些日志消息已经被写入到我们创建的 test topic
2 中。点击查看日志内容,可以发现上述进行的 API 请求日志已经被写入了。
自定义日志结构
当然,在使用过程中我们也可以通过 kafka-logger
插件提供的元数据配置,来设置发送至 Kafka 的日志数据结构。通过设置 log_format
数据,可以控制发送的数据类型。
比如以下数据中的 $host
、$time_iso8601
等,都是来自于 Nginx 提供的内置变量;也支持如 $route_id
和 $service_id
等 Apache APISIX 提供的变量配置。
1curl -XPUT 'http://127.0.0.1:9080/apisix/admin/plugin_metadata/kafka-logger' \
2--header 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' \
3--header 'Content-Type: application/json' \
4--data-raw '{
5 "log_format": {
6 "host": "$host",
7 "@timestamp": "$time_iso8601",
8 "client_ip": "$remote_addr",
9 "route_id": "$route_id"
10 }
11}'
通过发送请求进行简单测试,可以看到上述日志结构设置已生效。目前 Apache APISIX 提供多种日志格式模板,在配置上具有极大的灵活性,更多日志格式细节可参考 Apache APISIX 官方文档。
关闭插件
如使用完毕,只需移除路由配置中 kafka-logger
插件相关配置并保存,即可关闭路由上的插件。得益于 Apache APISIX 的动态化优势,开启关闭插件的过程都不需要重启 Apache APISIX,十分方便。
1curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
2{
3 "methods": ["GET"],
4 "uri": "/hello",
5 "plugins": {},
6 "upstream": {
7 "type": "roundrobin",
8 "nodes": {
9 "127.0.0.1:1980": 1
10 }
11 }
12}'
总结
本文为大家介绍了关于 kafka-logger 插件的功能前瞻与使用步骤,更多关于 kafka-logger 插件说明和完整配置列表,可以参考官方文档。
也欢迎随时在 GitHub Discussions 中发起讨论,或通过邮件列表进行交流。