Flume插件 -- Sequoiadb Sink
Posted 深广大数据Club
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume插件 -- Sequoiadb Sink相关的知识,希望对你有一定的参考价值。
“ 本文主要是对我近期开发对flume sequoiadb插件进行介绍,包括相关的设计思路。”
近期由于在佛山广发银行出差,一直没时间发博文,现在才抽出点时间给大家写,请多见谅。
目前在佛山广发银行这边做个实时采集转储的项目,因为涉及到sequoiadb数据写入的关系,所以基于flume写了一个sequoiadb sink做数据入库。代码已上传至github上:https://github.com/Jonathan-Wei/flume-sequoiadb-sink
版本信息:
* flume-1.6.0以上
* sequoiadb-driver-2.8.5
这里要注意的是,如果sequoiadb版本不同,需要的sequoiadb driver版本也不同,目前sequoiadb的安装版本最新是2.6.4,所需要的sequoiadb driver版本是2.10。两个版本的api有所不同,2.10版本的代码路径做了调整,需要做针对性的修改。
SequoiaDB 是一款金融级分布式数据库。SequoiaDB 3.0 主要提供:
• 分布式NewSQL--- 支撑分布式架构的核心交易系统
• 分布式对象存储 --- 海量结构化、非结构化和半结构化数据统一管理
• 分布式NoSQL --- 高性能、高并发数据访问 真正实现Multi-Model全模,支撑所有数据库业务场景。
设计思路
1、支持数据的增删改
2、支持sequoiadb 集群操作
3、支持用户自定义数据分隔符
4、支持数据操作类型配置
5、支持数据字段过滤(下次更新支持)
数据标识
数据分隔符 “,”
操作分隔符 “|” --进行操作分隔
Key-Value分隔符 “:” --主要用于更新、删除时的键值设置
数据操作符号
插入 I
更新 U
删除 D
插入
数据操作符(I)+ 操作分隔符(|)+值列表
I|34406a98-e150-461f-9ae9-ff2ae2a5e0ac,name_34406a98-e150-461f-9ae9-ff2ae2a5e0ac,29,dept_34406a98-e150-461f-9ae9-ff2ae2a5e0ac,专科,男,1983-2,hometown_34406a98-e150-461f-9ae9-ff2ae2a5e0ac,country_34406a98-e150-461f-9ae9-ff2ae2a5e0ac,nation_34406a98-e150-461f-9ae9-ff2ae2a5e0ac,null,已婚,健康,2006-2,优秀,Addr:34406a98-e150-461f-9ae9-ff2ae2a5e0ac,TelNo:34406a98-e150-461f-9ae9-ff2ae2a5e0ac,34406a98-e150-461f-9ae9-ff2ae2a5e0ac@mail.com,job_34406a98-e150-461f-9ae9-ff2ae2a5e0ac,1526883625012
更新
数据操作符(U)+ 操作分隔符(|)+条件(Key:Value)+操作分隔符(|)+更新数据(key:value)
U|empNo:efafbbe0-a1c8-4baf-962c-0fe4945b15b3|empAge:38
删除
数据操作符(D)+ 操作分隔符(|)+删除条件(Key:Value)
D|empNo:1a041a45-2dec-4844-99b6-ceabf99716e5
public class Config {
# sequoiadb 服务配置
public static final String SEQUOIADB_SERVERS = "sequoiadb.servers";
public static final String SEQUOIADB_USERNAME = "sequoiadb.username";
public static final String SEQUOIADB_PASSWORD = "sequoiadb.password";
public static final String SEQUOIADB_COLLECTION_SPACE = "sequoiadb.collectionSpace";
public static final String SEQUOIADB_COLLECTION = "sequoiadb.collection";
# sequoiadb 连接池配置
public static final String SEQUOIADB_POOL_CONNECTTIMEOUT = "sequoiadb.pool.connectTimeOut";
public static final String SEQUOIADB_POOL_MAXAUTOCONNECTRETRYTIME = "sequoiadb.pool.maxAutoConnectRetryTime";
public static final String SEQUOIADB_POOL_MAXCOUNT = "sequoiadb.pool.maxCount";
public static final String SEQUOIADB_POOL_DELTAINCCOUNT = "sequoiadb.pool.deltaIncCount";
public static final String SEQUOIADB_POOL_MAXIDLECOUNT = "sequoiadb.pool.maxIdleCount";
public static final String SEQUOIADB_POOL_KEEPALIVETIMEOUT = "sequoiadb.pool.keepAliveTimeout";
public static final String SEQUOIADB_POOL_CHECKINTERVAL = "sequoiadb.pool.checkInterval";
public static final String SEQUOIADB_POOL_SYNCCOORDINTERVAL = "sequoiadb.pool.syncCoordInterval";
public static final String SEQUOIADB_POOL_VALIDATECONNECTION = "sequoiadb.pool.validateConnection";
public static final String SEQUOIADB_POOL_CONNECTSTRATEGY = "sequoiadb.pool.connectStrategy";
# sequoiadb 其他配置
#批次大小
public static final String SEQUOIADB_BATCHSIZSE = "sequoiadb.batchSize";
# sequoiadb字段信息
public static final String SEQUOIADB_COLLECTION_FIELDS = "sequoiadb.collection.fields";
# 字段分隔符
public static final String SEQUOIADB_COLLECTION_FIELDDELIMITER = "sequoiadb.collection.field.delimiter";
# 字段类型,必须与字段信息匹配
public static final String SEQUOIADB_COLLECTION_FIELDTYPES = "sequoiadb.collection.field.types";
public static final String SEQUOIADB_OPERATION_INDEX = "sequoiadb.operation.index";
public static final String SEQUOIADB_UNIQUE_IDENTIFIER_INDEX = "sequoiadb.unique.identifier.index";
# 操作分隔符
public static final String SEQUOIADB_OPERATION_DELEMITER = "sequoiadb.operation.delimiter";
# 当操作只配置insert时,采用批量插入方式入库
public static final String SEQUOIADB_INSERT_BATCHSIZE = "sequoiadb.insert.batchSize";
# 操作类型,默认all,可配置值 all、insert、delete、update,可组合配置
public static final String SEQUOIADB_OPERATION_TYPES = "sequoiadb.operation.types";
}
sequoiadb.collection.field.delimiter以及
sequoiadb.operation.delimiter实现对源数据对格式定制
以上是关于Flume插件 -- Sequoiadb Sink的主要内容,如果未能解决你的问题,请参考以下文章
[Flume][Kafka]Flume 与 Kakfa结合例子(Kakfa 作为flume 的sink 输出到 Kafka topic)