Flink的API操作

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink的API操作相关的知识,希望对你有一定的参考价值。

参考技术A

Apache Flink® - 数据流上的有状态计算

任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志、网站或移动应用程序上 的用户交互记录,所有这些数据都形成一种流。
数据可以被作为 无界 或者 有界 流来处理。

Apache Flink 擅长处理无界和有界数据集 精确的时间控制和状态化使得 Flink 的运行时(runtime)能够 运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部 处理,产生了出色的性能。

Apache Flink 是一个分布式系统,它需要计算资源来执行应用程序。Flink 集成了所有常见的集群资源 管理器,例如 Hadoop YARN Apache Mesos Kubernetes ,但同时也可以作为独立集群运行 (Standalone模式)。 Flink 被设计为能够很好地工作在上述每个资源管理器中,这是通过资源管理器特定(resourcemanager-specific)的部署模式实现的。Flink 可以采用与当前资源管理器相适应的方式进行交互。 部署 Flink 应用程序时,Flink 会根据应用程序配置的并行性自动标识所需的资源,并从资源管理器请求 这些资源。在发生故障的情况下,Flink 通过请求新资源来替换发生故障的容器。提交或控制应用程序 的所有通信都是通过 REST 调用进行的,这可以简化 Flink 与各种环境中的集成

Flink 旨在任意规模上运行有状态流式应用。因此,应用程序被并行化为可能数千个任务,这些任务分 布在集群中并发执行。所以应用程序能够充分利用无尽的 CPU、内存、磁盘和网络 IO。而且 Flink 很容 易维护非常大的应用程序状态。其异步和增量的检查点算法对处理延迟产生小的影响,同时保证精确 一次状态的一致性。

有状态的 Flink 程序针对本地状态访问进行了优化。任务的状态始终保留在内存中,如果状态大小超过 可用内存,则会保存在能高效访问的磁盘数据结构中。任务通过访问本地(通常在内存中)状态来进行 所有的计算,从而产生非常低的处理延迟。Flink 通过定期和异步地对本地状态进行持久化存储来保证 故障场景下精确一次的状态一致性。

pom.xml

WordCount.java

WordCount.java

WordCount.java

WordCount.java

ScalaWordCount.scala

数据湖(十七):Flink与Iceberg整合DataStream API操作

Flink与Iceberg整合DataStream API操作

目前Flink支持使用DataStream API 和SQL API 方式实时读取和写入Iceberg表,建议大家使用SQL API 方式实时读取和写入Iceberg表。

Iceberg 支持的Flink版本为1.11.x版本以上,目前经过测试Iceberg版本与Flink的版本对应关系如下:

  • Flink1.11.x版本与Iceberg0.11.1版本匹配。
  • Flink1.12.x~Flink1.1.x 版本与Iceberg0.12.1版本匹配,SQL API有一些bug。
  • Flink1.14.x版本与Iceberg0.12.1版本能整合但是有一些小bug,例如实时读取Iceberg中的数据有bug。

以下Flink与Iceberg整合使用的Flink版本为1.13.5,Iceberg版本为0.12.1版本。后期使用SQL API 操作时使用的Flink版本为1.11.6,Iceberg版本为0.11.1版本。

一、DataStream API 实时写入Iceberg表

DataStream Api方式操作Iceberg方式目前仅支持Java Api。使用DataStream API 实时写入Iceberg表具体操作如下:

1、首先在Maven中导入以下依赖

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<!-- flink 1.12.x -1.13.x 版本与Iceberg 0.12.1 版本兼容 ,不能与Flink 1.14 兼容-->
<flink.version>1.13.5</flink.version>
<!--<flink.version>1.12.1</flink.version>-->
<!--<flink.version>1.14.2</flink.version>-->
<!-- flink 1.11.x 与Iceberg 0.11.1 合适-->
<!--<flink.version>1.11.6</flink.version>-->
<hadoop.version>3.2.2</hadoop.version>
</properties>

<dependencies>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-iceberg</artifactId>
<version>1.13-vvr-4.0.7</version>
</dependency>
<!-- Flink 操作Iceberg 需要的Iceberg依赖 -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime</artifactId>
<version>0.12.1</version>
<!--<version>0.11.1</version>-->
</dependency>

<!-- java 开发Flink 所需依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>$flink.version</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>$flink.version</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>$flink.version</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>$flink.version</version>
</dependency>

<!-- Flink Kafka连接器的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>$flink.version</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>$flink.version</version>
</dependency>

<!-- 读取hdfs文件需要jar包-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>$hadoop.version</version>
</dependency>

<!-- Flink SQL & Table-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_2.11</artifactId>
<version>$flink.version</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table</artifactId>
<version>$flink.version</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>$flink.version</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>$flink.version</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>$flink.version</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>$flink.version</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>$flink.version</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>

<!-- log4j 和slf4j 包,如果在控制台不想看到日志,可以将下面的包注释掉-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.5</version>
</dependency>
</dependencies>

数据湖(十七):Flink与Iceberg整合DataStream

2、编写代码使用DataStream API将Kafka数据写入到Iceberg表

import com.google.common.collect.ImmutableMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;
import java.util.Map;

/**
* 使用DataStream Api 向Iceberg 表写入数据
*/
public class StreamAPIWriteIceberg
public static void main(String[] args) throws Exception
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//1.必须设置checkpoint ,Flink向Iceberg中写入数据时当checkpoint发生后,才会commit数据。
env.enableCheckpointing(5000);

//2.读取Kafka 中的topic 数据
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("node1:9092,node2:9092,node3:9092")
.setTopics("flink-iceberg-topic")
.setGroupId("my-group-id")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

//3.对数据进行处理,包装成RowData 对象,方便保存到Iceberg表中。
SingleOutputStreamOperator<RowData> dataStream = kafkaSource.map(new MapFunction<String, RowData>()
@Override
public RowData map(String s) throws Exception
System.out.println("s = "+s);
String[] split = s.split(",");
GenericRowData row = new GenericRowData(4);
row.setField(0, Integer.valueOf(split[0]));
row.setField(1, StringData.fromString(split[1]));
row.setField(2, Integer.valueOf(split[2]));
row.setField(3, StringData.fromString(split[3]));
return row;

);

//4.创建Hadoop配置、Catalog配置和表的Schema,方便后续向路径写数据时可以找到对应的表
Configuration hadoopConf = new Configuration();
Catalog catalog = new HadoopCatalog(hadoopConf,"hdfs://mycluster/flink_iceberg/");

//配置iceberg 库名和表名
TableIdentifier name =
TableIdentifier.of("icebergdb", "flink_iceberg_tbl");

//创建Icebeng表Schema
Schema schema = new Schema(
Types.NestedField.required(1, "id", Types.IntegerType.get()),
Types.NestedField.required(2, "nane", Types.StringType.get()),
Types.NestedField.required(3, "age", Types.IntegerType.get()),
Types.NestedField.required(4, "loc", Types.StringType.get()));

//如果有分区指定对应分区,这里“loc”列为分区列,可以指定unpartitioned 方法不设置表分区
// PartitionSpec spec = PartitionSpec.unpartitioned();
PartitionSpec spec = PartitionSpec.builderFor(schema).identity("loc").build();

//指定Iceberg表数据格式化为Parquet存储
Map<String, String> props =
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
Table table = null;

// 通过catalog判断表是否存在,不存在就创建,存在就加载
if (!catalog.tableExists(name))
table = catalog.createTable(name, schema, spec, props);
else
table = catalog.loadTable(name);


TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl", hadoopConf);

//5.通过DataStream Api 向Iceberg中写入数据
FlinkSink.forRowData(dataStream)
//这个 .table 也可以不写,指定tableLoader 对应的路径就可以。
.table(table)
.tableLoader(tableLoader)
//默认为false,追加数据。如果设置为true 就是覆盖数据
.overwrite(false)
.build();

env.execute("DataStream Api Write Data To Iceberg");

数据湖(十七):Flink与Iceberg整合DataStream

以上代码有如下几个注意点:

  • 需要设置Checkpoint,Flink向Iceberg中写入Commit数据时,只有Checkpoint成功之后才会Commit数据,否则后期在Hive中查询不到数据。
  • 读取Kafka数据后需要包装成RowData或者Row对象,才能向Iceberg表中写出数据。写出数据时默认是追加数据,如果指定overwrite就是全部覆盖数据。
  • 在向Iceberg表中写数据之前需要创建对应的Catalog、表Schema,否则写出时只指定对应的路径会报错找不到对应的Iceberg表。
  • 不建议使用DataStream API 向Iceberg中写数据,建议使用SQL API。

3、在Kafka 中创建代码中指定的“flink-iceberg-topic”并启动代码生产数据

# 在Kafka 中创建 flink-iceberg-topic topic
[root@node1 bin]# ./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic flink-iceberg-topic --partitions 3 --replication-factor 3

数据湖(十七):Flink与Iceberg整合DataStream

创建好以上topic之后,启动代码,然后向topic中生产以下数据:

[root@node1 bin]#./kafka-console-producer.sh  --topic flink-iceberg-topic --broker-list node1:9092,node2:9092,node3:9092
1,zs,18,beijing
2,ls,19,shanghai
3,ww,20,beijing
4,ml,21,shanghai

数据湖(十七):Flink与Iceberg整合DataStream

可以看到在HDFS 对应的路径中保存了对应的数据:

数据湖(十七):Flink与Iceberg整合DataStream

数据湖(十七):Flink与Iceberg整合DataStream

4、通过Hive查看保存到Iceberg中的数据

 启动Hive、Hive Metastore 在Hive中创建映射Iceberg的外表:

CREATE TABLE flink_iceberg_tbl  (
id int,
name string,
age int,
loc string
)
STORED BY org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
LOCATION hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl
TBLPROPERTIES (iceberg.catalog=location_based_table);

数据湖(十七):Flink与Iceberg整合DataStream

 注意:虽然loc是分区列,创建时忽略分区列就可以,此外映射表的路径要保持与保存Iceberg数据路径一致。

通过Hive查询对应的Iceberg表中的数据,结果如下:

hive> select * from flink_iceberg_tbl;
OK
2 ls

数据湖(十七):Flink与Iceberg整合DataStream

二、DataStream API 批量/实时读取Iceberg表

DataStream API 读取Iceberg表又分为批量读取和实时读取。通过方法“streaming(true/false)”来控制。

1、批量/全量读取

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

/**
* 使用DataStream Api 批量/实时 读取Iceberg 数据
*/
public class StreamAPIReadIceberg
public static void main(String[] args) throws Exception
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//1.配置TableLoader
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl", hadoopConf);

//2.从Iceberg中读取全量/增量读取数据
DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
.tableLoader(tableLoader)
//默认为false,整批次读取,设置为true 为流式读取
.streaming(false)
.build();

batchData.map(new MapFunction<RowData, String>()
@Override
public String map(RowData rowData) throws Exception
int id = rowData.getInt(0);
String name = rowData.getString(1).toString();
int age = rowData.getInt(2);
String loc = rowData.getString(3).toString();
return id+","+name+","+age+","+loc;

).print();

env.execute("DataStream Api Read Data From Iceberg");


数据湖(十七):Flink与Iceberg整合DataStream

结果如下:

数据湖(十七):Flink与Iceberg整合DataStream

数据湖(十七):Flink与Iceberg整合DataStream

2、实时读取

//当配置 streaming参数为true时就是实时读取
DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
.tableLoader(tableLoader)
//默认为false,整批次读取,设置为true 为流式读取
.streaming(true)
.build();

数据湖(十七):Flink与Iceberg整合DataStream

修改以上代码并启动,向Hive 对应的Iceberg表“flink_iceberg_tbl”中插入2条数据:

在向Hive的Iceberg表中插入数据之前需要加入以下两个包:

add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;

数据湖(十七):Flink与Iceberg整合DataStream

向Hive 中Iceberg 表插入两条数据

hive> insert into flink_iceberg_tbl values (5,s1,30,guangzhou),(6,s2,31,tianjin);

数据湖(十七):Flink与Iceberg整合DataStream

插入完成之后,可以看到Flink 控制台实时读取到对应数据

数据湖(十七):Flink与Iceberg整合DataStream

数据湖(十七):Flink与Iceberg整合DataStream

三、指定基于快照实时增量读取数据

以上案例我们发现Flink将表中所有数据都读取出来,我们也可以指定对应的snapshot-id 决定基于哪些数据增量读取数据。

DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
.tableLoader(tableLoader)
//基于某个快照实时增量读取数据,快照需要从元数据中获取
.startSnapshotId(4226332606322964975L)
//默认为false,整批次读取,设置为true 为流式读取
.streaming(true)
.build();

数据湖(十七):Flink与Iceberg整合DataStream

结果只读取到指定快照往后的数据,如下:

数据湖(十七):Flink与Iceberg整合DataStream

数据湖(十七):Flink与Iceberg整合DataStream

四、合并data files

Iceberg提供Api将小文件合并成大文件,可以通过Flink 批任务来执行。Flink中合并小文件与Spark中小文件合并完全一样。

代码如下:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFilesActionResult;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.actions.Actions;
import org.apache.iceberg.hadoop.HadoopCatalog;

/**
* 可以通过提交Flink批量任务来合并Data Files 文件。
*/
public class RewrietDataFiles
public static void main(String[] args)

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//1.配置TableLoader
Configuration hadoopConf = new Configuration();

//2.创建Hadoop配置、Catalog配置和表的Schema,方便后续向路径写数据时可以找到对应的表
Catalog catalog = new HadoopCatalog(hadoopConf,"hdfs://mycluster/flink_iceberg/");

//3.配置iceberg 库名和表名并加载表
TableIdentifier name =
TableIdentifier.of("icebergdb", "flink_iceberg_tbl");
Table table = catalog.loadTable(name);

//4..合并 data files 小文件
RewriteDataFilesActionResult result = Actions.forTable(table)
.rewriteDataFiles()
//默认 512M ,可以手动通过以下指定合并文件大小,与Spark中一样。
.targetSizeInBytes(536870912L)
.execute();

数据湖(十七):Flink与Iceberg整合DataStream

以上是关于Flink的API操作的主要内容,如果未能解决你的问题,请参考以下文章

Flink Table API & SQL 基本操作

Flink Table API & SQL 基本操作

数据湖(十七):Flink与Iceberg整合DataStream API操作

数据湖(十八):Flink与Iceberg整合SQL API操作

8.FLINK Transformation基本操作合并和连接拆分和选择rebalance重平衡分区其他分区操作API

8.FLINK Transformation基本操作合并和连接拆分和选择rebalance重平衡分区其他分区操作API