[第8期] 流式计算之 Storm 爱用者初尝 Flink
Posted 百姓网技术团队
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[第8期] 流式计算之 Storm 爱用者初尝 Flink相关的知识,希望对你有一定的参考价值。
背景介绍
百姓网有大量的用户行为数据和系统数据需要进行实时处理与分析,以便于业务方实时地监控业务运行状态,所以我们建设了基于 Storm 的实时数据处理系统,将持续不断流入的日志数据进行清洗与转换,打入下游的各个分析系统。
但是 Storm 用久了之后,我们也逐渐体会到了一些痛苦,最主要的就是 Storm 比较底层,每次我们需要修改数据处理规则,都需要修改 Java 代码。随着需要处理的数据主题不断增加,数据团队开发人员的负担越来越重。数据团队的一个特征是每个人都很熟悉 SQL,所以渐渐就产生了一个期待:如果定义流式数据的处理过程能像写 SQL 这样、而且随时可以修改的话,那这些业务规则的变动、业务条线的增加,就不需要再开发 Java 代码了,甚至业务运营的同事也有可能能自行配制数据转换规则了,数据团队仅需确保平台的稳定和牢固,这对效率的提升将十分巨大。带着这种愿景,我们先考察了 Storm SQL,可惜它目前还处在实验阶段,无法满足需求,这时一些机缘巧合下,Flink 进入了我们的视野。
关于 Flink 的二三事
Flink 的身世
Flink 是新一代的流计算框架,它于 2014 年加入 Apache 基金会,并成为顶级项目。Flink 主要由 Java 编写,目前由 Data Artist 公司进行主导。Flink 支持流处理和批处理,但是流处理是它的核心,并且在 Stream API 的基础上封装了 Table API 和 SQL,使得用户可以通过 SQL 来对数据流进行清洗。Flink 原生支持 Exactly Once,并不需要开发者通过额外的方式来保证数据的准确处理。
Flink 的架构简述
Flink 与大部分分布式计算引擎的架构类似,主要分为两个组件:
JobManager: 处理用户提交的任务,并进行分配,监控整个集群的状态。
TaskManager: 执行 JobManager 分配的任务,并在多个 TaskManager 之间进行数据交换。
每个 TaskManager 都是一个 JVM 的实例。相比其他计算框架,Flink 对资源的分配更加细致,它会将每个 TaskManager 的内存分为多个部分,用户只能控制其中的一小部分,并且 Flink 支持将数据转化为二进制流保存在 JVM 之外的内存中,这样子会降低 OOM 发生的概率。Flink 的架构图如下:
插图来自 https://flink.apache.org
图中,用户通过 Client 将程序提交给 JobManager,然后 JobManager 将任务分配给 2 个 TaskManager 进行处理。
Flink 的生态建设
一个好的计算框架需要有一个良好的生态,框架之上的应用越多,这个框架的发展才会越好,生命力才会越强大,我们用起来才会放心……下图是 Flink 的组件栈:
插图来自 https://flink.apache.org
从上图可以看到,Flink 支持 Table & SQL、机器学习、 图计算……它的组件覆盖度已经和 Spark 差不多了,但与 Spark 相比,成熟度还有些欠缺,还需要时间发展。
实战:使用 Flink SQL 处理数据
来点干货:我们把之前用 Storm 来处理的业务数据转换需求,尝试用 Flink 来实现一次。我们的业务需求之一是对 Kafka 中过来的数据流进行实时的清洗,例如将数据 A:
{
"type":"A",
"timestamp":"1502323401603",
"msg":{
"userId":"123456",
"id":"sv123",
"start_flag":"true"
}
}
转化为数据 B:
{
"type":"B",
"ts":1502323401603,
"uid":"123456",
"vid":"sv123",
"start_flag":1
}
这里,我们将字段打平,并且修改了其中的某些数据。例如将 type 的值从 A 变为了 B。
我们通过 Flink 来实现这样的处理数据流程,要求是用 SQL 配置来实现转换,而不再用 Java 代码来描述转换逻辑。 好,开始干活,我们需要创建一个项目,本文使用 IntelliJ IDEA 作为 IDE。 首先通过下面的命令快速创建一个 Maven 项目。
curl https://flink.apache.org/q/quickstart.sh | bash
项目的结构如下:
quickstart
├── pom.xml
└── src
└── main
├── java
│ └── org
│ └── myorg
│ └── quickstart
│ ├── BatchJob.java
│ ├── SocketTextStreamWordCount.java
│ ├── StreamingJob.java
│ └── WordCount.java
└── resources
└── log4j.properties
然后使用 IntelliJ IDEA 导入项目。
在 pom.xml 的依赖中加入 flink-table_2.10。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.10</artifactId>
<version>1.3.2</version>
</dependency>
注意 pom.xml 中对应 flink.version
和 scala.binary.version
一定要一致。我们使用 Flink 1.3.2 和 scala 2.11。pom.xml 的格式如下:
...
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.3.2</flink.version>
<slf4j.version>1.7.7</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<scala.binary.version>2.11</scala.binary.version>
</properties>
...
<dependencies>
...
<!-- Apache Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
...
</dependencies>
...
新建一个类,叫做 TablExample
, 然后添加如下代码:
public class TableExample {
public static void main(String args[]) throws Exception {
// 创建 streaming 的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建 table 的运行环境
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 指定 kafka topic 中的消息的字段名和字段类型
TypeInformation<Row> typeInfo = Types.ROW(
new String[] { "type", "ts", "msg" },
new TypeInformation<?>[] { Types.STRING(), Types.STRING(), Types.MAP(Types.STRING(), Types.STRING())}
);
// 指定topic
String kafkaTopic = "test";
Properties kafkaProperties = new Properties();
// 指定 kafka server
kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
kafkaProperties.setProperty("group.id", "fink_test11");
// 调用 kafka table source ,它会从 kafka 中读取json数据,并按照 typeInfo 将里面的字段做好映射
KafkaJsonTableSource kafkaTableSource = new Kafka010JsonTableSource(
kafkaTopic,
kafkaProperties,
typeInfo);
// 指定数据源为 kafka_source 表,这样就可以写sql 从这张表查数据了
tableEnv.registerTableSource("kafka_source", kafkaTableSource);
String execSql = "
" +
"SELECT
" +
" CASE WHEN type = 'A' THEN 'B' ELSE type END AS type,
" +
" CAST (ts AS DECIMAL) AS ts,
" +
" msg['userId'] AS uid,
" +
" CASE WHEN msg['id'] LIKE 'sv%' THEN msg['id'] END AS vid,
" +
" CASE WHEN msg['start_flag'] = 'true' THEN 1 ELSE 0 END AS start_flag
" +
"FROM
" +
" kafka_source";
// 从表中查数据
Table play = tableEnv.sql(execSql);
// 将 table 转化为 stream
DataStream<Row> playStreaming = tableEnv.toAppendStream(play, Row.class);
// 输出 stream 中的数据
playStreaming.print();
// 执行流程,并给这个Flink 程序起名为 play table
env.execute("play table");
}
}
在这个程序中,我们通过
SELECT
CASE WHEN type = 'A' THEN type = 'B' END AS type,
CAST(ts AS DECIMAL) AS ts,
msg['userId'] AS uid,
CASE WHEN msg['id'] LIKE 'sv%' THEN msg['id'] END AS vid,
CASE WHEN msg['start_flag'] = 'true' THEN 1 ELSE 0 END AS start_flag
FROM
kafka_source
来转换数据。
运行上面的类,并在 Kafka 中的 test topic 中输入:
{"type":"A", "ts":"1502323401603", "msg":{"userId":"123456", "id":"sv123", "start_flag":"true"} }
我们就可以在 IDE 中看到输出结果为:
B,1502323401603,123456,sv123,1
结果和我们预期的一样,然后我们只需要将执行结果转换为对应的 JSON 就好了。
从上面的结果可以看出,Flink 解决了我们在使用 Storm 中处理一些特定数据清洗需求的痛点。每一个事件的清理需求都可以通过一个 SQL 进行描述。
如果用 Storm 来处理的话,我们会新建几个 Bolts,并在 Bolts 中对数据进行处理。当然还可以从其中总结出一些通用的规则,并通过读取特定的配置文件,然后利用相应的规则处理。但是总有些特殊的数据,例如数据 A 中 id,如果它的值以 sv 开头,就将 id 命名为 vid,如果以 ad 开头,就将 id 命名为 aid。这类需求我们只能通过不断添加规则或者硬编码来解决。随着特定需求越来越多,代码也越来越难维护。
忆苦思甜,看来 Flink 值得期待。
总而言之
本文简要介绍了下 Flink 的特点,并通过一个案例展示如何通过 Flink SQL 处理数据。和 Storm 相比较,Flink 使用起来更加方便,同时用户在编码的时候可以更加专注业务逻辑。在以后实时流工具的使用方面,用户可以直接通过 SQL 对数据进行 ETL,大大降低了工具的使用门槛。Flink 也在向这方面发展。当然 Flink 还是存在一些问题的,它的一些 API 使用不太方便,并且 Table API & SQL 还处在 Beta 阶段。如果读者想对 Flink 有更多的了解,可以查看官方文档[1]。Flink 在很多方面和 Spark Streaming 更加类似,后续我们也会将它和 Spark Streaming 做比较,而且也会继续深入研究 Flink。
参考
[1] https://flink.apache.org
张家利百姓网数据工程师,致力于数据技术架构和系统方面的研发,主要技术方向包括实时数据流处理、Hadoop 生态系统、MPP 等。 |
本文仅为作者个人观点,不代表百姓网立场。
(题图来源:Flink Logo)
每周推送一篇技术文章, 长按二维码立即关注! |
以上是关于[第8期] 流式计算之 Storm 爱用者初尝 Flink的主要内容,如果未能解决你的问题,请参考以下文章