Flume插件 -- Sequoiadb Sink

Posted 深广大数据Club

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume插件 -- Sequoiadb Sink相关的知识,希望对你有一定的参考价值。

 本文主要是对我近期开发对flume sequoiadb插件进行介绍,包括相关的设计思路。


近期由于在佛山广发银行出差,一直没时间发博文,现在才抽出点时间给大家写,请多见谅。

目前在佛山广发银行这边做个实时采集转储的项目,因为涉及到sequoiadb数据写入的关系,所以基于flume写了一个sequoiadb sink做数据入库。代码已上传至github上:https://github.com/Jonathan-Wei/flume-sequoiadb-sink


版本信息:

* flume-1.6.0以上

* sequoiadb-driver-2.8.5


这里要注意的是,如果sequoiadb版本不同,需要的sequoiadb driver版本也不同,目前sequoiadb的安装版本最新是2.6.4,所需要的sequoiadb driver版本是2.10。两个版本的api有所不同,2.10版本的代码路径做了调整,需要做针对性的修改。


Sequoiadb介绍

Flume插件 -- Sequoiadb SinkFlume插件 -- Sequoiadb Sink

SequoiaDB 是一款金融级分布式数据库。SequoiaDB 3.0 主要提供:

• 分布式NewSQL--- 支撑分布式架构的核心交易系统 

• 分布式对象存储 --- 海量结构化、非结构化和半结构化数据统一管理 

• 分布式NoSQL --- 高性能、高并发数据访问 真正实现Multi-Model全模,支撑所有数据库业务场景。


Flume Sequoiadb Sink插件介绍

Flume插件 -- Sequoiadb Sink

设计思路

1、支持数据的增删改

2、支持sequoiadb 集群操作

3、支持用户自定义数据分隔符

4、支持数据操作类型配置

5、支持数据字段过滤(下次更新支持)


Flume源端数据格式

Flume插件 -- Sequoiadb Sink

数据标识

数据分隔符   “,”

操作分隔符   “|”    --进行操作分隔

Key-Value分隔符    “:”   --主要用于更新、删除时的键值设置


数据操作符号

插入    I

更新    U

删除    D


Flume源端数据样例

Flume插件 -- Sequoiadb Sink

插入

数据操作符(I)+ 操作分隔符(|)+值列表

I|34406a98-e150-461f-9ae9-ff2ae2a5e0ac,name_34406a98-e150-461f-9ae9-ff2ae2a5e0ac,29,dept_34406a98-e150-461f-9ae9-ff2ae2a5e0ac,专科,男,1983-2,hometown_34406a98-e150-461f-9ae9-ff2ae2a5e0ac,country_34406a98-e150-461f-9ae9-ff2ae2a5e0ac,nation_34406a98-e150-461f-9ae9-ff2ae2a5e0ac,null,已婚,健康,2006-2,优秀,Addr:34406a98-e150-461f-9ae9-ff2ae2a5e0ac,TelNo:34406a98-e150-461f-9ae9-ff2ae2a5e0ac,34406a98-e150-461f-9ae9-ff2ae2a5e0ac@mail.com,job_34406a98-e150-461f-9ae9-ff2ae2a5e0ac,1526883625012

更新

数据操作符(U)+ 操作分隔符(|)+条件(Key:Value)+操作分隔符(|)+更新数据(key:value) 

U|empNo:efafbbe0-a1c8-4baf-962c-0fe4945b15b3|empAge:38


删除

数据操作符(D)+ 操作分隔符(|)+删除条件(Key:Value)

D|empNo:1a041a45-2dec-4844-99b6-ceabf99716e5

Flume sequoiadb sink配置样例

Flume插件 -- Sequoiadb Sink

Flume sequoiadb sink配置列表

public class Config {
    # sequoiadb 服务配置
    public static final String SEQUOIADB_SERVERS = "sequoiadb.servers";
    public static final String SEQUOIADB_USERNAME = "sequoiadb.username";
    public static final String SEQUOIADB_PASSWORD = "sequoiadb.password";
    public static final String SEQUOIADB_COLLECTION_SPACE = "sequoiadb.collectionSpace";
    public static final String SEQUOIADB_COLLECTION = "sequoiadb.collection";
    # sequoiadb 连接池配置
    public static final String SEQUOIADB_POOL_CONNECTTIMEOUT = "sequoiadb.pool.connectTimeOut";
    public static final String SEQUOIADB_POOL_MAXAUTOCONNECTRETRYTIME = "sequoiadb.pool.maxAutoConnectRetryTime";
    public static final String SEQUOIADB_POOL_MAXCOUNT = "sequoiadb.pool.maxCount";
    public static final String SEQUOIADB_POOL_DELTAINCCOUNT = "sequoiadb.pool.deltaIncCount";
    public static final String SEQUOIADB_POOL_MAXIDLECOUNT = "sequoiadb.pool.maxIdleCount";
    public static final String SEQUOIADB_POOL_KEEPALIVETIMEOUT = "sequoiadb.pool.keepAliveTimeout";
    public static final String SEQUOIADB_POOL_CHECKINTERVAL = "sequoiadb.pool.checkInterval";
    public static final String SEQUOIADB_POOL_SYNCCOORDINTERVAL = "sequoiadb.pool.syncCoordInterval";
    public static final String SEQUOIADB_POOL_VALIDATECONNECTION = "sequoiadb.pool.validateConnection";
    public static final String SEQUOIADB_POOL_CONNECTSTRATEGY = "sequoiadb.pool.connectStrategy";
    # sequoiadb 其他配置
    #批次大小    
    public static final String SEQUOIADB_BATCHSIZSE = "sequoiadb.batchSize";    
    # sequoiadb字段信息
    public static final String SEQUOIADB_COLLECTION_FIELDS = "sequoiadb.collection.fields";    
    # 字段分隔符
    public static final String SEQUOIADB_COLLECTION_FIELDDELIMITER = "sequoiadb.collection.field.delimiter";  
    # 字段类型,必须与字段信息匹配
    public static final String SEQUOIADB_COLLECTION_FIELDTYPES = "sequoiadb.collection.field.types";
    public static final String SEQUOIADB_OPERATION_INDEX = "sequoiadb.operation.index";
    public static final String SEQUOIADB_UNIQUE_IDENTIFIER_INDEX = "sequoiadb.unique.identifier.index";
    # 操作分隔符
    public static final String SEQUOIADB_OPERATION_DELEMITER = "sequoiadb.operation.delimiter";
    # 当操作只配置insert时,采用批量插入方式入库
    public static final String SEQUOIADB_INSERT_BATCHSIZE = "sequoiadb.insert.batchSize";
    # 操作类型,默认all,可配置值 all、insert、delete、update,可组合配置
    public static final String SEQUOIADB_OPERATION_TYPES = "sequoiadb.operation.types";
}

以上时flume sequoiadb sink的配置参数列表。具体的默认配置信息可查看SequoiadbOneSink.java。

* 对于sequoiadb集群的支持,采用sequoiadb 数据连接池的方式实现,配置通过配置多个host到 sequoiadb.servers来实现
* sequoiadb字段以及字段类型配置通过 sequoiadb.collection.fields以及 sequoiadb.collection.field.types实现
* 用户可通过 sequoiadb.operation.types来配置你想要对数据库进行对操作类型
* 数据分隔符和操作分隔符可以通过配置
sequoiadb.collection.field.delimiter以及
sequoiadb.operation.delimiter实现对源数据对格式定制
* 字段过滤目前还没实现,后续会加上。

有需要的伙伴可以到github上面获取代码,根据自己的需求进行修改。当然有优化的内容,也可以提交到github上合并到项目中。



以上是关于Flume插件 -- Sequoiadb Sink的主要内容,如果未能解决你的问题,请参考以下文章

Flume -- 初识flumesource和sink

flume学习---sink

flume自定义sink

Flume内置channel,source,sink汇总

Flume-自定义 Sink

[Flume][Kafka]Flume 与 Kakfa结合例子(Kakfa 作为flume 的sink 输出到 Kafka topic)