Flink kuduSink开发

Posted codetouse

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink kuduSink开发相关的知识,希望对你有一定的参考价值。

1、继承RichSinkFunction

(1)首先在构造方式传入kudu的masterAddress地址、默认表名、TableSerializationSchema、KuduTableRowConverter、Properties配置对象

(2)重写open方法

初始化KuduClient对象操作kudu,KuduSession对象并传入一堆配置

(3)重写invoke方法

核心是如果已传入TableSerializationSchema对象,则通过其serializeTable方法从输入的json数据里提取表名,如果未定义则直接取默认表名。拿到表名后就能使用KuduClient对象对其操作了

if (schema != null) {
String serializeTableName = schema.serializeTable(row);
if (serializeTableName == null) return;
table = client.openTable(serializeTableName);
}
else
table = client.openTable(tableName);
insert = table.newInsert();

2、定义KuduTableRowConverter接口,将每一条输入数据转换成TableRow对象

public interface KuduTableRowConverter<IN> extends Serializable {
TableRow convert(IN value);
}

定义TableRow类,代表一行数据,key是字串型的键名,value是Object型的键值

public class TableRow implements Serializable {
private static final long serialVersionUID = 1L;
private Map<String, Object> pairs = new HashMap<>();
public int size() {return pairs.size();}
public Map<String, Object> getPairs() {return pairs;}
public Object getElement(String key) {return pairs.get(key);}
public void putElement(String key, Object value) {pairs.put(key, value);}
}

定义JsonKuduTableRowConverter实现KuduTableRowConverter接口,对于输入的json数据,通过一系列转换逻辑转换成TableRow对象

3、定义TableSerializationSchema接口,从每一条输入数据里提取表名

public interface TableSerializationSchema<IN> extends Serializable {
String serializeTable(IN value);
}

定义JsonLogidKeyTableSerializationSchema实现TableSerializationSchema接口,对于输入的json数据,使用指定key值提取value值,然后再从一个预先获取的map里找到这个value对应的表名,然后加上必要的前缀与后缀组成impala的表名

以上是关于Flink kuduSink开发的主要内容,如果未能解决你的问题,请参考以下文章

Flink 多流转换算子

如何使用Apache Flink阅读Cassandra?

01-flink-1.10.1开发flink代码需要的maven

大数据技术Flink开发环境准备和API代码案例

大数据技术Flink开发环境准备和API代码案例

大数据技术Flink开发环境准备和API代码案例