跳至主要內容

Klustron CDC将数据变更写入kafka

Klustron大约 5 分钟

Klustron CDC将数据变更写入kafka

注意:

如无特别说明,文中的版本号可以使用任何已发布版本的版本号代替。所有已发布版本详见:http://doc.klustron.com/zh/Release_notes.html。

本文内容:

Klustron数据库存储节点兼容MySQL binlog dump协议,Klustron从1.2.1开始提供适用于数据同步需求的CDC模块,Klustron CDC已支持多种模式,比如从Klustron集群导出数据(SQL或文件),或支持从开源MySQL导出数据到Klustron集群,目前也默认支持从Klustron将变更数据导出为kafka消息,写出到kafka主题,有其他数据使用需求的下游服务,可从kafka接收消息,完成相关的需求,在本文中,我们将实践CDC数据写出到kafka的配置过程。

01 环境准备

本文所指的测试场景中,涉及使用CDC kafka插件进行Klustron与kafka之间的测试,所以,需要准备一套Klustron的运行环境,理论上,需要至少4台服务器,3台用于Klustron环境,1台用于kafka(单机)服务,受限于资源有限,故使用3台机器搭建整个测试环境,因为使用了不同的服务端口,所以不存在环境冲突。

1.1 Klustron 安装配置

《略》

Klustron 环境说明:

  • XPanel: http://192.168.0.152:40180/KunlunXPanel/#/cluster
  • 计算节点:192.168.0.152 ,端口: 47001
  • 存储节点(shard1):192.168.0.155, 端口:57003 (主)
  • Klustron 安装在kl用户下

1.2 Kafka安装配置

《略》

Kafka 环境说明:

  • broker0服务IP及端口:192.168.0.153:9092
  • topic: CDC kafka会自动创建一个主题,名为:cdc_kafka_<集群名称>

1.3 CDC安装配置

从[http://zettatech.tpddns.cn:14000/dailybuilds/enterprise/kunlun-cdc-1.4.1.tgz 下载文件kunlun-cdc-1.4.1.tgz](http://zettatech.tpddns.cn:14000/dailybuilds/enterprise/kunlun-cdc-1.4.1.tgz 下载文件kunlun-cdc-1.4.1.tgz) 释放到192.168.0.152 / 153/ 155 3台机的kl用户的home目录下,直接用tar -zxvf kunlun-cdc-1.4.1.tgz 解开该文件,在此目录下,会生成名为:/home/kl/kunlun-cdc-1.4.1,及相关的子目录,形式如下:

用kl用户进入192.168.0.152 / 192.168.0.153/ 192.168.0.155 的conf目录,编辑参数文件:kunlun_cdc.cnf

cd /home/kl/kunlun-cdc-1.4.1/conf
vi kunlun_cdc.cnf

修改如下参数:

local_ip = 192.168.0.153  #3台机各自的IP

http_port = 18012     #3台机取一致的端口,实际可以不一致,按需定义

ha_group_member = 192.168.0.152:18081,192.168.0.153:18081,192.168.0.155:18081  #3台机组成一个CDC的高可用集群,端口自定义,本例取18081

server_id = 2  #3台机各自定义一个自已的ID号,不唯一即可

保存退出。

进入3台机的CDC bin目录,该目录下有用于启停CDC服务的命令:

start_kunlun_cdc.sh stop_kunlun_cdc.sh

在3台机执行如下指令,启动CDC服务

cd /home/kl/kunlun-cdc-1.4.1/bin

./start_kunlun_cdc.sh

ps -ef |grep cdc 

输出如下信息,说明启动CDC服务成功

登陆到XPanel,http:// 192.168.0.152:40180/KunlunXPanel/#/cdc/list,将打开如下页面:

点击“新增”配置“CDC服务”,输入如下相应的参数,

分组号可以自定义,本例中取分组号为1,因为在本例中是配置了3台CDC服务器组成高可用集群,所以,需要点击上图中右侧的“+”号继续完成剩余两个CDC服务的配置信息添加工作,并完成后点击“确认”进行保存。

保存后,界面效果如下所示:

注意一下,其中192.168.0.152在“主节点”字段显示为1,意味着当前承担CDC服务的节点是它,其他两个节点作为备节点,在主节点失效的时候,会选择其中一个升级为主节点,继续服务CDC相关的任务。

02 CDC kafka 插件测试

2.1 从Klustron向kafka写出数据

连接Klustron计算节点,建立源库相应用户及表,按如下指令建立相关对象:

kl@kunlun1:~$ psql -h 192.168.0.152 -p 47001 postgres
create user test with password 'test';
grant create on database postgres to test ;
exit

kl@kunlun1:~$ psql -h 192.168.0.152 -p 47001 postgres
create schema test ;
create table test. kl2kafka (f1 int primary key, f2 varchar(10));

打开XPanel:http:// 192.168.0.152:40180/KunlunXPanel/#/cdc/worker ,为Klustron添加CDC任务:

点击“+新增”,输入如下参数:

说明:

  1. 任务名称:需要查看Klustron集群cluster1的名称,非随意填入;
  2. 元数据库:Klustron元数据库是一个3节点的MGR,需在XPanel中查看Klustron集群环境中MGR的主节点当前是哪一个,在此处填写主节点信息;
  3. 数据表:数据表的格式,需要遵循:<database名>$$<schema名>.<表名>的格式,有多个表的话,中间用“,”隔开。

然后,点击 “shard参数:“右侧的“添加+”,弹出如下对话框:

这里,需要连接到存储节点的MySQL分片,获取分片的binlog相关的信息,按如下指令操作:

分片信息:

分片1:

kl@kunlun3:~$ mysql -h 192.168.0.155 -P 57003 -upgx -ppgx_pwd

mysql> show master status ;

输出信息如下所示:

将 上图中file, position, executed_gtid_set 等信息填写至前述shard 配置信息对话框中,如下所示:

点击“确认”完成分片一的信息配置,显示如下:

点击“输出参数”旁的“添加+”,如下:

在 output plugins : 这里选 : event_kafka

brokers : 输入 : 192.168.0.153:9092

剩余参数保持默认即可。

点击“确认”,显示如下:

确认无误后,点击“确认”,完成CDC任务上报,显示如下:

如果同步任务的状态正常,则我们可以接下来进行测试,连接到Klustron集群的计算实例,对kl2kafka表插入记录:

kl@kunlun1:~$ psql -h 192.168.0.152 -p 47001 postgres
postgres=> insert into test.kl2kafka values (2,'bbb'); 
postgres=> insert into test.kl2kafka values (3,'ccc'); 
postgres=> delete from test.kl2kafka where f1=1 ;

再连接kafka (192.168.0.153),检查主题上的消息接收情况:

kl@kunlun2:~$ /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092

输出topic 列表,其中有一个topic名称为:cdc_kafka_cluster_1716720242_000002 如下所示:

用kafka 自带的消费者客户端连接该topic,读取该topic上的消息,输出消息内容,如下所示:

kl@kunlun2:~$ /usr/local/kafka/bin/kafka-console-consumer.sh --topic cluster_1716720242_000002 --bootstrap-server localhost:9092 --from-beginning

可以看到,kafka的主题上已有之前在Klustron中插入的记录的消息和删除记录的消息,说明CDC kafka插件分发消息到kafka目标topic的任务执行正常。

END