一、kafka概述

1.1 定义
Kafka 是一个分布式的基于发布 / 订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。

1.2 消息队列

  • 1.2.1 传统消息队列的应用场景

    • 使用消息队列的好处:

    image-20231109160618541

    • 解耦:
      允许独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

    • 可恢复性:
      系统的一部分组件失效时,不会影响整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

    • 缓冲:
      有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

    • 灵活性和峰值处理能力:
      使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

    • 异步通信:
      很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

  • 1.2.2 消息队列的两种形式

    • 点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除。)

    消息生产者生产消息发送到 Queue 中,然后消费者从 Queue 中取出并且消费消息。消息被消费以后,Queue 中不再有存储,所以消费者不可能消费到已经被消费的消息。Queue 支持存在多个消费者,但对于一个消息而言,只有一个消费者可以消费。

    image-20231109160800631

    • 发布 / 订阅模式(一对多,消费者消费数据之后不会清除消息)

    消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 中的消息会被所有订阅者消费。

    image-20231109160848923

1.3 Kafka 基础架构

image-20231109160928087

  • Producer:
    消息生产者,就是向 Kafka broker 发消息的客户端。
  • Consumer:
    消息消费者,向 Kafka broker 取消息的客户端。
  • Consumer Group(CG):
    消费者组,由多个 Consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
  • Broker:
    一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。
  • Topic:
    可以理解为一个队列,生产者和消费者面向的都是一个 topic。
  • Partiton:
    为了实现拓展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 Partition,每个 partition 都是一个有序的队列。
  • Replication:
    副本,为保证集群中某个节点发生故障时,该节点上的 partition 数据不丢失,且 Kafka 仍然可以继续工作,Kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
  • leader:
    每个分区多个副本的 ” 主 “,生产者发送数据的对象,以及消费者消费数据时的对象都是 leader。
  • follower:
    每个分区多个副本的 “从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 leader。

二、kafka安装部署(单节点部署)

2.1、环境准备

搭建kafka集群至少需要3台服务器(或虚拟机也可),提前准备好3台不同IP的服务器

主机名 IP
kafkaserver1 10.0.0.7
kafkaserver2 10.0.0.8
kafkaserver3 10.0.0.9

获取kafka:下载 //kafkaserver1部署即可

wget https://downloads.apache.org/kafka/3.5.1/kafka_2.12-3.5.1.tgz

下载完成后进行解压

[root@kafkaserver1 ~]# tar xf kafka_2.12-3.5.1.tgz  -C /app/
[root@kafkaserver1 ~]# cd /app/kafka_2.12-3.5.1/
[root@kafkaserver1 kafka_2.12-3.5.1]# ll
total 64
drwxr-xr-x 3 root root  4096 Jul 15 00:52 bin
drwxr-xr-x 3 root root  4096 Jul 15 00:52 config
drwxr-xr-x 2 root root  8192 Nov  9 17:14 libs
-rw-rw-r-- 1 root root 14722 Jul 15 00:50 LICENSE
drwxr-xr-x 2 root root   262 Jul 15 00:52 licenses
-rw-rw-r-- 1 root root 28184 Jul 15 00:50 NOTICE
drwxr-xr-x 2 root root    44 Jul 15 00:52 site-docs

2.2、启动kafka

Apache Kafka 可以使用 ZooKeeper 或 KRaft 启动,但不能同时使用
以前的kafka之前还需要准备zookeeper环境,新版的kafka中自带由,免去了很多麻烦

第一步:按顺序启动所有服务:

# Start the ZooKeeper service
[root@kafkaserver1 kafka_2.12-3.5.1]# bin/zookeeper-server-start.sh config/zookeeper.properties

第二步:打开另一个终端会话并运行:

# Start the Kafka broker service
[root@kafkaserver1 kafka_2.12-3.5.1]# bin/kafka-server-start.sh config/server.properties

image-20231109172703683

以上两步命令运行后,可以看到kafka服务已经启动

2.3、创建主题

kafka的事件储存在主题中,主题类似于文件系统中的文件夹,事件是该文件夹中的文件。所以,在使用前必须为事件创建主题。

[root@kafkaserver1 kafka_2.12-3.5.1]# bin/kafka-topics.sh --create --topic kafka-topic-test --bootstrap-server localhost:9092

image-20231109173810210

Kafka 的所有命令行工具都有额外的选项:运行kafka-topics.sh不带任何参数的命令以显示使用信息。例如,显示 新主题的 分区计数等详细信息:

[root@kafkaserver1 kafka_2.12-3.5.1]# bin/kafka-topics.sh --describe --topic kafka-topic-test --bootstrap-server localhost:9092

image-20231109175016245

2.4、 事件写入主题

[root@kafkaserver1 kafka_2.12-3.5.1]# bin/kafka-console-producer.sh --topic kafka-topic-test --bootstrap-server localhost:9092

image-20231109175305706

2.5、事件读取

[root@kafkaserver1 kafka_2.12-3.5.1]# bin/kafka-console-consumer.sh --topic kafka-topic-test --from-beginning --bootstrap-server localhost:9092

image-20231109175445425

2.6、停止kafka环境

使用 停止生产者和消费者客户Ctrl-C。 使用 停止 Kafka 代理Ctrl-C。如果使用 Kafka && ZooKeeper,还需要停止 ZooKeeper 服务器 Ctrl-C

如果还想删除本地 Kafka 环境的任何数据,包括在此过程中创建的任何事件,请运行以下命令

rm -rf /tmp/kafka-logs /tmp/zookeeper /tmp/kraft-combined-logs

2.7、使用system管理启动

zookeeper

cat >> /usr/lib/systemd/system/zookeeper.service  << EOF
Description= zookeeper.service (broker)
After=network.target  zookeeper.service

[Service]
Type=simple
##kafka启动运行的脚本指定配置文件
ExecStart=/app/kafka_2.12-3.5.1/bin/zookeeper-server-start.sh /app/kafka_2.12-3.5.1/config/zookeeper.properties
ExecStop=/app/kafka_2.12-3.5.1/bin/zookeeper-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF

kafka

cat >> /usr/lib/systemd/system/kafka.service << EOF
Description=Apache Kafka server (broker)
After=network.target  zookeeper.service

[Service]
Type=simple
##kafka启动运行的脚本指定配置文件
ExecStart=/app/kafka_2.12-3.5.1/bin/kafka-server-start.sh /app/kafka_2.12-3.5.1/config/server.properties
ExecStop=/app/kafka_2.12-3.5.1/bin/ kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF

重新加载配置文件

[root@kafkaserver1 ~]# systemctl daemon-reload

启动服务查看验证

[root@kafkaserver1 ~]# systemctl  start zookeeper.service  kafka.service 

image-20231110145904596

三、搭建 kafka 集群

前提条件:按照第二步骤的方法为列出的三台服务器全部安装 kakfa 。
我们这里用的 kafka && ZooKeeper 模式,所以先要搭建 zookeeper 集群。

3.1、ZooKeeper 集群搭建

配置文件修改

[root@kafkaserver1 kafka_2.12-3.5.1]# vim config/zookeeper.properties 
----
dataDir=/var/zookeeper
initLimit=5
syncLimit=2
tickTime=2000
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
server.1=10.0.0.7:2888:3888
server.2=10.0.0.8:2888:3888
server.3=10.0.0.9:2888:3888

image-20231110152347440

创建数据文件夹

# 新建zookeeper.properties 中配置的dataDir目录
[root@kafkaserver1 kafka_2.12-3.5.1]# mkdir /var/zookeeper
[root@kafkaserver1 kafka_2.12-3.5.1]# cd /var/zookeeper
# 新建myid文件,三个节点分别设置值为1,2,3
[root@kafkaserver1 zookeeper]# touch myid
[root@kafkaserver1 zookeeper]# echo 1 > /var/zookeeper/myid

记得端口是否放开,若没有则有两种方法

  1. 关闭防火墙
systemctl stop firewalld

2.或者防火墙放行

firewall-cmd --zone=public --add-port=2181/tcp --permanent 
firewall-cmd --zone=public --add-port=3888/tcp --permanent 
firewall-cmd --zone=public --add-port=2888/tcp --permanent
firewall-cmd --reload

然后在其他的2个服务器重重复上面的步骤,(注意节点设置)

3.2、kafka 集群搭建

修改server.properties

[root@kafkaserver1 kafka_2.12-3.5.1]# vim config/server.properties 
----
#每个节点唯一
broker.id=1
#填写每个节点的本机IP
listeners=PLAINTEXT://10.0.0.7:9092
#这里有几个成员就写几个
zookeeper.connect=10.0.0.7:2181,192.10.0.0.8:2181,10.0.0.9:2181
#我这里有三台机器所以设置下面的属性为3
offsets.topic.replication.factor=3 
transaction.state.log.replication.factor=3 
transaction.state.log.min.isr=3
----
group.initial.rebalance.delay.ms=3

image-20231110153904387

启动kafka如果报错,则对meta.properties进行修改,该文件默认在 /tmp/kafka-logs

删除cluster.id,broker.id等于server.properties里的即可 //记得备份一份!!!!

[root@kafkaserver1 kafka-logs]# vim meta.properties 
#
#Fri Nov 10 14:56:24 CST 2023
version=0
broker.id=1

#先停止服务后在修改文件!!!

其它两台节点,重复如上操作即可,注意broker.id唯一性

3.3、kafka 集群测试

搭建完成后,我们简单的测试一下是否成功。
我们在节点 10.0.0.7上启动一个发布者进行消息发布,在节点10.0.0.8和10.0.0.9上分别启动一个消费者进行事件的消费

创建测试用主题

bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.9:9092 --offset latest  --topic topic-test

创建消费者

bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.7:9092 --offset latest  --topic topic-test

创建发布者

bin/kafka-console-producer.sh  --topic topic-test --broker-list 10.0.0.7:9092,10.0.0.8:9092,10.0.0.9:9092

测试结果:

image-20231114101856986

image-20231114101923771

image-20231114101940883

四、kafka常用命令使用

4.1、kafka启停命令

1.前台启动

kafka 前台启动命令

bin/kafka-server-start.sh config/server.properties

2.后台启动

后台常驻方式,带上参数 -daemon,如:

bin/kafka-server-start.sh -daemon config/server.properties

或者

nohup bin/kafka-server-start.sh config/server.properties 

指定 JMX port 端口启动,指定 jmx,可以方便监控 Kafka 集群

JMX_PORT=9991 /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

3.停止命令

bin/kafka-server-stop.sh

4.2、Topic (主题)相关命令

4.2.1、 创建 Topic

参数 --topic 指定 Topic 名,–partitions 指定分区数,–replication-factor 指定备份(副本)数

创建名为 test_kafka_topic 的 Topic

#旧版本
bin/kafka-topics.sh -zookeeper localhost:2181 --create --partitions 5 --replication-factor 1 --topic test_kafka_topic
#新版本
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 5 --replication-factor 1 --topic test_kafka_topic
  • bin/kafka-topics.sh: 启动 Kafka 主题管理工具脚本。
  • -zookeeper localhost:2181: 指定连接到 ZooKeeper 的地址。在早期版本的 Kafka 中,ZooKeeper 被用于管理 Kafka 的元数据。在较新的 Kafka 版本中,Kafka 已经移除了对 ZooKeeper 的直接依赖,而是采用了更现代的元数据存储方式。因此,实际上,新版本的 Kafka 通常会使用 --bootstrap-server 参数指定 Kafka 服务器的地址,而不再使用 ZooKeeper。
  • --create: 表示创建主题的操作。
  • --partitions 5: 指定主题创建时的分区数为 5。分区用于并行处理和提高吞吐量。
  • --replication-factor 1: 指定每个分区的副本数量为 1。在生产环境中,通常会将该值设置为大于 1 的值,以提高容错性。
  • --topic test_kafka_topic: 指定要创建的主题的名称为 "test_kafka_topic"。

这个命令的效果是创建了一个名为 "test_kafka_topic" 的主题,该主题有 5 个分区,每个分区有一个副本,ZooKeeper 的地址是 localhost:2181。请注意,实际上,新版本的 Kafka 不再强制使用 ZooKeeper,而是更多地依赖于 --bootstrap-server 指定的 Kafka 服务器。

注意,如果配置文件 server.properties 指定了 Kafka 在 zookeeper 上的目录,则参数也要指定,否则会报无可用的 brokers(下面部分命令也有同样的情况),如:

#旧版本
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic test
#新版本
/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
  • /usr/local/kafka/bin/kafka-topics.sh: 启动 Kafka 主题管理工具脚本的完整路径。
  • --create: 表示创建主题的操作。
  • --zookeeper localhost:2181/kafka: 指定连接到 ZooKeeper 的地址。这里的地址是 localhost:2181,而 /kafka 是 ZooKeeper 中存储 Kafka 数据的路径。在较新的 Kafka 版本中,ZooKeeper 已经逐渐被替代,而实际生产环境中更常使用 --bootstrap-server 参数来指定 Kafka 服务器的地址。
  • --replication-factor 1: 指定每个分区的副本数量为 1。在生产环境中,通常会将该值设置为大于 1 的值,以提高容错性。
  • --partitions 1: 指定主题创建时的分区数为 1。分区用于并行处理和提高吞吐量。
  • --topic test: 指定要创建的主题的名称为 "test"。

这个命令的效果是创建了一个名为 "test" 的主题,该主题有 1 个分区,每个分区有 1 个副本。请注意,实际上,新版本的 Kafka 不再强制使用 ZooKeeper,而是更多地依赖于 --bootstrap-server 指定的 Kafka 服务器。

4.2.2.、查询 Topic 列表

列出所有 Topic

#旧版本
bin/kafka-topics.sh --list --zookeeper localhost:2181
#新版本
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

4.2.3、 查询 Topic 详情

查询 Topic 的详细信息

#旧版本
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_kafka_topic
#新版本
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test_kafka_topic

说明:如果未指定 topic 则输出所有 topic 的信息

4.2.4、 增加 Topic 的 partition 数

#旧版本
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test_kafka_topic --partitions 5 
#新版本
bin/kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic test_kafka_topic --partitions 5

4.2.5.、查看 topic 指定分区 offset 的最大值或最小值

time 为 -1 时表示最大值,为 -2 时表示最小值:

#旧版本
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic test_kafka_topic --time -1 --broker-list 127.0.0.1:9092 --partitions 0 
#新版本
bin/kafka-run-class.sh kafka.admin.GetOffsetsShell --topic test_kafka_topic --time -1 --broker-list localhost:9092 --partitions 0

4.2.6.、删除Topic

删除名为 test_kafka_topic 的 Topic

#旧版本
bin/kafka-topics.sh --delete --zookeeper localhost:2181  --topic test_kafka_topic
#新版本
bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test_kafka_topic

说明:在${KAFKA_HOME}/config/server.properties中配置 delete.topic.enable 为 true,这样才能生效,删除指定的 topic主题

4.3、消息 相关命令

4.3.1、 发送消息生产者发送消息

生产者发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_kafka_topic

4.3.2.、消费消息(从头开始)

消费者查询消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092  --from-beginning --topic test_kafka_topic

4.3.3.、消费消息(从尾开始)

从尾部开始取数据

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_kafka_topic --offset latest

4.3.4、 消费消息(从尾开始指定分区)

从尾部开始取数据,指定分区消费:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_kafka_topic --offset latest --partition 0

4.3.5.、消费消息(指定分区指定偏移量)

–partition :指定要消费的分区号
–offset :指定起始偏移量消费

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_kafka_topic  --partition 0 --offset 100 

4.3.6、指定分组->消费消息

消费者消费消息(指定分组)
注意给客户端命名之后,如果之前有过消费,那么–from-beginning就不会再从头消费了

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092  --from-beginning --topic test_kafka_topic --group t1

说明:
–from-beginning:表示从头开始接收数据
–group:指定消费者组

4.3.7、取指定个数

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_kafka_topic --offset latest --partition 0 --max-messages 1

4.4、消费者 Group

4.4.1、指定 Group

指定1分组从头开始消费消息(应该会指定偏移量)

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -group test_group --from-beginning

4.4.2、查看消费者 Group 列表

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

4.4.3、 查看 Group 详情

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --describe

4.4.4、删除 Group 中 Topic

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --topic test --delete

4.4.5、删除 Group

/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --delete

4.5、补充命令

4.5.1、平衡 leader

bin/kafka-preferred-replica-election.sh --bootstrap-server localhost:9092

4.5.2、自带压测工具

bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092 
0