关于 kafka 的详细原理,请参见:Apache Kafka 学习笔记
下载
kafka 官网地址:http://kafka.apache.org/
kafka 下载地址:https://kafka.apache.org/downloads
本文选择的版本是:kafka_2.11-2.3.1
kafka 的版本命令规则为:kafka-A-B.tgz
A 表示编译 Kafka 源代码的 Scala 编译器版本。
B 表示 kafka 真正的发行版本号
这里 B 要特别说明一下,在 kafka 1.0.0 版本之后,B 的版本命名规则正式从 4 位演进到 3 位。
比如上图中的:2.3.1,前面的 2 表示大版本号,即 Major Version;中间的 3 表示小版本号或次版本号,即 Minor Version;最后的 1 表示修订版本号,也就是 Patch 号。
安装(windows 环境)
创建数据保存目录
将下载好的 kafka_2.11-2.3.1.tgz 安装包解压,将 kafka_2.11-2.3.1 文件目录方法固定位置,在其根目录下创建一个名称为logs
的文件夹。
配置 server.properties
进入 config 文件夹,使用文本编辑器打开 server.properties,修改log.dirs
默认配置为上述创建数据保存目录:
官网文档:http://kafka.apache.org/23/documentation.html#quickstart
启动 kafka 的前提是需要搭建 zookeeper 服务,笔者已经安装过了,这里不再赘述,具体可参见:zookeeper window环境安装、集成为windows服务、单机伪集群搭建。
快速开始
启动服务
windows 运行脚本:
1 | .\bin\windows\kafka-server-start.bat .\config\server.properties |
linux 运行脚本:
1 | bin/kafka-server-start.sh config/server.properties |
控制台提示如下字样,表示启动成功:
1 | [2020-04-07 09:44:12,747] INFO [SocketServer brokerId=0] Started data-plane processors for 1 acceptors (kafka.network.SocketServer) |
创建主题
windows 运行脚本:
1 | .\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test |
linux 运行脚本:
1 | bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test |
创建成功之后,控制台没有任何提示。使用如下命令可以查看已存在的主题列表:
windows 运行脚本:
1 | .\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092 |
linux 运行脚本:
1 | bin/kafka-topics.sh --list --bootstrap-server localhost:9092 |
下图可以看到已成功创建一个主题:
值得注意的是,服务端控制台显示的日志已经可以看出主题创建成功:
发送消息
kafka 自带命令行工具,可以把控制台的标准输入流通过以行的形式发送到 kafka 集群。
windows 运行脚本:
1 | .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test |
linux 运行脚本:
1 | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test |
此时,控制状态为光标闪烁,输入一些字符串到控制台:
消费消息
kafka 自带命令行工具,可以自动消费 kafka 集群中的消息并发送到标准控制台:
windows 运行脚本:
1 | .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning |
linux 运行脚本:
1 | bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning |
当消费者启动好之后,可以看到控制台自动打印上述生产者对 test 主题发送的历史消息:
为啥能接收到历史消息呢?因为在启动消费者的时候参数中带了--from-beginning
,如果不带这个参数,那么只会接收当前消费者启动时刻之后,生产者发送到该主题的消息。再启动一个不带接收历史消息的消费者: