Pushing Oracle DDL logs to Kafka, ingesting them with Flink, and computing statistics

Posted by 冷幽篁


Test environment:
Environment: Oracle 12c running in Docker
Log mode: archivelog
User: scott/tiger, with DBA privileges
Big-data components: Kafka (with the topic flink_topic already created) and ZooKeeper
Extra component: kafka-connect-oracle-1.0.jar
Download: https://github.com/erdemcer/kafka-connect-oracle

1. Create a test table and insert a few records
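The original screenshots for this step are not reproduced here. A minimal sketch consistent with the payload shown in step 7 (table TEST_20200402 with a numeric ID and a NAME column; the row (2, 'zhangfei') appears there, the first row is purely illustrative):

SQL> conn scott/tiger
SQL> create table test_20200402 (id number, name varchar2(50));
SQL> insert into test_20200402 values (1, 'liubei');
SQL> insert into test_20200402 values (2, 'zhangfei');
SQL> commit;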

2. Enable archivelog mode and supplemental logging

sqlplus / as sysdba
SQL> shutdown immediate
SQL> startup mount
SQL> alter database archivelog;
SQL> alter database open;
SQL> alter database add supplemental log data (all) columns;
SQL> conn username/password
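An optional check, not part of the original post (both are standard SQL*Plus and data-dictionary queries), to confirm the settings took effect:

SQL> archive log list
SQL> select supplemental_log_data_min, supplemental_log_data_all from v$database;

SUPPLEMENTAL_LOG_DATA_ALL should report YES once column-level supplemental logging is on.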

3. Prepare the required JAR files

1. Download the whole project from https://github.com/erdemcer/kafka-connect-oracle and build it with mvn clean package to produce kafka-connect-oracle-1.0.jar
2. Download an Oracle JDBC driver JAR, e.g. ojdbc7.jar
3. Put kafka-connect-oracle-1.0.jar and ojdbc7.jar into the libs directory of the Kafka installation
4. Copy the config/OracleSourceConnector.properties file from the GitHub project into kafka/config (see the shell sketch after this list)
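As shell commands, the copy steps might look like this; the paths follow the Cloudera parcel layout used in step 4, and the build-output directory is an assumption, so adjust to your installation:

# cp kafka-connect-oracle/target/kafka-connect-oracle-1.0.jar ojdbc7.jar /opt/cloudera/parcels/KAFKA/lib/kafka/libs/
# cp kafka-connect-oracle/config/OracleSourceConnector.properties /opt/cloudera/parcels/KAFKA/lib/kafka/config/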

4. Edit the configuration files

# vi /opt/cloudera/parcels/KAFKA/lib/kafka/config/OracleSourceConnector.properties
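The original post showed this file's contents as a screenshot. A sketch based on the sample configuration shipped with kafka-connect-oracle, filled in with this test's known values (topic flink_topic; db.name.alias flink_test, which matches the flink_test... schema name in step 7; the scott/tiger user from the environment section); the host and SID are placeholders, and key names may differ between connector versions:

name=oracle-logminer-connector
connector.class=com.ecer.kafka.connect.oracle.OracleSourceConnector
tasks.max=1
topic=flink_topic
db.name.alias=flink_test
db.name=<oracle-sid>
db.hostname=<oracle-host>
db.port=1521
db.user=scott
db.user.password=tiger
db.fetch.size=1
table.whitelist=SCOTT.*
parse.dml.data=true
reset.offset=true
multitenant=false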

# vi /opt/cloudera/parcels/KAFKA/lib/kafka/config/connect-standalone.properties
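Likewise shown as a screenshot originally. These are stock Kafka Connect standalone settings, with the JSON converters and schemas enabled so the messages carry the schema envelope seen in step 7 (the offset file path is the usual default):

bootstrap.servers=192.168.58.177:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000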

5. Start the connector

bin/connect-standalone.sh config/connect-standalone.properties config/OracleSourceConnector.properties

6. Start a console consumer

bin/kafka-console-consumer.sh --bootstrap-server 192.168.58.177:9092 --from-beginning --topic flink_topic

7. Result

Each change event arrives as one JSON message wrapped in the Kafka Connect schema envelope. The payload carries the SCN, the owning schema and table, the redo SQL (SQL_REDO), the operation type, and the after ("data") and before images of the row:

{"schema":{"type":"struct","fields":
[
    {"type":"int64","optional":false,"field":"SCN"},
    {"type":"string","optional":false,"field":"SEG_OWNER"},
    {"type":"string","optional":false,"field":"TABLE_NAME"},
    {"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"TIMESTAMP"},
    {"type":"string","optional":false,"field":"SQL_REDO"},
    {"type":"string","optional":false,"field":"OPERATION"},
    {"type":"struct","fields":
        [
            {"type":"double","optional":true,"field":"ID"},
            {"type":"string","optional":true,"field":"NAME"}
        ],"optional":true,"name":"value","field":"data"},
    {"type":"struct","fields":
        [
            {"type":"double","optional":true,"field":"ID"},
            {"type":"string","optional":true,"field":"NAME"}
        ],"optional":true,"name":"value","field":"before"}
],"optional":false,"name":"flink_test.scott.test_20200402.row"},
"payload":
{
    "SCN":1719627,
    "SEG_OWNER":"SCOTT",
    "TABLE_NAME":"TEST_20200402",
    "TIMESTAMP":1585773001000,
    "SQL_REDO":"insert into \\"SCOTT\\".\\"TEST_20200402\\"(\\"ID\\",\\"NAME\\") values (2,\'zhangfei\')",
    "OPERATION":"INSERT",
    "data":{"ID":2.0,"NAME":"zhangfei"},
    "before":null
}
}

8. Demo: reading the data in Flink

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.58.177:9092");
        // not needed by the universal Kafka consumer; kept from the original setup
        properties.setProperty("zookeeper.connect", "192.168.58.171:2181,192.168.58.177:2181");
        properties.setProperty("group.id", "test");

        FlinkKafkaConsumer<String> myConsumer =
                new FlinkKafkaConsumer<>("flink_topic", new SimpleStringSchema(), properties);

        // consume the topic from the earliest offset
        myConsumer.setStartFromEarliest();

        // add the Kafka source; SimpleStringSchema yields each raw JSON message as a String
        DataStreamSource<String> stream = env.addSource(myConsumer);

        stream.print();

        env.execute("flink_topic");
    }


    // POJO matching the payload.data record (ID, NAME) shown in step 7
    public static class DataS {

        public Integer id;
        public String name;

        public Integer getId() {
            return id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

    }
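The DataS POJO is not wired into the demo as written. A hedged sketch of how the raw JSON strings could be mapped onto it inside main(), assuming jackson-databind is on the classpath (the field paths follow the step-7 payload):

        // extra imports: org.apache.flink.streaming.api.datastream.DataStream,
        // com.fasterxml.jackson.databind.ObjectMapper, com.fasterxml.jackson.databind.JsonNode
        DataStream<DataS> rows = stream
                .map(value -> {
                    // pull payload.data out of the Connect envelope
                    JsonNode data = new ObjectMapper().readTree(value).path("payload").path("data");
                    DataS d = new DataS();
                    d.setId(data.path("ID").asInt());
                    d.setName(data.path("NAME").asText());
                    return d;
                })
                .returns(DataS.class); // explicit type hint, since the mapper is a lambda
        rows.print();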

 

9. Real-time aggregation with Flink SQL

package com.flink;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class OracleToFlink {
    public static void main(String[] args) throws Exception {

        // create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // create the table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // register a Kafka-backed table over the Connect JSON envelope;
        // only the fields the query needs are declared
        tableEnv.sqlUpdate("CREATE TABLE user_log2 (\n" +
                "    payload ROW(SCN string, SEG_OWNER string, data ROW(ID DECIMAL, NAME string))\n" +
                ") WITH (\n" +
                "    'connector.type' = 'kafka',\n" +
                "    'connector.version' = 'universal',\n" +
                "    'connector.topic' = 'flink_topic',\n" +
                "    'connector.startup-mode' = 'latest-offset',\n" +
                "    'connector.properties.group.id' = 'test',\n" +
                "    'connector.properties.zookeeper.connect' = '192.168.58.171:2181,192.168.58.177:2181,192.168.58.178:2181',\n" +
                "    'connector.properties.bootstrap.servers' = '192.168.58.177:9092',\n" +
                "    'format.type' = 'json',\n" +
                "    'format.json-schema' =\n" +
                "    '{\n" +
                "        \"type\": \"object\",\n" +
                "        \"properties\":\n" +
                "        {\n" +
                "            \"payload\": {\"type\": \"object\",\n" +
                "                \"properties\":\n" +
                "                 {\n" +
                "                    \"SCN\": {\"type\": \"string\"},\n" +
                "                    \"SEG_OWNER\": {\"type\": \"string\"},\n" +
                "                    \"data\": {\"type\": \"object\",\n" +
                "                    \"properties\": {\"ID\": {\"type\": \"integer\"},\n" +
                "                                   \"NAME\": {\"type\": \"string\"}\n" +
                "                                  }\n" +
                "                             }\n" +
                "                 }}\n" +
                "        }\n" +
                "    }'\n" +
                ")"
        );

        // running sum of ID per NAME over the change stream
        Table result = tableEnv.sqlQuery(
                "select payload.data.NAME, sum(payload.data.ID) from user_log2 group by payload.data.NAME");

        // the query returns two columns, so the tuple type has two fields
        tableEnv.toRetractStream(result, Types.TUPLE(Types.STRING, Types.BIG_DEC)).print();
        env.execute("flink job");
    }
}

Result: print() writes retract-stream tuples of the form (flag, row); flag=true marks a new or updated aggregate for a NAME, and flag=false retracts the value it replaces.
