通过OGG实现mysql到kafka实时数据采集

Posted 字如其名

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通过OGG实现mysql到kafka实时数据采集相关的知识,希望对你有一定的参考价值。

通过OGG实现mysql到kafka实时数据采集,准备软件:

centos 6.7虚拟机两台

源端 192.168.5.201 安装mysql OGG客户端

mysql-8.0.23-1.el7.x86_64.rpm-bundle.tar 

191003_ggs_Linux_x64_MySQL_64bit.zip


目标端 192.168.5.202 安装kafka OOG客户端

jdk-8u211-linux-x64.tar.gz

kafka_2.13-2.7.0.tgz

zookeeper-3.4.14.tar.gz

OGG_BigData_Linux_x64_19.1.0.0.5.zip


一 源端配置

1 mysql安装mysql依赖,并解压安装

[root@localhost mysql]# yum -y install autoconf perl.x86_64 perl-devel.x86_64 perl-JSON.noarch net-tools numactl libaio* openssl-devel perl-Test*[root@localhost mysql]# [root@localhost mysql]# lsmysql-8.0.23-1.el7.x86_64.rpm-bundle.tar mysql-community-embedded-compat-8.0.23-1.el7.x86_64.rpmmysql-community-client-8.0.23-1.el7.x86_64.rpm mysql-community-libs-8.0.23-1.el7.x86_64.rpmmysql-community-client-plugins-8.0.23-1.el7.x86_64.rpm mysql-community-libs-compat-8.0.23-1.el7.x86_64.rpmmysql-community-common-8.0.23-1.el7.x86_64.rpm mysql-community-server-8.0.23-1.el7.x86_64.rpmmysql-community-devel-8.0.23-1.el7.x86_64.rpm mysql-community-test-8.0.23-1.el7.x86_64.rpm[root@localhost mysql]# [root@localhost mysql]# [root@localhost mysql]# [root@localhost mysql]# rpm -ivh *.rpmwarning: mysql-community-client-8.0.23-1.el7.x86_64.rpm: Header V3 DSA/SHA1 Signature, key ID 5072e1f5: NOKEYPreparing... ################################# [100%]Updating / installing... 1:mysql-community-common-8.0.23-1.e################################# [ 11%] 2:mysql-community-client-plugins-8.################################# [ 22%] 3:mysql-community-libs-8.0.23-1.el7################################# [ 33%] 4:mysql-community-client-8.0.23-1.e################################# [ 44%] 5:mysql-community-server-8.0.23-1.e################################# [ 56%] 6:mysql-community-test-8.0.23-1.el7################################# [ 67%] 7:mysql-community-devel-8.0.23-1.el################################# [ 78%] 8:mysql-community-libs-compat-8.0.2################################# [ 89%] 9:mysql-community-embedded-compat-8################################# [100%][root@localhost mysql]#

2 登录并修改密码

[root@localhost mysql]# systemctl start mysqld[root@localhost mysql]# grep password /var/log/mysqld.log 2021-04-03T20:51:17.333794Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: vy43:UR8T22b[root@localhost mysql]# mysql> ALTER USER 'root'@'localhost' IDENTIFIED BY 'Carl@123123' PASSWORD EXPIRE NEVER;Query OK, 0 rows affected (0.01 sec)
mysql> ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY 'Carl@123123'; Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;Query OK, 0 rows affected (0.01 sec)

3 创建测试用表

mysql> mysql> create database test;Query OK, 1 row affected (0.01 sec)
mysql> use test;Database changedmysql> CREATE TABLE test (`id` int(11) NOT NULL AUTO_INCREMENT,`name` varchar(60) DEFAULT NULL,PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;Query OK, 0 rows affected, 2 warnings (0.03 sec)
mysql> mysql> desc test;+-------+-------------+------+-----+---------+----------------+| Field | Type | Null | Key | Default | Extra |+-------+-------------+------+-----+---------+----------------+| id | int | NO | PRI | NULL | auto_increment || name | varchar(60) | YES | | NULL | |+-------+-------------+------+-----+---------+----------------+2 rows in set (0.00 sec)

4 查看二进制日志是否开启及相关配置文件,默认开启

mysql> mysql> show variables like '%log_bin%';+---------------------------------+-----------------------------+| Variable_name | Value |+---------------------------------+-----------------------------+| log_bin | ON || log_bin_basename | /var/lib/mysql/binlog || log_bin_index | /var/lib/mysql/binlog.index || log_bin_trust_function_creators | OFF || log_bin_use_v1_row_events | OFF || sql_log_bin | ON |+---------------------------------+-----------------------------+6 rows in set (0.00 sec)
mysql> mysql> show variables like '%sock%';+-----------------------------------------+-----------------------------+| Variable_name | Value |+-----------------------------------------+-----------------------------+| mysqlx_socket | /var/run/mysqld/mysqlx.sock || performance_schema_max_socket_classes | 10 || performance_schema_max_socket_instances | -1 || socket | /var/lib/mysql/mysql.sock |+-----------------------------------------+-----------------------------+4 rows in set (0.00 sec)


5 解压OGG软件,配置相关进程

[root@localhost ~]# hostnamectl set-hostname mysql[root@localhost ~]# bash[root@mysql ~]# [root@mysql ~]# mkdir -p /opt/ogg[root@mysql ~]# cd /opt/ogg/[root@mysql ogg]# mv /root/191003_ggs_Linux_x64_MySQL_64bit.zip ./[root@mysql ogg]# [root@mysql ogg]# [root@mysql ogg]# [root@mysql ogg]# unzip 191003_ggs_Linux_x64_MySQL_64bit.zip Archive: 191003_ggs_Linux_x64_MySQL_64bit.zip inflating: ggs_Linux_x64_MySQL_64bit.tar  inflating: OGG-19.1.0.0-README.txt  inflating: OGG_WinUnix_Rel_Notes_19.1.0.0.3.pdf [root@mysql ogg]# tar -xvf ggs_Linux_x64_MySQL_64bit.tar......

6 创建目录

[root@mysql ogg]# ./ggsciOracle GoldenGate Command Interpreter for MySQLVersion 19.1.0.0.3 OGGCORE_19.1.0.0.0_PLATFORMS_190907.0144Linux, x64, 64bit (optimized), MySQL Enterprise on Sep 7 2019 08:41:32Operating system character set identified as UTF-8.
Copyright (C) 1995, 2019, Oracle and/or its affiliates. All rights reserved.
GGSCI (mysql) 1create subdirs
Creating subdirectories under current directory /opt/ogg
Parameter file /opt/ogg/dirprm: created.Report file /opt/ogg/dirrpt: created.Checkpoint file /opt/ogg/dirchk: created.Process status files /opt/ogg/dirpcs: created.SQL script files /opt/ogg/dirsql: created.Database definitions files /opt/ogg/dirdef: created.Extract data files /opt/ogg/dirdat: created.Temporary files /opt/ogg/dirtmp: created.Credential store files /opt/ogg/dircrd: created.Masterkey wallet files /opt/ogg/dirwlt: created.Dump files /opt/ogg/dirdmp: created.


7 添加抽取的表

GGSCI (mysql) 2> dblogin sourcedb test@192.168.5.201:3306,userid root,password Carl@123123Successfully logged into database.
GGSCI (mysql DBLOGIN as root) 3> add trandata test.testERROR: Invalid command.(此处有报错,不影响结果)

8 配置管理进程

GGSCI (mysql DBLOGIN as root) 4> edit param mgr
PORT 7809DYNAMICPORTLIST 7810-7909AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 10


9 配置抽取进程

GGSCI (mysql DBLOGIN as root) 5> edit param ext_1
EXTRACT ext_1
SETENV(MYSQL_UNIX_TCP="/var/lib/mysql/mysql.sock")sourcedb test@192.168.5.201:3306, userid root,password Carl@123123tranlogoptions altlogdest "/var/lib/mysql/binlog.index"REPORTCOUNT EVERY 10 SECONDS, RATEEXTTRAIL ./dirdat/01TABLE test.test;
GGSCI (mysql DBLOGIN as root) 6> ADD EXTRACT ext_1, TRANLOG, BEGIN NOWEXTRACT added.
GGSCI (mysql DBLOGIN as root) 7> ADD EXTTRAIL ./dirdat/01, EXTRACT ext_1, MEGABYTES 200EXTTRAIL added.



10 配置投递进程

GGSCI (mysql DBLOGIN as root) 8> edit param dpe_1
EXTRACT dpe_1PASSTHRUDYNAMICRESOLUTIONRMTHOST 192.168.5.202, MGRPORT 7809RMTTRAIL ./dirdat/ca/01TABLE test.*;
GGSCI (mysql DBLOGIN as root) 9> ADD EXTRACT dpe_1, EXTTRAILSOURCE ./dirdat/01EXTRACT added.
GGSCI (mysql DBLOGIN as root) 10> ADD RMTTRAIL ./dirdat/ca/01, EXTRACT dpe_1, MEGABYTES 200RMTTRAIL added.
GGSCI (mysql DBLOGIN as root) 11> [root@mysql ogg]# [root@mysql ogg]#



二 目标端配置

1 解压安装kafka软件

[root@localhost ~]# mkdir -p /opt/bdata[root@localhost ~]# mdkir -p /opt/ogg[root@localhost ~]# mkdir -p /opt/ogg[root@localhost ~]# tar -zxvf jdk-8u211-linux-x64.tar.gz -C /opt/bdata[root@localhost ~]# tar -zxvf kafka_2.13-2.7.0.tgz -C /opt/bdata[root@localhost ~]# tar -zxvf zookeeper-3.4.14.tar.gz -C /opt/bdata[root@localhost ~]# cp OGG_BigData_Linux_x64_19.1.0.0.5.zip /opt/ogg

2配置环境变量

[root@localhost ~]# vim /etc/profile#JAVA_HOMEexport JAVA_HOME=/opt/bdata/jdk1.8.0_211export PATH=$PATH:$JAVA_HOME/bin#KAFKA_HOMEexport KAFKA_HOME=/opt/bdata/kafka_2.13-2.7.0export PATH=$PATH:$KAFKA_HOME/bin#ZK_HOMEexport ZK_HOME=/opt/bdata/zookeeper-3.4.14export PATH=$PATH:$ZK_HOME/bin
export OGG_HOME=/opt/oggexport LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$OGG_HOME/libexport PATH=$OGG_HOME:$PATH[root@localhost ~]# source /etc/profile



3 ZK默认配置文件即可

[root@localhost ~]# mv /opt/bdata/zookeeper-3.4.14/conf/zoo_sample.cfg /opt/bdata/zookeeper-3.4.14/conf/zoo.cfg[root@localhost ~]# [root@localhost ~]# zkServer.sh startZooKeeper JMX enabled by defaultUsing config: /opt/bdata/zookeeper-3.4.14/bin/../conf/zoo.cfgStarting zookeeper ... STARTED[root@localhost ~]#


4 kafka需要更改配置文件

[root@localhost ~] vim /opt/bdata/kafka_2.13-2.7.0/config/server.properties

31行 listeners=PLAINTEXT://192.168.5.99:909236行 advertised.listeners=PLAINTEXT://192.168.5.99:9092123行 zookeeper.connect=192.168.5.99:2181[root@kafka ~]# kafka-server-start.sh /opt/bdata/kafka_2.13-2.7.0/config/server.properties &




5 解压OGG软件,配置相关进程

[root@localhost ~]# cd /opt/ogg[root@localhost ogg]# [root@localhost ogg]# lsOGG_BigData_Linux_x64_19.1.0.0.5.zip[root@localhost ogg]# unzip OGG_BigData_Linux_x64_19.1.0.0.5.zip Archive: OGG_BigData_Linux_x64_19.1.0.0.5.zip inflating: OGGBD-19.1.0.0-README.txt  inflating: OGG_BigData_19.1.0.0.5_Release_Notes.pdf  inflating: OGG_BigData_Linux_x64_19.1.0.0.5.tar [root@localhost ogg]# tar -xvf OGG_BigData_Linux_x64_19.1.0.0.5.tarAdapterExamples/...[root@localhost ogg]# rm -f OGG* *ggs_*[root@localhost ogg]# [root@localhost ogg]# [root@localhost ogg]# hostnamectl set-hostname kafka[root@localhost ogg]# bash

6 创建目录

[root@kafka ogg]# ./ggsciOracle GoldenGate for Big DataVersion 19.1.0.0.5 (Build 007)
Oracle GoldenGate Command InterpreterVersion 19.1.0.0.200714 OGGCORE_19.1.0.0.0OGGBP_PLATFORMS_200628.2141Linux, x64, 64bit (optimized), Generic on Jun 28 2020 23:01:58Operating system character set identified as UTF-8.
Copyright (C) 1995, 2019, Oracle and/or its affiliates. All rights reserved.
GGSCI (kafka) 1> create subdirs
Creating subdirectories under current directory /opt/ogg
Parameter file /opt/ogg/dirprm: created.Report file /opt/ogg/dirrpt: created.Checkpoint file /opt/ogg/dirchk: created.Process status files /opt/ogg/dirpcs: created.SQL script files /opt/ogg/dirsql: created.Database definitions files /opt/ogg/dirdef: created.Extract data files /opt/ogg/dirdat: created.Temporary files /opt/ogg/dirtmp: created.Credential store files /opt/ogg/dircrd: created.Masterkey wallet files /opt/ogg/dirwlt: created.Dump files /opt/ogg/dirdmp: created.

7 配置管理进程

GGSCI (kafka) 2> edit param mgr 
PORT 7809DYNAMICPORTLIST 7810-7909AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 10
GGSCI (kafka) 3> 

8 配置rep_1

GGSCI (kafka) 3> edit param rep_1REPLICAT rep_1TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.propsDDL INCLUDE ALLREPORTCOUNT EVERY 1 MINUTES, RATEGROUPTRANSOPS 10000MAP test.*, TARGET test.*;
GGSCI (kafka) 4> add replicat rep_1, exttrail ./dirdat/ca/01REPLICAT added.


9 配置kafka.props

GGSCI (kafka) 5> 

[root@kafka dirprm]# vim kafka.propsgg.handlerlist=kafkahandlergg.handler.kafkahandler.type=kafkagg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.propertiesgg.handler.kafkahandler.format=jsongg.handler.kafkahandler.format.includePrimaryKeys=truegg.handler.kafkahandler.BlockingSend=falsegg.handler.kafkahandler.includeTokens=falsegg.handler.kafkahandler.mode=opgg.handler.kafkahandler.topicMappingTemplate=testgg.classpath=dirprm/:/opt/bdata/kafka_2.13-2.7.0/libs/*:/opt/ogg/:/opt/ogg/lib/*gg.log=log4jgg.log.level=INFOgg.report.time=30sec[root@kafka dirprm]


10 配置 custom_kafka_producer.properties

[root@kafka dirprm]# vim custom_kafka_producer.propertiesbootstrap.servers=192.168.5.202:9092acks=1compression.type=gzipreconnect.backoff.ms=1000value.serializer=org.apache.kafka.common.serialization.ByteArraySerializerkey.serializer=org.apache.kafka.common.serialization.ByteArraySerializerbatch.size=102400linger.ms=10000[root@kafka dirprm]



三 启动服务


源端mgr ---> 目标端mgr ---> 源端ext_1---> 源端dpe_1---> 目标rep_1

start mgr ---> start mgr ---> start ext_1---> start dpe_1---> start rep_1


1 启动完成,源端

GGSCI (mysql) 30> info allProgram Status Group Lag at Chkpt Time Since ChkptMANAGER RUNNING EXTRACT RUNNING DPE_1 00:00:00 00:00:06 EXTRACT RUNNING EXT_1 00:00:00 00:00:08 GGSCI (mysql) 31>


2 启动完成,目标端

GGSCI (kafka) 12> info allProgram Status Group Lag at Chkpt Time Since ChkptMANAGER RUNNING REPLICAT RUNNING REP_1 00:00:00 00:00:06 GGSCI (kafka) 13>



3 mysql写入数据进行测试

mysql> use testReading table information for completion of table and column namesYou can turn off this feature to get a quicker startup with -A

Database changedmysql> insert into test values(1,'carl');Query OK, 1 row affected (0.26 sec)
mysql> insert into test values(2,'carl');Query OK, 1 row affected (0.13 sec)
mysql> insert into test values(3,'good job!');Query OK, 1 row affected (0.02 sec)mysql> 


4 查看topic及消费情况

[root@kafka ~]# kafka-topics.sh --zookeeper 192.168.5.202:2181 --listtest[root@kafka ~]# kafka-console-consumer.sh --bootstrap-server 192.168.5.202:9092 --topic test --from-beginning{"table":"test.test","op_type":"I","op_ts":"2021-04-04 06:37:00.280750","current_ts":"2021-04-04T06:37:08.134000","pos":"00000000000000001691","primary_keys":["ID"],"after":{"ID":1,"NAME":"carl"}}{"table":"test.test","op_type":"I","op_ts":"2021-04-04 06:38:49.280773","current_ts":"2021-04-04T06:38:56.395000","pos":"00000000000000001855","primary_keys":["ID"],"after":{"ID":2,"NAME":"carl"}}{"table":"test.test","op_type":"I","op_ts":"2021-04-04 06:39:04.281151","current_ts":"2021-04-04T06:39:10.410000","pos":"00000000000000002015","primary_keys":["ID"],"after":{"ID":3,"NAME":"good job!"}}


5 测试完成


以上是关于通过OGG实现mysql到kafka实时数据采集的主要内容,如果未能解决你的问题,请参考以下文章

oracle ogg是啥

oracle ogg是啥

oracle ogg ADG和DG的区别

python爬虫等获取实时数据+Flume+Kafka+Spark Streaming+mysql+Echarts实现数据动态实时采集分析展示

python爬虫等获取实时数据+Flume+Kafka+Spark Streaming+mysql+Echarts实现数据动态实时采集分析展示

OGG同步ORACLE数据到KAFKA