[第8期] 流式计算之 Storm 爱用者初尝 Flink

Posted 百姓网技术团队

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[第8期] 流式计算之 Storm 爱用者初尝 Flink相关的知识,希望对你有一定的参考价值。

背景介绍

百姓网有大量的用户行为数据和系统数据需要进行实时处理与分析,以便于业务方实时地监控业务运行状态,所以我们建设了基于 Storm 的实时数据处理系统,将持续不断流入的日志数据进行清洗与转换,打入下游的各个分析系统。

但是 Storm 用久了之后,我们也逐渐体会到了一些痛苦,最主要的就是 Storm 比较底层,每次我们需要修改数据处理规则,都需要修改 Java 代码。随着需要处理的数据主题不断增加,数据团队开发人员的负担越来越重。数据团队的一个特征是每个人都很熟悉 SQL,所以渐渐就产生了一个期待:如果定义流式数据的处理过程能像写 SQL 这样、而且随时可以修改的话,那这些业务规则的变动、业务条线的增加,就不需要再开发 Java 代码了,甚至业务运营的同事也有可能能自行配制数据转换规则了,数据团队仅需确保平台的稳定和牢固,这对效率的提升将十分巨大。带着这种愿景,我们先考察了 Storm SQL,可惜它目前还处在实验阶段,无法满足需求,这时一些机缘巧合下,Flink 进入了我们的视野。

关于 Flink 的二三事

Flink 的身世

Flink 是新一代的流计算框架,它于 2014 年加入 Apache 基金会,并成为顶级项目。Flink 主要由 Java 编写,目前由 Data Artist 公司进行主导。Flink 支持流处理和批处理,但是流处理是它的核心,并且在 Stream API 的基础上封装了 Table API 和 SQL,使得用户可以通过 SQL 来对数据流进行清洗。Flink 原生支持 Exactly Once,并不需要开发者通过额外的方式来保证数据的准确处理。

Flink 的架构简述

Flink 与大部分分布式计算引擎的架构类似,主要分为两个组件:

  • JobManager: 处理用户提交的任务,并进行分配,监控整个集群的状态。

  • TaskManager: 执行 JobManager 分配的任务,并在多个 TaskManager 之间进行数据交换。

每个 TaskManager 都是一个 JVM 的实例。相比其他计算框架,Flink 对资源的分配更加细致,它会将每个 TaskManager 的内存分为多个部分,用户只能控制其中的一小部分,并且 Flink 支持将数据转化为二进制流保存在 JVM 之外的内存中,这样子会降低 OOM 发生的概率。Flink 的架构图如下:

插图来自 https://flink.apache.org

图中,用户通过 Client 将程序提交给 JobManager,然后 JobManager 将任务分配给 2 个 TaskManager 进行处理。

Flink 的生态建设

一个好的计算框架需要有一个良好的生态,框架之上的应用越多,这个框架的发展才会越好,生命力才会越强大,我们用起来才会放心……下图是 Flink 的组件栈:

[第8期] 流式计算之 Storm 爱用者初尝 Flink

插图来自 https://flink.apache.org

从上图可以看到,Flink 支持 Table & SQL、机器学习、 图计算……它的组件覆盖度已经和 Spark 差不多了,但与 Spark 相比,成熟度还有些欠缺,还需要时间发展。

实战:使用 Flink SQL 处理数据

来点干货:我们把之前用 Storm 来处理的业务数据转换需求,尝试用 Flink 来实现一次。我们的业务需求之一是对 Kafka 中过来的数据流进行实时的清洗,例如将数据 A:

{
   "type":"A",
   "timestamp":"1502323401603",
   "msg":{
       "userId":"123456",
       "id":"sv123",
       "start_flag":"true"
   
}
}

转化为数据 B:

{
   "type":"B",
   "ts":1502323401603,
   "uid":"123456",
   "vid":"sv123",
   "start_flag":1
}

这里,我们将字段打平,并且修改了其中的某些数据。例如将 type 的值从 A 变为了 B。

我们通过 Flink 来实现这样的处理数据流程,要求是用 SQL 配置来实现转换,而不再用 Java 代码来描述转换逻辑。 好,开始干活,我们需要创建一个项目,本文使用 IntelliJ IDEA 作为 IDE。 首先通过下面的命令快速创建一个 Maven 项目。

curl https://flink.apache.org/q/quickstart.sh | bash

项目的结构如下:

quickstart
├── pom.xml
└── src
   └── main
       ├── java
       │   └── org
       │       └── myorg
       │           └── quickstart
       │               ├── BatchJob.java
       │               ├── SocketTextStreamWordCount.java
       │               ├── StreamingJob.java
       │               └── WordCount.java
       └── resources
           └── log4j.properties

然后使用 IntelliJ IDEA 导入项目。

在 pom.xml 的依赖中加入 flink-table_2.10。

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table_2.10</artifactId>
   <version>1.3.2</version>
</dependency>

注意 pom.xml 中对应 flink.versionscala.binary.version 一定要一致。我们使用 Flink 1.3.2 和 scala 2.11。pom.xml 的格式如下:

...
<properties>
   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   <flink.version>1.3.2</flink.version>
   <slf4j.version>1.7.7</slf4j.version>
   <log4j.version>1.2.17</log4j.version>
   <scala.binary.version>2.11</scala.binary.version>
</properties>
...
<dependencies>
   ...
   <!-- Apache Flink dependencies -->
   <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-java</artifactId>
       <version>${flink.version}</version>
   </dependency>
   ...
</dependencies>
...

新建一个类,叫做 TablExample, 然后添加如下代码:

public class TableExample {

   public static void main(String args[]) throws Exception {

       // 创建 streaming 的运行环境
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       // 创建 table 的运行环境
       StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

       // 指定 kafka topic 中的消息的字段名和字段类型
       TypeInformation<Row> typeInfo = Types.ROW(
               new String[] { "type", "ts", "msg" },
               new TypeInformation<?>[] { Types.STRING(), Types.STRING(), Types.MAP(Types.STRING(), Types.STRING())}
       );

       // 指定topic
       String kafkaTopic = "test";

       Properties kafkaProperties = new Properties();
       // 指定 kafka server
       kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
       kafkaProperties.setProperty("group.id", "fink_test11");

       // 调用 kafka table source ,它会从 kafka 中读取json数据,并按照 typeInfo 将里面的字段做好映射
       KafkaJsonTableSource kafkaTableSource = new Kafka010JsonTableSource(
               kafkaTopic,
               kafkaProperties,
               typeInfo);

       // 指定数据源为 kafka_source 表,这样就可以写sql 从这张表查数据了
       tableEnv.registerTableSource("kafka_source", kafkaTableSource);
       String execSql = " " +
               "SELECT " +
               " CASE WHEN type = 'A' THEN 'B' ELSE type END AS type, " +
               " CAST (ts AS DECIMAL) AS ts, " +
               " msg['userId'] AS uid, " +
               " CASE WHEN msg['id'] LIKE 'sv%' THEN msg['id'] END AS vid, " +
               " CASE WHEN msg['start_flag'] = 'true' THEN 1 ELSE 0 END AS start_flag " +
               "FROM " +
               " kafka_source";
       // 从表中查数据
       Table play = tableEnv.sql(execSql);
       // 将 table 转化为 stream
       DataStream<Row> playStreaming = tableEnv.toAppendStream(play, Row.class);
       // 输出 stream 中的数据
       playStreaming.print();
        // 执行流程,并给这个Flink 程序起名为 play table
       env.execute("play table");
   }
}

在这个程序中,我们通过

SELECT
   CASE WHEN type = 'A' THEN type = 'B' END AS type,
   CAST(ts AS DECIMAL) AS ts,
   msg['userId'] AS uid,
   CASE WHEN msg['id'] LIKE 'sv%' THEN msg['id'] END AS vid,
   CASE WHEN msg['start_flag'] = 'true' THEN 1 ELSE 0 END AS start_flag
FROM
   kafka_source

来转换数据。

运行上面的类,并在 Kafka 中的 test topic 中输入:

{"type":"A", "ts":"1502323401603", "msg":{"userId":"123456", "id":"sv123", "start_flag":"true"} }

我们就可以在 IDE 中看到输出结果为:

B,1502323401603,123456,sv123,1

结果和我们预期的一样,然后我们只需要将执行结果转换为对应的 JSON 就好了。

从上面的结果可以看出,Flink 解决了我们在使用 Storm 中处理一些特定数据清洗需求的痛点。每一个事件的清理需求都可以通过一个 SQL 进行描述。

如果用 Storm 来处理的话,我们会新建几个 Bolts,并在 Bolts 中对数据进行处理。当然还可以从其中总结出一些通用的规则,并通过读取特定的配置文件,然后利用相应的规则处理。但是总有些特殊的数据,例如数据 A 中 id,如果它的值以 sv 开头,就将 id 命名为 vid,如果以 ad 开头,就将 id 命名为 aid。这类需求我们只能通过不断添加规则或者硬编码来解决。随着特定需求越来越多,代码也越来越难维护。

忆苦思甜,看来 Flink 值得期待。

总而言之

本文简要介绍了下 Flink 的特点,并通过一个案例展示如何通过 Flink SQL 处理数据。和 Storm 相比较,Flink 使用起来更加方便,同时用户在编码的时候可以更加专注业务逻辑。在以后实时流工具的使用方面,用户可以直接通过 SQL 对数据进行 ETL,大大降低了工具的使用门槛。Flink 也在向这方面发展。当然 Flink 还是存在一些问题的,它的一些 API 使用不太方便,并且 Table API & SQL 还处在 Beta 阶段。如果读者想对 Flink 有更多的了解,可以查看官方文档[1]。Flink 在很多方面和 Spark Streaming 更加类似,后续我们也会将它和 Spark Streaming 做比较,而且也会继续深入研究 Flink。

参考

[1] https://flink.apache.org

张家利

百姓网数据工程师,致力于数据技术架构和系统方面的研发,主要技术方向包括实时数据流处理、Hadoop 生态系统、MPP 等。

本文仅为作者个人观点,不代表百姓网立场。

(题图来源:Flink Logo)


  每周推送一篇技术文章,
  长按二维码立即关注!

以上是关于[第8期] 流式计算之 Storm 爱用者初尝 Flink的主要内容,如果未能解决你的问题,请参考以下文章

走进大数据之storm流式计算

大数据入门第十六天——流式计算之storm详解常用命令

Strom流式计算

大数据实时流式计算引擎 Flink 简单剖析

storm 流式计算框架

Storm简介——实时流式计算介绍