Kafka学习

管理员

Kafka学习

一、kafka所需的命令

启动kafka要先启动zookeeper,zookeeper学习可以参考bilibili的尚硅谷的教程:07_尚硅谷_zk_本地_安装_哔哩哔哩_bilibili

启动kafka需要执行命令,进入到/opt/module/zookeeper-3.5.7路径下,执行

bin/zkServer.sh start

将start改为status可以查看zookeeper的状态。

kafka启动命令:进入到kafka目录中,/opt/module/kafka。使用命令起到kafka:

bin/kafka-server-start.sh -daemon config/server.properties

停止kafka命令

bin/kafka-server-stop.sh

二、kafka的connect尝试

同步文件

尝试使用connect自带的文件source和sink测试文件的同步更新。

  1. standalone模式启动connect:

​ 其中的一些配置文件说明如下,首先是config目录下的connect-standalone.properties配置文件:

#设置需要连接到的Kafka节点
bootstrap.servers=localhost:9092
# key和value的转换器
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 是否也转换schema,如果设置为false的话,就只会转换payload
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# offset保存于的文件
offset.storage.file.filename=/tmp/connect.offsets
# 自动刷新Offset的时间,单位时毫秒
offset.flush.interval.ms=10000

​ 因为我们要测试文件的同步,所以我们先编辑source的配置文件connect-file-source.properties,内容如下:

# Source的名字
name=local-file-source
# Source对应的类型
connector.class=FileStreamSource
# Task的数量,因为我们是standalone模式,所以只能是1个。在分布式部署时,可以设置为多个
tasks.max=1
# 需要读取的文件
file=/root/access.log
# 读取消息后存入的主题,这个主题最好提前创建出来,可以提前规划好分区和副本因子
topic=connect-test

​ 对应的sink的配置文件是connect-file-sink.properties

# Sink的名字
name=local-file-sink
# Sink的类型,因为我们是文件的connect,所以这个类不能动
connector.class=FileStreamSink
# Task的数量
tasks.max=1
# sink输出的文件
file=/root/access2.log.sink
# 消费的主题,必须与source的主题一致
topics=connect-test

connect-test主题的创建,如果我们不自己创建的话,connect是会自动创建的,我们就默认由connect自动创建。接着启动connect,使用如下命令进行启动:

bin/connect-standalone.sh -daemon config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

​ 对应的三个配置文件分别就是我们刚刚介绍的三个properties文件,运行命令后,查看jps命令,可以看到多了一个ConnectStandalone进程出来。

​ 这就是一个Worker了。下面我测试是否可以进行同步,多开一个ssh选项卡,使用tail -f /root/access2.log.sink 命令追踪该文件内容,可以发现,/root/access2.log.sink文件自动创建出来了,但是source的文件/root/access.log并没有自动创建,下面我们创建该文件touch /root/access.log,并向其中追加内容。通过echo命令新增内容。

​ 如果我修改,将“123”改成“321”的话:

access2.log.sink的内容不会修改。所以该方式是适合日志同步的,因为日志是追加编辑的,并且一般来说不会修改已经写入的内容。

同步数据库