02. Kafka Quick Start
Installation and Deployment
First, download ZooKeeper and Kafka.
ZooKeeper download: Download
Kafka download: Download
Cluster Planning
| hadoop001 | hadoop002 |
|---|---|
| zk | zk |
| kafka | kafka |
Cluster Deployment
1) Extract the installation package
[root@hadoop001 software]# pwd
/root/software
[root@hadoop001 software]# tar -xvf kafka_2.12-2.5.0.tgz
2) Rename the extracted directory and move it to the target location
[root@hadoop001 software]# mv kafka_2.12-2.5.0 kafka_2.12
[root@hadoop001 software]# mv kafka_2.12 /usr/local/
3) Create a logs directory under /opt/kafka
[root@hadoop001 opt]# mkdir -p /opt/kafka/logs
4) Edit the configuration file (config/server.properties)
# Globally unique broker ID; must not be duplicated
broker.id=0
# Enable topic deletion
delete.topic.enable=true
# Number of threads handling network requests
num.network.threads=3
# Number of threads handling disk I/O
num.io.threads=8
# Send buffer size of the socket
socket.send.buffer.bytes=102400
# Receive buffer size of the socket
socket.receive.buffer.bytes=102400
# Maximum size of a socket request
socket.request.max.bytes=104857600
# Directory where Kafka data logs are stored
log.dirs=/opt/kafka/logs
# Default number of partitions per topic on this broker
num.partitions=1
# Number of threads per data directory for recovery and cleanup
num.recovery.threads.per.data.dir=1
# Maximum time a segment file is retained before deletion
log.retention.hours=168
# ZooKeeper cluster connection string
zookeeper.connect=hadoop001:2181,hadoop002:2181
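The file above is a plain list of `key=value` pairs with `#` comments. A minimal Python sketch (the `parse_properties` helper and sample text are illustrative, not part of Kafka) of how such a file can be read:

```python
def parse_properties(text: str) -> dict:
    """Parse simple key=value properties text, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

sample = """
# Globally unique broker ID
broker.id=0
delete.topic.enable=true
zookeeper.connect=hadoop001:2181,hadoop002:2181
"""
conf = parse_properties(sample)
print(conf["broker.id"])          # → 0
print(conf["zookeeper.connect"])  # → hadoop001:2181,hadoop002:2181
```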
5) Configure environment variables
[root@hadoop001 kafka_2.12]# vim /etc/profile
export KAFKA_HOME=/usr/local/kafka_2.12
export PATH=$PATH:$KAFKA_HOME/bin
# Reload the profile
[root@hadoop001 kafka_2.12]# source /etc/profile
6) Distribute the installation package
Copy the Kafka installation directory from hadoop001 to hadoop002.
# First, add hadoop002's IP mapping on the hadoop001 machine
[root@hadoop001 local]# vim /etc/hosts
192.168.78.3 hadoop001
192.168.78.3 localhost
192.168.78.4 hadoop002
Then copy the directory with scp:
scp -r /usr/local/kafka_2.12/ root@hadoop002:/usr/local/
After distributing, remember to configure the environment variables on the other machine as well.
7) Update the configuration on hadoop002
On hadoop002, set broker.id=1 in config/server.properties.
Note: broker.id must be unique across the cluster.
Switch to the hadoop002 machine:
# Add hadoop001's IP mapping
[root@192 config]# vim /etc/hosts
192.168.78.4 hadoop002
192.168.78.4 localhost
192.168.78.3 hadoop001
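Since a duplicate broker.id keeps a broker from joining the cluster, a quick sketch (the `find_duplicate_broker_ids` helper is hypothetical, not a Kafka tool) that checks a set of server.properties contents for clashes:

```python
def find_duplicate_broker_ids(configs: dict) -> set:
    """Given {hostname: properties text}, return broker.id values used more than once."""
    seen, dupes = {}, set()
    for host, text in configs.items():
        for line in text.splitlines():
            if line.strip().startswith("broker.id="):
                bid = line.split("=", 1)[1].strip()
                if bid in seen:
                    dupes.add(bid)
                seen[bid] = host
    return dupes

configs = {
    "hadoop001": "broker.id=0\nlog.dirs=/opt/kafka/logs",
    "hadoop002": "broker.id=1\nlog.dirs=/opt/kafka/logs",
}
print(find_duplicate_broker_ids(configs))  # → set() (no duplicates)
```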
8) Start the cluster
Start Kafka on hadoop001 and then hadoop002 (make sure ZooKeeper is running on each machine first).
# On hadoop001
[root@hadoop001 kafka_2.12]# bin/kafka-server-start.sh -daemon config/server.properties
# On hadoop002
[root@hadoop002 kafka_2.12]# bin/kafka-server-start.sh -daemon config/server.properties
9) Stop the cluster
[root@hadoop001 kafka_2.12]# bin/kafka-server-stop.sh
[root@hadoop002 kafka_2.12]# bin/kafka-server-stop.sh
Kafka Command-Line Operations
1) List all topics on the server
[root@hadoop001 kafka_2.12]# bin/kafka-topics.sh --zookeeper hadoop001:2181 --list
__consumer_offsets
yiyang
2) Create a topic
[root@hadoop001 kafka_2.12]# bin/kafka-topics.sh --zookeeper hadoop001:2181 --create --replication-factor 2 --partitions 1 --topic first
--topic: the topic name
--replication-factor: the number of replicas (must not exceed the number of brokers in the cluster)
--partitions: the number of partitions
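When messages carry a key, Kafka's default partitioner hashes the key and takes it modulo the partition count, so all messages with the same key land in the same partition. A simplified sketch (Kafka itself uses murmur2; crc32 stands in here purely for illustration):

```python
import zlib

def pick_partition(key: bytes, num_partitions: int) -> int:
    """Simplified stand-in for Kafka's keyed partitioner:
    hash the key, then take it modulo the partition count.
    (The real partitioner uses murmur2, not crc32.)"""
    return zlib.crc32(key) % num_partitions

# The same key always maps to the same partition, which is what
# gives Kafka its per-key ordering guarantee.
p1 = pick_partition(b"order-42", 6)
p2 = pick_partition(b"order-42", 6)
print(p1 == p2)  # → True

# With a single partition (as in the topic created above),
# every key necessarily maps to partition 0.
print(pick_partition(b"order-42", 1))  # → 0
```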
3) Delete a topic
[root@hadoop001 kafka_2.12]# bin/kafka-topics.sh --zookeeper hadoop001:2181 --delete --topic first
Topic first is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
delete.topic.enable=true must be set in server.properties; otherwise the topic is only marked for deletion, not actually removed.
4) Produce messages
[root@hadoop001 kafka_2.12]# bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic yiyang
>hello kafka
>
5) Consume messages
[root@hadoop001 kafka_2.12]# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --from-beginning --topic yiyang
hello kafka
--from-beginning: reads all existing data in the topic from the start.
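The effect of --from-beginning can be sketched as choosing the starting offset when a consumer attaches to a partition's log (a toy in-memory model, not the Kafka client API):

```python
class PartitionLog:
    """Toy model of one partition: an append-only list of messages."""
    def __init__(self):
        self.messages = []

    def append(self, msg: str) -> None:
        self.messages.append(msg)

    def read_from(self, offset: int):
        """Return every message at or after the given offset."""
        return self.messages[offset:]

log = PartitionLog()
for m in ["hello kafka", "second message"]:
    log.append(m)

# --from-beginning: start at offset 0 and see everything already written
print(log.read_from(0))    # → ['hello kafka', 'second message']

# default (latest): start at the current end and see only new messages
tail = len(log.messages)
log.append("third message")
print(log.read_from(tail)) # → ['third message']
```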
6) Describe a topic
[root@hadoop001 kafka_2.12]# bin/kafka-topics.sh --zookeeper hadoop001:2181 --describe --topic yiyang
Topic: yiyang	PartitionCount: 1	ReplicationFactor: 1	Configs:
	Topic: yiyang	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
7) Increase the partition count
[root@hadoop001 kafka_2.12]# bin/kafka-topics.sh --zookeeper hadoop001:2181 --alter --topic yiyang --partitions 6
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[root@hadoop001 kafka_2.12]# bin/kafka-topics.sh --zookeeper hadoop001:2181 --describe --topic yiyang
Topic: yiyang	PartitionCount: 6	ReplicationFactor: 1	Configs:
	Topic: yiyang	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: yiyang	Partition: 1	Leader: 1	Replicas: 1	Isr: 1
	Topic: yiyang	Partition: 2	Leader: 0	Replicas: 0	Isr: 0
	Topic: yiyang	Partition: 3	Leader: 1	Replicas: 1	Isr: 1
	Topic: yiyang	Partition: 4	Leader: 0	Replicas: 0	Isr: 0
	Topic: yiyang	Partition: 5	Leader: 1	Replicas: 1	Isr: 1
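The WARNING exists because the key-to-partition mapping depends on the partition count: after growing the topic, the same key can hash to a different partition, so its new messages no longer share a partition (and therefore ordering) with its older ones. A sketch, again using crc32 as a simplified stand-in for Kafka's murmur2 partitioner:

```python
import zlib

def pick_partition(key: bytes, num_partitions: int) -> int:
    # Simplified stand-in for Kafka's keyed partitioner (real one uses murmur2)
    return zlib.crc32(key) % num_partitions

keys = [f"user-{i}".encode() for i in range(100)]
# With 1 partition, every key maps to partition 0
before = {k: pick_partition(k, 1) for k in keys}
# After altering the topic to 6 partitions, the same keys hash across all 6
after = {k: pick_partition(k, 6) for k in keys}

moved = [k for k in keys if before[k] != after[k]]
# Most keys end up on a different partition than before, which is exactly
# why per-key ordering breaks across the resize.
print(f"{len(moved)} of {len(keys)} keys changed partition")
```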
