02.Kafka快读入门

安装部署

先下载zookeeper、kafka

zookeeper下载地址:Download
kafka下载地址:Download

集群规划

hadoop001 hadoop002
zk zk
kafka kafka

集群部署

1) 解压安装包

[root@hadoop001 software]# pwd/root/software[root@hadoop001 software]# tar -xvf kafka_2.12-2.5.0.tgz 

2) 修改解压后的文件名称并移动到指定目录

[root@hadoop001 software]# mv kafka_2.12-2.5.0 kafka_2.12[root@hadoop001 software]# mv kafka_2.12 /usr/local/

3) 在/opt/kafka目录下创建logs文件夹

[root@hadoop001 opt]# mkdir -p /opt/kafka/logs

4) 修改配置文件

#broker的全局唯一编号,不能重复broker.id=0#删除topic功能使能delete.topic.enable=true#处理网络请求的线程数量num.network.threads=3#用来处理磁盘IO的现成数量num.io.threads=8#发送套接字的缓冲区大小socket.send.buffer.bytes=102400#接收套接字的缓冲区大小socket.receive.buffer.bytes=102400#请求套接字的缓冲区大小socket.request.max.bytes=104857600#kafka运行日志存放的路径log.dirs=/opt/kafka/logs#topic在当前broker上的分区个数num.partitions=1#用来恢复和清理data下数据的线程数量num.recovery.threads.per.data.dir=1#segment文件保留的最长时间,超时将被删除log.retention.hours=168#配置连接Zookeeper集群地址zookeeper.connect=hadoop001:2181,hadoop002:2181

5) 配置环境变量

[root@hadoop001 kafka_2.12]# vim /etc/profileexport KAFKA_HOME=/usr/local/kafka_2.12export PATH=$PATH:$KAFKA_HOME/bin# 重新加载配置文件[root@hadoop001 kafka_2.12]# source /etc/profile

6) 分发安装包
将hadoop001的kafka安装包发送到hadoop002机器上

# 首先将hadoop002对应的ip,配知道hadoop001的机器上[root@hadoop001 local]# vim /etc/hosts192.168.78.3 hadoop001192.168.78.3 localhost192.168.78.4 hadoop002然后使用scp命令scp -r  /usr/local/kafka_2.12/ root@hadoop002:/usr/local/

分发之后记得配置其他机器的环境变量

7) 修改hadoop002的配置

到hadoop002机器上,修改broker.id=1
注:broker.id不得重复

切换到hadoop002机器上

# 增加hadoop001的IP映射[root@192 config]# vim /etc/hosts192.168.78.4 hadoop002192.168.78.4 localhost192.168.78.3 hadoop001  

8) 启动集群

依次在hadoop001、hadoop001 节点上启动kafka(保证每台机器上zookeeper是启动的)

# hadoop001机器[root@hadoop001 kafka_2.12]# bin/kafka-server-start.sh -daemon config/server.properties # hadoop002机器[root@hadoop002 kafka_2.12]# bin/kafka-server-start.sh -daemon config/server.properties 

9) 关闭集群

[root@hadoop001 kafka_2.12]# bin/kafka-server-stop.sh[root@hadoop002 kafka_2.12]# bin/kafka-server-stop.sh 

Kafka命令行操作

1) 查看当前服务器中的所有topic

[root@hadoop001 kafka_2.12]# bin/kafka-topics.sh --zookeeper hadoop001:2181 --list__consumer_offsetsyiyang

2) 创建topic

[root@hadoop001 kafka_2.12]# bin/kafka-topics.sh --zookeeper hadoop001:2181 --create --replication-factor 2 --partitions 1  --topic first 

--topic 定义topic名
--replication-factor 定义副本的数(不能超过集群的数量)
--partition 定义分区数

3) 删除topic

[root@hadoop001 kafka_2.12]# bin/kafka-topics.sh --zookeeper hadoop001:2181 --delete --topic firstTopic first is marked for deletion.Note: This will have no impact if delete.topic.enable is not set to true.

需要server.properties中设置delete.topic.enable=true否则只是标记删除。

4) 发送消息

[root@hadoop001 kafka_2.12]# bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic yiyang>hello kafka>  

5) 消费消息

[root@hadoop001 kafka_2.12]# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --from-beginning --topic yiyanghello kafka

--from-beginning:会把主题中以往所有的数据都读取出来。

6) 查看某个Topic的详情

[root@hadoop001 kafka_2.12]# bin/kafka-topics.sh -zookeeper hadoop001:2181 --describe --topic yiyangTopic: yiyang   PartitionCount: 1       ReplicationFactor: 1    Configs:        Topic: yiyang   Partition: 0    Leader: 0       Replicas: 0     Isr: 0

7) 修改分区数

[root@hadoop001 kafka_2.12]# bin/kafka-topics.sh -zookeeper hadoop001:2181 --alter --topic yiyang --partitions 6WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affectedAdding partitions succeeded![root@hadoop001 kafka_2.12]# bin/kafka-topics.sh -zookeeper hadoop001:2181 --describe --topic yiyangTopic: yiyang   PartitionCount: 6       ReplicationFactor: 1    Configs:        Topic: yiyang   Partition: 0    Leader: 0       Replicas: 0     Isr: 0        Topic: yiyang   Partition: 1    Leader: 1       Replicas: 1     Isr: 1        Topic: yiyang   Partition: 2    Leader: 0       Replicas: 0     Isr: 0        Topic: yiyang   Partition: 3    Leader: 1       Replicas: 1     Isr: 1        Topic: yiyang   Partition: 4    Leader: 0       Replicas: 0     Isr: 0        Topic: yiyang   Partition: 5    Leader: 1       Replicas: 1     Isr: 1[root@hadoop001 kafka_2.12]#  

来源:https://www.icode9.com/content-4-865351.html

(0)

相关推荐