Flink输出算子Sink

Posted 热心市民何先生、

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink输出算子Sink相关的知识,希望对你有一定的参考价值。

1.输出到文件

StreamingFileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded,比如Parquet)格式。这两种不同的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用StreamingFileSink 的静态方法:

  • 行编码:StreamingFileSink.forRowFormat(basePath,rowEncoder)。

  • 批量编码:StreamingFileSink.forBulkFormat(basePath,bulkWriterFactory)。

在创建行或批量编码Sink 时,我们需要传入两个参数,用来指定存储桶的基本路径

(basePath)和数据的编码逻辑(rowEncoder 或 bulkWriterFactory)。

StreamingFileSink<String> fileSink = StreamingFileSink
.<String>forRowFormat(new Path("./output"), new SimpleStringEncoder<>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15)
)
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5
))
.withMaxPartSize(1024 * 1024 * 1024)
.build())
.build();

// 将 Event 转换成 String 写入文件
stream.map(Event::toString).addSink(fileSink);

这里我们创建了一个简单的文件Sink,通过.withRollingPolicy()方法指定了一个“滚动策略”。“滚动”的概念在日志文件的写入中经常遇到:因为文件会有内容持续不断地写入,所以我们应该给一个标准,到什么时候就开启新的文件,将之前的内容归档保存。也就是说,上面的代码设置了在以下3 种情况下,我们就会滚动分区文件:

  • 至少包含15 分钟的数据

  • 最近5 分钟没有收到新的数据

  • 文件大小已达到1 GB

2.输出到kafka

现在我们要将数据输出到Kafka,整个数据处理的闭环已经形成,所以可以完整测试如下:

  1. 添加Kafka 连接器依赖

由于我们已经测试过从Kafka 数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。

  1. 启动Kafka 集群

  1. 编写输出到Kafka 的示例代码

Properties properties = new Properties(); properties.put("bootstrap.servers", "hadoop102:9092");

DataStreamSource<String> stream = env.readTextFile("input/clicks.csv");

stream
.addSink(new FlinkKafkaProducer<String>( "clicks",
new SimpleStringSchema(), properties
));

这里我们可以看到,addSink 传入的参数是一个FlinkKafkaProducer。这也很好理解,因为需要向Kafka 写入数据,自然应该创建一个生产者。FlinkKafkaProducer 继承了抽象类TwoPhaseCommitSinkFunction,这是一个实现了“两阶段提交”的RichSinkFunction。两阶段提交提供了Flink 向Kafka 写入数据的事务性保证,能够真正做到精确一次(exactly once)的状态一致性。

3.输出到Redis

Redis 是一个开源的内存式的数据存储,提供了像字符串(string)、哈希表(hash)、列表

(list)、集合(set)、排序集合(sorted set)、位图(bitmap)、地理索引和流(stream)等一系列常用的数据结构。因为它运行速度快、支持的数据类型丰富,在实际项目中已经成为了架构优化必不可少的一员,一般用作数据库、缓存,也可以作为消息代理。

具体测试步骤如下:

  1. 导入的Redis 连接器依赖

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
  1. 启动Redis 集群

这里我们为方便测试,只启动了单节点Redis。

  1. 编写输出到Redis 的示例代码

连接器为我们提供了一个RedisSink,它继承了抽象类RichSinkFunction,这就是已经实现好的向Redis 写入数据的SinkFunction。我们可以直接将Event 数据输出到Redis:

// 创建一个到 redis 连接的配置
FlinkJedisPoolConfig conf=FlinkJedisPoolConfig.Builder().setHost("hadoop102").build();
env.addSource(new ClickSource())
.addSink(new RedisSink<Event>(conf, new MyRedisMapper()));

这里RedisSink 的构造方法需要传入两个参数:

  • JFlinkJedisConfigBase:Jedis 的连接配置

  • RedisMapper:Redis 映射类接口,说明怎样将数据转换成可以写入Redis 的类型接下来主要就是定义一个Redis 的映射类,实现RedisMapper 接口。

public static class MyRedisMapper implements RedisMapper<Event>  @Override
public String getKeyFromData(Event e)  return e.user;


@Override
public String getValueFromData(Event e)  return e.url;


@Override
public RedisCommandDescription getCommandDescription() 
return new RedisCommandDescription(RedisCommand.HSET, "clicks");

在这里我们可以看到,保存到 Redis 时调用的命令是 HSET,所以是保存为哈希表(hash),

表名为“clicks”;保存的数据以 user 为 key,以url 为 value,每来一条数据就会做一次转换

4.输出到Elasticsearch

ElasticSearch 是一个分布式的开源搜索和分析引擎,适用于所有类型的数据。ElasticSearch 有着简洁的REST 风格的API,以良好的分布式特性、速度和可扩展性而闻名,在大数据领域应用非常广泛。

写入数据的ElasticSearch 的测试步骤如下。

  1. 添加Elasticsearch 连接器依赖

<dependency>
<groupId>org.apache.flink</groupId>

<artifactId>flink-connector-elasticsearch7_$scala.binary.version</artifactI d>
<version>$flink.version</version>
</dependency>
  1. 启动Elasticsearch 集群

  1. 编写输出到Elasticsearch 的示例代码

ArrayList<HttpHost> httpHosts = new ArrayList<>(); httpHosts.add(new HttpHost("hadoop102", 9200, "http"));




// 创建一个 ElasticsearchSinkFunction
ElasticsearchSinkFunction<Event>    elasticsearchSinkFunction    =    new ElasticsearchSinkFunction<Event>() 
@Override
public void process(Event element, RuntimeContext ctx, RequestIndexer indexer) 
HashMap<String, String> data = new HashMap<>(); data.put(element.user, element.url);

IndexRequest request = Requests.indexRequest()
.index("clicks")
.type("type")    // Es 6 必须定义 type
.source(data);

indexer.add(request);

;

stream.addSink(new    ElasticsearchSink.Builder<Event>(httpHosts, elasticsearchSinkFunction).build());

stream.addSink(esBuilder.build());

与RedisSink 类似,连接器也为我们实现了写入到Elasticsearch 的SinkFunction——ElasticsearchSink。区别在于,这个类的构造方法是私有(private)的,我们需要使用ElasticsearchSink 的Builder 内部静态类,调用它的build()方法才能创建出真正的SinkFunction。而Builder 的构造方法中又有两个参数:

  • httpHosts:连接到的Elasticsearch 集群主机列表

  • elasticsearchSinkFunction:这并不是我们所说的SinkFunction,而是用来说明具体处理逻辑、准备数据向Elasticsearch 发送请求的函数

具体的操作需要重写中elasticsearchSinkFunction 中的process 方法,我们可以将要发送的数据放在一个HashMap 中,包装成IndexRequest 向外部发送HTTP 请求。

5.输出到mysql(JDBC)

写入数据的MySQL 的测试步骤如下。

  1. 添加依赖

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_$scala.binary.version</artifactId>
<version>$flink.version</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
  1. 启动MySQL,在database 库下建表clicks

  1. 编写输出到MySQL 的示例代码

stream.addSink(
JdbcSink.sink(
"INSERT INTO clicks (user, url) VALUES (?, ?)", (statement, r) -> 
statement.setString(1, r.user); statement.setString(2, r.url);
,
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/userbe
havior")
// 对于 MySQL 5.7,用"com.mysql.jdbc.Driver"
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUsername("username")
.withPassword("password")
.build()
)
);

6.自定义Sink 输出

如果我们想将数据存储到我们自己的存储设备中,而Flink 并没有提供可以直接使用的连接器,又该怎么办呢?

与Source 类似,Flink 为我们提供了通用的SinkFunction 接口和对应的RichSinkDunction 抽象类,只要实现它,通过简单地调用DataStream 的.addSink()方法就可以自定义写入任何外部存储。之前与外部系统的连接,其实都是连接器帮我们实现了SinkFunction,现在既然没有现成的,我们就只好自力更生了。例如,Flink 并没有提供HBase 的连接器,所以需要我们自己写。

在实现SinkFunction 的时候,需要重写的一个关键方法 invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。

我们这里使用了SinkFunction 的富函数版本,因为这里我们又使用到了生命周期的概念,创建HBase 的连接以及关闭HBase 的连接需要分别放在open()方法和close()方法中。

(1)导入依赖

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>$hbase.version</version>
</dependency>

(2)编写输出到HBase 的示例代码

env
.fromElements("hello", "world")
.addSink(
new RichSinkFunction<String>() 
public    org.apache.hadoop.conf.Configuration
configuration; // 管理 Hbase 的配置信息,这里因为 Configuration 的重名问题,将类以完整路径导入
public Connection connection; // 管理 Hbase 连接
Exception 
"hadoop102:2181");
@Override
public    void    open(Configuration    parameters)    throws
super.open(parameters);
confin(configuration);

Exception 
@Override
public void invoke(String value, Context context) throws
Table    table    =
connection.getTable(TableName.valueOf("test")); // 表名为 test
guration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum",
connection    =
ConnectionFactory.createConnectioPut    put    =    new Put("rowkey".getBytes(StandardCharsets.UTF_8)); // 指定 rowkey
put.addColumn("info".getBytes(StandardCharsets.UTF_8) // 指定列名
, value.getBytes(StandardCharsets.UTF_8) // 写入的数据
, "1".getBytes(StandardCharsets.UTF_8)); // 写入的数据
table.put(put); // 执行 put 操作
table.close(); // 将表关闭

@Override
public void close() throws Exception  super.close();
connection.close(); // 关闭连接


);

以上是关于Flink输出算子Sink的主要内容,如果未能解决你的问题,请参考以下文章

10-flink-1.10.1- flink Sink api 输出算子

Flink单流算子

Flink输出算子Sink

Flink 流处理 API_Sink

Flink系列文档-(YY06)-Flink编程API-Sink

Flink第四篇之Flink的DataStream API(算子解析)