Kafka集群搭建

zookeeper集群搭建

kafka集群是把状态保存在zookeeper中的,首先要搭建zookeeper集群。

1、安装jdk
1
2
wget http://anchnet-script.oss-cn-shanghai.aliyuncs.com/oracle/jdk-8u171-linux-x64.rpm
yum localinstall jdk-8u171-linux-x64.rpm -y
2、下载kafka安装包
1
2
wget http://anchnet-script.oss-cn-shanghai.aliyuncs.com/kafka/kafka_2.12-1.1.0.tgz
官网下载链接:http://kafka.apache.org/downloads

解压kafka

tar -zxvf kafka_2.12-1.1.0.tgz
mv kafka_2.12-1.1.0 kafka

3、配置zk集群
修改zookeeper.properties文件

直接使用kafka自带的zookeeper建立zk集群

1
2
cd /data/kafka
vim conf/zookeeper.properties

mark

1
2
3
4
5
6
7
8
9
10
11
12
#tickTime:
这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
#initLimit:
这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒
#syncLimit:
这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒
#dataDir:
快照日志的存储路径
#dataLogDir:需手动创建
事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多
#clientPort:
这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。
创建myid文件

进入dataDir目录,将三台服务器上的myid文件分别写入1、2、3。
myid是zk集群用来发现彼此的标识,必须创建,且不能相同。

echo “1” > /data/kafka/zk/myid
echo “2” > /data/kafka/zk/myid
echo “3” > /data/kafka/zk/myid

注意项

zookeeper不会主动的清除旧的快照和日志文件,需要定期清理。

1
2
3
4
5
6
7
8
9
10
11
#!/bin/bash 
#snapshot file dir
dataDir=/data/kafka/zk/version-2
#tran log dir
dataLogDir=/data/kafka/log/zk/version-2
#Leave 66 files
count=66
count=$[$count+1]
ls -t $dataLogDir/log.* | tail -n +$count | xargs rm -f
ls -t $dataDir/snapshot.* | tail -n +$count | xargs rm -f
#以上这个脚本定义了删除对应两个目录中的文件,保留最新的66个文件,可以将他写到crontab中,设置为每天凌晨2点执行一次就可以了。
4、启动zk服务

进入kafka目录,执行zookeeper命令

1
2
cd /data/kafka
nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties > logs/zookeeper.log 2>&1 &

没有报错,而且jps查看有zk进程就说明启动成功了。
mark

Kafka集群搭建

1、修改server.properties配置文件

vim conf/server.properties

mark

部分参数含义:

1
2
3
4
5
先每台设置host,listeners里要设置,否则后面消费消息会报错。 
broker.id 每台都不能相同
num.network.threads 设置为cpu核数
num.partitions 分区数设置视情况而定,上面有讲分区数设置
default.replication.factor kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
2、启动kafka集群
1
nohup ./bin/kafka-server-start.sh config/server.properties > logs/kafka.log 2>&1 &

执行jps检查

mark

3、创建topic验证
1
2
3
4
./bin/kafka-topics.sh --create --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --replication-factor 2 --partitions 1 --topic test1
--replication-factor 2 #复制两份
--partitions 1 #创建1个分区
--topic #主题为test1
4、创建生产者和消费者
1
2
3
4
5
#模拟客户端去发送消息,生产者
./bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic test1
#模拟客户端去接受消息,消费者
./bin/kafka-console-consumer.sh --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --from-beginning --topic test1
#然后在生产者处输入任意内容,在消费端查看内容。
5、其他命令
1
2
3
4
5
6
7
8
9
10
11
12
13
14
./bin/kafka-topics.sh --list --zookeeper xxxx:2181
#显示创建的所有topic
./bin/kafka-topics.sh --describe --zookeeper xxxx:2181 --topic test1
#Topic:ssports PartitionCount:1 ReplicationFactor:2 Configs:
# Topic: test1 Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1
#分区为为1 复制因子为2 他的 test1的分区为0
#Replicas: 0,1 复制的为0,1
```
###### 6、删除topic
> 修改配置文件server.properties添加如下配置:
> delete.topic.enable=true
> 配置完重启kafka、zookeeper。

如果不想修改配置文件可删除topc及相关数据目录

#删除kafka topic
./bin/kafka-topics.sh –delete –zookeeper xxxx:2181,xxxx:2181 –topic test1

#删除kafka相关数据目录
rm -rf /data/kafka/log/kafka/test*

#删除zookeeper相关路径
rm -rf /data/kafka/zk/test*
rm -rf /data/kafka/log/zk/test*
```

-------------本文结束感谢您的阅读-------------
原创技术分享,感谢您的支持。