一、kafka概述
1.1 定义
Kafka 是一个分布式的基于发布 / 订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
1.2 消息队列
-
1.2.1 传统消息队列的应用场景
- 使用消息队列的好处:
-
解耦:
允许独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。 -
可恢复性:
系统的一部分组件失效时,不会影响整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。 -
缓冲:
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。 -
灵活性和峰值处理能力:
使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。 -
异步通信:
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
-
1.2.2 消息队列的两种形式
- 点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除。)
消息生产者生产消息发送到 Queue 中,然后消费者从 Queue 中取出并且消费消息。消息被消费以后,Queue 中不再有存储,所以消费者不可能消费到已经被消费的消息。Queue 支持存在多个消费者,但对于一个消息而言,只有一个消费者可以消费。
- 发布 / 订阅模式(一对多,消费者消费数据之后不会清除消息)
消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 中的消息会被所有订阅者消费。
1.3 Kafka 基础架构
- Producer:
消息生产者,就是向 Kafka broker 发消息的客户端。 - Consumer:
消息消费者,向 Kafka broker 取消息的客户端。 - Consumer Group(CG):
消费者组,由多个 Consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。 - Broker:
一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。 - Topic:
可以理解为一个队列,生产者和消费者面向的都是一个 topic。 - Partiton:
为了实现拓展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 Partition,每个 partition 都是一个有序的队列。 - Replication:
副本,为保证集群中某个节点发生故障时,该节点上的 partition 数据不丢失,且 Kafka 仍然可以继续工作,Kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。 - leader:
每个分区多个副本的 ” 主 “,生产者发送数据的对象,以及消费者消费数据时的对象都是 leader。 - follower:
每个分区多个副本的 “从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 leader。
二、kafka安装部署(单节点部署)
2.1、环境准备
搭建kafka集群至少需要3台服务器(或虚拟机也可),提前准备好3台不同IP的服务器
主机名 | IP |
---|---|
kafkaserver1 | 10.0.0.7 |
kafkaserver2 | 10.0.0.8 |
kafkaserver3 | 10.0.0.9 |
获取kafka:下载 //kafkaserver1部署即可
wget https://downloads.apache.org/kafka/3.5.1/kafka_2.12-3.5.1.tgz
下载完成后进行解压
[root@kafkaserver1 ~]# tar xf kafka_2.12-3.5.1.tgz -C /app/
[root@kafkaserver1 ~]# cd /app/kafka_2.12-3.5.1/
[root@kafkaserver1 kafka_2.12-3.5.1]# ll
total 64
drwxr-xr-x 3 root root 4096 Jul 15 00:52 bin
drwxr-xr-x 3 root root 4096 Jul 15 00:52 config
drwxr-xr-x 2 root root 8192 Nov 9 17:14 libs
-rw-rw-r-- 1 root root 14722 Jul 15 00:50 LICENSE
drwxr-xr-x 2 root root 262 Jul 15 00:52 licenses
-rw-rw-r-- 1 root root 28184 Jul 15 00:50 NOTICE
drwxr-xr-x 2 root root 44 Jul 15 00:52 site-docs
2.2、启动kafka
Apache Kafka 可以使用 ZooKeeper 或 KRaft 启动,但不能同时使用
以前的kafka之前还需要准备zookeeper环境,新版的kafka中自带由,免去了很多麻烦
第一步:按顺序启动所有服务:
# Start the ZooKeeper service
[root@kafkaserver1 kafka_2.12-3.5.1]# bin/zookeeper-server-start.sh config/zookeeper.properties
第二步:打开另一个终端会话并运行:
# Start the Kafka broker service
[root@kafkaserver1 kafka_2.12-3.5.1]# bin/kafka-server-start.sh config/server.properties
以上两步命令运行后,可以看到kafka服务已经启动
2.3、创建主题
kafka的事件储存在主题中,主题类似于文件系统中的文件夹,事件是该文件夹中的文件。所以,在使用前必须为事件创建主题。
[root@kafkaserver1 kafka_2.12-3.5.1]# bin/kafka-topics.sh --create --topic kafka-topic-test --bootstrap-server localhost:9092
Kafka 的所有命令行工具都有额外的选项:运行kafka-topics.sh不带任何参数的命令以显示使用信息。例如,显示 新主题的 分区计数等详细信息:
[root@kafkaserver1 kafka_2.12-3.5.1]# bin/kafka-topics.sh --describe --topic kafka-topic-test --bootstrap-server localhost:9092
2.4、 事件写入主题
[root@kafkaserver1 kafka_2.12-3.5.1]# bin/kafka-console-producer.sh --topic kafka-topic-test --bootstrap-server localhost:9092
2.5、事件读取
[root@kafkaserver1 kafka_2.12-3.5.1]# bin/kafka-console-consumer.sh --topic kafka-topic-test --from-beginning --bootstrap-server localhost:9092
2.6、停止kafka环境
使用 停止生产者和消费者客户Ctrl-C。 使用 停止 Kafka 代理Ctrl-C。如果使用 Kafka && ZooKeeper,还需要停止 ZooKeeper 服务器 Ctrl-C
如果还想删除本地 Kafka 环境的任何数据,包括在此过程中创建的任何事件,请运行以下命令
rm -rf /tmp/kafka-logs /tmp/zookeeper /tmp/kraft-combined-logs
2.7、使用system管理启动
zookeeper
cat >> /usr/lib/systemd/system/zookeeper.service << EOF
Description= zookeeper.service (broker)
After=network.target zookeeper.service
[Service]
Type=simple
##kafka启动运行的脚本指定配置文件
ExecStart=/app/kafka_2.12-3.5.1/bin/zookeeper-server-start.sh /app/kafka_2.12-3.5.1/config/zookeeper.properties
ExecStop=/app/kafka_2.12-3.5.1/bin/zookeeper-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF
kafka
cat >> /usr/lib/systemd/system/kafka.service << EOF
Description=Apache Kafka server (broker)
After=network.target zookeeper.service
[Service]
Type=simple
##kafka启动运行的脚本指定配置文件
ExecStart=/app/kafka_2.12-3.5.1/bin/kafka-server-start.sh /app/kafka_2.12-3.5.1/config/server.properties
ExecStop=/app/kafka_2.12-3.5.1/bin/ kafka-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF
重新加载配置文件
[root@kafkaserver1 ~]# systemctl daemon-reload
启动服务查看验证
[root@kafkaserver1 ~]# systemctl start zookeeper.service kafka.service
三、搭建 kafka 集群
前提条件:按照第二步骤的方法为列出的三台服务器全部安装 kakfa 。
我们这里用的 kafka && ZooKeeper 模式,所以先要搭建 zookeeper 集群。
3.1、ZooKeeper 集群搭建
配置文件修改
[root@kafkaserver1 kafka_2.12-3.5.1]# vim config/zookeeper.properties
----
dataDir=/var/zookeeper
initLimit=5
syncLimit=2
tickTime=2000
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
server.1=10.0.0.7:2888:3888
server.2=10.0.0.8:2888:3888
server.3=10.0.0.9:2888:3888
创建数据文件夹
# 新建zookeeper.properties 中配置的dataDir目录
[root@kafkaserver1 kafka_2.12-3.5.1]# mkdir /var/zookeeper
[root@kafkaserver1 kafka_2.12-3.5.1]# cd /var/zookeeper
# 新建myid文件,三个节点分别设置值为1,2,3
[root@kafkaserver1 zookeeper]# touch myid
[root@kafkaserver1 zookeeper]# echo 1 > /var/zookeeper/myid
记得端口是否放开,若没有则有两种方法
- 关闭防火墙
systemctl stop firewalld
2.或者防火墙放行
firewall-cmd --zone=public --add-port=2181/tcp --permanent
firewall-cmd --zone=public --add-port=3888/tcp --permanent
firewall-cmd --zone=public --add-port=2888/tcp --permanent
firewall-cmd --reload
然后在其他的2个服务器重重复上面的步骤,(注意节点设置)
3.2、kafka 集群搭建
修改server.properties
[root@kafkaserver1 kafka_2.12-3.5.1]# vim config/server.properties
----
#每个节点唯一
broker.id=1
#填写每个节点的本机IP
listeners=PLAINTEXT://10.0.0.7:9092
#这里有几个成员就写几个
zookeeper.connect=10.0.0.7:2181,192.10.0.0.8:2181,10.0.0.9:2181
#我这里有三台机器所以设置下面的属性为3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
----
group.initial.rebalance.delay.ms=3
启动kafka如果报错,则对meta.properties进行修改,该文件默认在 /tmp/kafka-logs
删除cluster.id,broker.id等于server.properties里的即可 //记得备份一份!!!!
[root@kafkaserver1 kafka-logs]# vim meta.properties
#
#Fri Nov 10 14:56:24 CST 2023
version=0
broker.id=1
#先停止服务后在修改文件!!!
其它两台节点,重复如上操作即可,注意broker.id唯一性
3.3、kafka 集群测试
搭建完成后,我们简单的测试一下是否成功。
我们在节点 10.0.0.7上启动一个发布者进行消息发布,在节点10.0.0.8和10.0.0.9上分别启动一个消费者进行事件的消费
创建测试用主题
bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.9:9092 --offset latest --topic topic-test
创建消费者
bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.7:9092 --offset latest --topic topic-test
创建发布者
bin/kafka-console-producer.sh --topic topic-test --broker-list 10.0.0.7:9092,10.0.0.8:9092,10.0.0.9:9092
测试结果:
四、kafka常用命令使用
4.1、kafka启停命令
1.前台启动
kafka 前台启动命令
bin/kafka-server-start.sh config/server.properties
2.后台启动
后台常驻方式,带上参数 -daemon,如:
bin/kafka-server-start.sh -daemon config/server.properties
或者
nohup bin/kafka-server-start.sh config/server.properties
指定 JMX port 端口启动,指定 jmx,可以方便监控 Kafka 集群
JMX_PORT=9991 /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
3.停止命令
bin/kafka-server-stop.sh
4.2、Topic (主题)相关命令
4.2.1、 创建 Topic
参数 --topic 指定 Topic 名,–partitions 指定分区数,–replication-factor 指定备份(副本)数
创建名为 test_kafka_topic 的 Topic
#旧版本
bin/kafka-topics.sh -zookeeper localhost:2181 --create --partitions 5 --replication-factor 1 --topic test_kafka_topic
#新版本
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 5 --replication-factor 1 --topic test_kafka_topic
bin/kafka-topics.sh
: 启动 Kafka 主题管理工具脚本。-zookeeper localhost:2181
: 指定连接到 ZooKeeper 的地址。在早期版本的 Kafka 中,ZooKeeper 被用于管理 Kafka 的元数据。在较新的 Kafka 版本中,Kafka 已经移除了对 ZooKeeper 的直接依赖,而是采用了更现代的元数据存储方式。因此,实际上,新版本的 Kafka 通常会使用--bootstrap-server
参数指定 Kafka 服务器的地址,而不再使用 ZooKeeper。--create
: 表示创建主题的操作。--partitions 5
: 指定主题创建时的分区数为 5。分区用于并行处理和提高吞吐量。--replication-factor 1
: 指定每个分区的副本数量为 1。在生产环境中,通常会将该值设置为大于 1 的值,以提高容错性。--topic test_kafka_topic
: 指定要创建的主题的名称为 "test_kafka_topic"。
这个命令的效果是创建了一个名为 "test_kafka_topic" 的主题,该主题有 5 个分区,每个分区有一个副本,ZooKeeper 的地址是 localhost:2181
。请注意,实际上,新版本的 Kafka 不再强制使用 ZooKeeper,而是更多地依赖于 --bootstrap-server
指定的 Kafka 服务器。
注意,如果配置文件 server.properties 指定了 Kafka 在 zookeeper 上的目录,则参数也要指定,否则会报无可用的 brokers(下面部分命令也有同样的情况),如:
#旧版本
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic test
#新版本
/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
/usr/local/kafka/bin/kafka-topics.sh
: 启动 Kafka 主题管理工具脚本的完整路径。--create
: 表示创建主题的操作。--zookeeper localhost:2181/kafka
: 指定连接到 ZooKeeper 的地址。这里的地址是localhost:2181
,而/kafka
是 ZooKeeper 中存储 Kafka 数据的路径。在较新的 Kafka 版本中,ZooKeeper 已经逐渐被替代,而实际生产环境中更常使用--bootstrap-server
参数来指定 Kafka 服务器的地址。--replication-factor 1
: 指定每个分区的副本数量为 1。在生产环境中,通常会将该值设置为大于 1 的值,以提高容错性。--partitions 1
: 指定主题创建时的分区数为 1。分区用于并行处理和提高吞吐量。--topic test
: 指定要创建的主题的名称为 "test"。
这个命令的效果是创建了一个名为 "test" 的主题,该主题有 1 个分区,每个分区有 1 个副本。请注意,实际上,新版本的 Kafka 不再强制使用 ZooKeeper,而是更多地依赖于 --bootstrap-server
指定的 Kafka 服务器。
4.2.2.、查询 Topic 列表
列出所有 Topic
#旧版本
bin/kafka-topics.sh --list --zookeeper localhost:2181
#新版本
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
4.2.3、 查询 Topic 详情
查询 Topic 的详细信息
#旧版本
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_kafka_topic
#新版本
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test_kafka_topic
说明:如果未指定 topic 则输出所有 topic 的信息
4.2.4、 增加 Topic 的 partition 数
#旧版本
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test_kafka_topic --partitions 5
#新版本
bin/kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic test_kafka_topic --partitions 5
4.2.5.、查看 topic 指定分区 offset 的最大值或最小值
time 为 -1 时表示最大值,为 -2 时表示最小值:
#旧版本
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic test_kafka_topic --time -1 --broker-list 127.0.0.1:9092 --partitions 0
#新版本
bin/kafka-run-class.sh kafka.admin.GetOffsetsShell --topic test_kafka_topic --time -1 --broker-list localhost:9092 --partitions 0
4.2.6.、删除Topic
删除名为 test_kafka_topic 的 Topic
#旧版本
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test_kafka_topic
#新版本
bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test_kafka_topic
说明:在${KAFKA_HOME}/config/server.properties中配置 delete.topic.enable 为 true,这样才能生效,删除指定的 topic主题
4.3、消息 相关命令
4.3.1、 发送消息生产者发送消息
生产者发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_kafka_topic
4.3.2.、消费消息(从头开始)
消费者查询消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test_kafka_topic
4.3.3.、消费消息(从尾开始)
从尾部开始取数据
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_kafka_topic --offset latest
4.3.4、 消费消息(从尾开始指定分区)
从尾部开始取数据,指定分区消费:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_kafka_topic --offset latest --partition 0
4.3.5.、消费消息(指定分区指定偏移量)
–partition :指定要消费的分区号
–offset :指定起始偏移量消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_kafka_topic --partition 0 --offset 100
4.3.6、指定分组->消费消息
消费者消费消息(指定分组)
注意给客户端命名之后,如果之前有过消费,那么–from-beginning就不会再从头消费了
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test_kafka_topic --group t1
说明:
–from-beginning:表示从头开始接收数据
–group:指定消费者组
4.3.7、取指定个数
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_kafka_topic --offset latest --partition 0 --max-messages 1
4.4、消费者 Group
4.4.1、指定 Group
指定1分组从头开始消费消息(应该会指定偏移量)
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -group test_group --from-beginning
4.4.2、查看消费者 Group 列表
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
4.4.3、 查看 Group 详情
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --describe
4.4.4、删除 Group 中 Topic
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --topic test --delete
4.4.5、删除 Group
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --delete
4.5、补充命令
4.5.1、平衡 leader
bin/kafka-preferred-replica-election.sh --bootstrap-server localhost:9092
4.5.2、自带压测工具
bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092