大数据架构

Posted 阳光-星辰大海

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据架构相关的知识,希望对你有一定的参考价值。

Lambda架构

架构中含有离线处理与实时处理两条链路,两条链路处理数据导致数据不一致等

Kappa 架构

Kappa架构真正的实时数仓,目前在业界最常用实现就是Flink + Kafka

Kappa存在问题

  • Kafka无法支持海量数据存储。对于海量数据量的业务线来说,Kafka一般只能存储非常短时间的数据,比如最近一周,甚至最近一天。
  • Kafka无法支持高效的OLAP查询,大多数业务都希望能在DWD\\DWS层支持即席查询的,但是Kafka无法非常友好地支持这样的需求。
  • 无法复用目前已经非常成熟的基于离线数仓的数据血缘、数据质量管理体系。需要重新实现一套数据血缘、数据质量管理体系。
  • Kafka不支持update/upsert,目前Kafka仅支持append。实际场景中在DWS轻度汇聚层很多时候是需要更新的,DWD明细层到DWS轻度汇聚层一般会根据时间粒度以及维度进行一定的聚合,用于减少数据量,提升查询性能。假如原始数据是秒级数据,聚合窗口是1分钟,那就有可能产生某些延迟的数据经过时间窗口聚合之后需要更新之前数据的需求。这部分更新需求无法使用Kafka实现。

实时数仓发展到现在的架构,一定程度上解决了数据报表时效性问题,但是这样的架构依然存在不少问题,Kappa架构除了以上所说的问题之外,实时业务需求多的公司在选择Kappa架构后,也避免不了一些离线数据统一计算的场景,针对Kappa架构往往需要再针对某层Kafka数据重新编写实时程序进行统一计算,非常不方便。数据湖技术的出现,使Kappa架构实现批量数据和实时数据统一计算成为可能,“批流一体”,在业界中很多人认为批和流在开发层面上都统一到相同的SQL上处理是批流一体,也有一些人认为在计算引擎层面上批和流可以集成在同一个计算引擎是批流一体,比如:Spark/SparkStreaming/Structured Streaming/Flink框架在计算引擎层面上实现了批处理和流处理集成。除此之外,批流一体还有一个最核心的方面就是存储层面上的统一。数据湖技术可以实现将批数据和实时数据统一存储,统一处理计算。我们可以将离线数仓中的数仓和实时数仓中的数仓数据存储统一合并到数据湖上,可以将Kappa架构中的数仓分层Kafka存储替换成数据湖技术存储,这样做到“湖仓一体”的构建批流一体的方式:1、SQL统一一体;2、计算引擎统一到一体

数据湖架构

湖仓一体”架构构建也是目前各大公司针对离线场景和实时场景统一处理计算的方式。例如:一些大型公司使用Iceberg作为存储,那么Kappa架构中很多问题都可以得到解决,Kappa架构将变成个如下模样:

 无论是流处理还是批处理,数据存储都统一到数据湖Iceberg上,这一套结构将存储统一后,解决了Kappa架构很多痛点,解决方面如下:

  1. 可以解决Kafka存储数据量少的问题。目前所有数据湖基本思路都是基于HDFS之上实现的一个文件管理系统,所以数据体量可以很大。
  2. DW层数据依然可以支持OLAP查询。同样数据湖基于HDFS之上实现,只需要当前的OLAP查询引擎做一些适配就可以进行OLAP查询。
  3. 批流存储都基于Iceberg/HDFS存储之后,就完全可以复用一套相同的数据血缘、数据质量管理体系。
  4. 实时数据的更新。

上述架构也可以认为是Kappa架构的变种,也有两条数据链路,一条是基于Spark的离线数据链路,一条是基于Flink的实时数据链路,通常数据都是直接走实时链路处理,而离线链路则更多的应用于数据修正等非常规场景。这样的架构要成为一个可以落地的实时数仓方案、可以做到实时报表产生。

某公司商业场景下的实时数仓架构

项目中的数据来源有两类,一是mysql业务库数据,另一类是用户日志数据,我们通过对应的方式将两类数据首先采集到Kafka各自topic中,通过Flink处理将业务和日志数据存储在Iceberg-ODS层中,由于目前Flink基于Iceberg处理实时数据不能很好保存数据消费位置信息,所以这里同时将数据存储在Kafka中,利用Flink消费Kafka数据自动维护offset的特性来保证程序停止重启后消费数据的正确性。

整个架构是基于Iceberg构建数据仓库分层,经过Kafka处理数据都实时存储在对应的Iceberg分层中,实时数据结果经过最后分析存储在Clickhouse中,离线数据分析结果直接从Iceberg-DWS层中获取数据分析,分析结果存入MySQL中,Iceberg其它层供临时性业务分析,最终Clickhouse和MySQL中的结果通过可视化工具展示出来。

数据库同步工具:Cannel\\Maxwell\\FlinkCDC 

Cannel和Maxwell的对比:​​​​​​Maxwell与Canal_Allenzyg的博客-CSDN博客_maxwell和canalmaxwell/canal 对比_刘狗的博客-CSDN博客_maxwell和canal

FlinkCDC实践:Flink CDC 原理、实践和优化 - 简书

 环境构建:

Iceberg就是一种表格式,支持使用Hive对Iceberg进行读写操作,但是对Hive的版本有要求,如下:

集成Iceberg的方法

1、下载iceberg-hive-runtime.jar

想要使用Hive支持查询Iceberg表,首先需要下载“iceberg-hive-runtime.jar”,Hive通过该Jar可以加载Hive或者更新Iceberg表元数据信息。下载地址:https://iceberg.apache.org/#releases/:

将以上jar包下载后,上传到Hive服务端和客户端对应的HIVE_HOME/lib目录下,另外在向Hive中Iceberg格式表插入数据时需要到“libfb303-0.9.3.jar”包,将此包也上传到Hive服务端和客户端对应的HIVE_HOME/lib目录下。

2、配置hive-site.xml

在Hive客户端$HIVE_HOME/conf/hive-site.xml中追加如下配置:

<property>
    <name>iceberg.engine.hive.enabled</name>
    <value>true</value>
</property>

3、Hive中操作Ice 

 从Hive引擎的角度来看,在运行环境中有Catalog概念(catalog主要描述了数据集的位置信息,就是元数据),Hive与Iceberg整合时,Iceberg支持多种不同的Catalog类型,例如:Hive、Hadoop、第三方厂商的AWS Glue和自定义Catalog。在实际应用场景中,Hive可能使用上述任意Catalog,甚至跨不同Catalog类型join数据,为此Hive提供了org.apache.iceberg.mr.hive.HiveIcebergStorageHandler(位于包iceberg-hive-runtime.jar)来支持读写Iceberg表,并通过在Hive中设置“iceberg.catalog.<catalog_name>.type”属性来决定加载Iceberg表的方式,该属性可以配置:hive、hadoop,其中“<catalog_name>”是自己随便定义的名称,主要是在hive中创建Iceberg格式表时配置iceberg.catalog属性使用。

在Hive中创建Iceberg格式表时,根据创建Iceberg格式表时是否指定iceberg.catalog属性值,有以下三种方式决定Iceberg格式表如何加载(数据存储在什么位置)。

如果没有设置iceberg.catalog属性,默认使用HiveCatalog来加载

这种方式就是说如果在Hive中创建Iceberg格式表时,不指定iceberg.catalog属性,那么数据存储在对应的hive warehouse路径下。

在Hive客户端node3节点进入Hive,操作如下:

#在Hive中创建iceberg格式表 create table test_iceberg_tbl1( id int , name string, age int) partitioned by (dt string) stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'; #在Hive中加载如下两个包,在向Hive中插入数据时执行MR程序时需要使用到 hive> add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar; hive> add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar; #向表中插入数据 hive> insert into test_iceberg_tbl1 values (1,"zs",18,"20211212"); #查询表中的数据 hive> select * from test_iceberg_tbl1; OK 1 zs 18 20211212

在Hive默认的warehouse目录下可以看到创建的表目录: 

如果设置了iceberg.catalog对应的catalog名字,就用对应类型的catalog加载

这种情况就是说在Hive中创建Iceberg格式表时,如果指定了iceberg.catalog属性值,那么数据存储在指定的catalog名称对应配置的目录下。

在Hive客户端node3节点进入Hive,操作如下:

#注册一个HiveCatalog叫another_hive hive> set iceberg.catalog.another_hive.type=hive; #在Hive中创建iceberg格式表 create table test_iceberg_tbl2( id int, name string, age int ) partitioned by (dt string) stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' tblproperties ('iceberg.catalog'='another_hive'); #在Hive中加载如下两个包,在向Hive中插入数据时执行MR程序时需要使用到 hive> add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar; hive> add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar; #插入数据,并查询 hive> insert into test_iceberg_tbl2 values (2,"ls",20,"20211212"); hive> select * from test_iceberg_tbl2; OK 2 ls 20 20211212

 以上方式指定“iceberg.catalog. another_hive .type=hive”后,实际上就是使用的hive的catalog,这种方式与第一种方式不设置效果一样,创建后的表存储在hive默认的warehouse目录下。也可以在建表时指定location 写上路径,将数据存储在自定义对应路径上。

除了可以将catalog类型指定成hive之外,还可以指定成hadoop,在Hive中创建对应的iceberg格式表时需要指定location来指定iceberg数据存储的具体位置,这个位置是具有一定格式规范的自定义路径。在Hive客户端node3节点进入Hive,操作如下:

#注册一个HadoopCatalog叫hadoop hive> set iceberg.catalog.hadoop.type=hadoop; #使用HadoopCatalog时,必须设置“iceberg.catalog.<catalog_name>.warehouse”指定warehouse路径 hive> set iceberg.catalog.hadoop.warehouse=hdfs://mycluster/iceberg_data; #在Hive中创建iceberg格式表,这里创建成外表 create external table test_iceberg_tbl3( id int, name string, age int ) partitioned by (dt string) stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' location 'hdfs://mycluster/iceberg_data/default/test_iceberg_tbl3' tblproperties ('iceberg.catalog'='hadoop'); 注意:以上location指定的路径必须是“iceberg.catalog.hadoop.warehouse”指定路径的子路径,格式必须是$iceberg.catalog.hadoop.warehouse/$当前建表使用的hive库/$创建的当前iceberg表名 #在Hive中加载如下两个包,在向Hive中插入数据时执行MR程序时需要使用到 hive> add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar; hive> add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar; #插入数据,并查询 hive> insert into test_iceberg_tbl3 values (3,"ww",20,"20211213"); hive> select * from test_iceberg_tbl3; OK 3 ww 20 20211213

在指定的“iceberg.catalog. hadoop .warehouse”路径下可以看到创建的表目录: 

如果iceberg.catalog属性设置为“location_based_table”,可以从指定的根路径下加载Iceberg 表

这种情况就是说如果HDFS中已经存在iceberg格式表,我们可以通过在Hive中创建Icerberg格式表指定对应的location路径映射数据。在Hive客户端中操作如下:

CREATE TABLE test_iceberg_tbl4 ( id int, name string, age int, dt string )STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'hdfs://mycluster/spark/person' TBLPROPERTIES ('iceberg.catalog'='location_based_table'); 注意:指定的location路径下必须是iceberg格式表数据,并且需要有元数据目录才可以。不能将其他数据映射到Hive iceberg格式表。

 注意:由于Hive建表语句分区语法“Partitioned by”的限制,如果使用Hive创建Iceberg格式表,目前只能按照Hive语法来写,底层转换成Iceberg标识分区,这种情况下不能使用Iceberge的分区转换,例如:days(timestamp),如果想要使用Iceberg格式表的分区转换标识分区,需要使用Spark或者Flink引擎创建表。

Phoenix用SQL方式操作Hbase

Maxwell的部署和使用

1、开启MySQL的Binlog

mysql -u root -p123456
mysql> show variables like 'log_%';

2、在/etc/my.cnf文件中[mysqld]下写入以下内容:

[mysqld] # 随机指定一个不能和其他集群中机器重名的字符串,配置 MySQL replaction 需要定义 server-id=123 #配置binlog日志目录,配置后会自动开启binlog日志,并写入该目录 log-bin=/var/lib/mysql/mysql-bin # 选择 ROW 模式 binlog-format=ROW

 3、启动MySQL服务,重新查看binlog日志情况

[root@node2 ~]# service mysqld restart [root@node2 ~]# mysql -u root -p123456 mysql> show variables like 'log_%';

 Maxwell的安装和配置

maxwell安装版本选择1.28.2,选择node3节点安装,安装maxwell步骤如下

1、下载maxwell安装包上传node3并解压

[root@node3 ~]# cd /software/ 
[root@node3 software]# tar -zxvf ./maxwell-1.28.2.tar.gz

2、在MySQL中创建Maxwell的用户授权

mysql> CREATE database maxwell; 
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell'; 
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%'; 
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
mysql> flush privileges;

3、修改peizhiconfig.properties文件

node3节点进入“/software/maxwell-1.28.2”,修改“config.properties.example”为“config.properties”并配置:

producer=kafka 
kafka.bootstrap.servers=node1:9092,node2:9092,node3:9092
kafka_topic=test-topic #设置根据表将binlog写入Kafka不同分区,还可指定:[database, table, primary_key, transaction_id, thread_id, column] 
producer_partition_by=table 
#mysql 节点 
host=node2
#连接mysql用户名和密码
user=maxwell 
password=maxwell #指定maxwell 当前连接mysql的实例id,这里用于全量同步表数据使用 client_id=maxwell_first
注意:以上参数也可以在后期启动maxwell时指定参数方式来设置。

 4、启动zk及kafka创建对应的topic

[root@node1 bin]# ./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic test-topic --partitions 3 --replication-factor 3

5、kafka中检测test-topic

[root@node2 bin]# cd /software/kafka_2.11-0.11/ 
[root@node2 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test-topic

6、启动Maxwell

[root@node3 ~]# cd /software/maxwell-1.28.2/bin
[root@node3 bin]# maxwell --config ../config.properties.

#startMaxwell.sh 脚本内容: /software/maxwell-1.28.2/bin/maxwell --config /software/maxwell-1.28.2/config.properties > ./log.txt 2>&1 &

chmod +x ./start_maxwell.sh

注意:这里我们可以通过Maxwell将MySQL业务库中所有binlog变化数据监控到Kafka test-topic中,在此项目中我们将MySQL binlog数据监控到Kafka中然后通过Flink读取对应topic数据进行处理。

7、在MySQL中创建库testdb,并创建表person插入数据

mysql> create database testdb; mysql> use testdb; 
mysql> create table person(id int,name varchar(255),age int); 
mysql> insert into person values (1,'zs',18);
mysql> insert into person values (2,'ls',19); 
mysql> insert into person values (3,'ww',20);

 可以看到在监控的kafka test-topic中有对应的数据被同步到topic中:

8、全量数据从MySQL同步到kafka

这里以MySQL 表testdb.person为例将全量数据导入到Kafka中,可以通过配置Maxwell,使用Maxwell bootstrap功能全量将已经存在MySQL testdb.person表中的数据导入到Kafka,操作步骤如下:

#启动Maxwell 
[root@node3 ~]# cd /software/maxwell-1.28.2/bin 
[root@node3 bin]# maxwell --config ../config.properties #启动maxwell-bootstrap全量同步数据 [root@node3 ~]# cd /software/maxwell-1.28.2/bin 
[root@node3 bin]# ./maxwell-bootstrap --database testdb --table person --host node2 --user maxwell --password maxwell --client_id maxwell_first --where "id>0"

CK的安装和配置

配置clickhouse的集群名称,可自由定义名称,注意集群名称中不能包含点号。这里代表集群中有3个分片,每个分片有1个副本。

分片是指包含部分数据的服务器,要读取所有的数据,必须访问所有的分片。

副本是指存储分片备份数据的服务器,要读取所有的数据,访问任意副本上的数据即可。

Shard:分片,一个clickhouse集群可以分多个分片,每个分片可以存储数据,这里 分片可以理解为clickhouse机器中的每个节点,1个分片只能对应1服务节点 。这里可以配置一个或者任意多个分片,在每个分片中可以配置一个或任意多个副本,不同分片可配置不同数量的副本。如果只是配置一个分片,这种情况下查询操作应该称为远程查询,而不是分布式查询。

Replica:副本,每个分片的副本,默认每个分片配置了一个副本。也可以配置多个,副本的数量上限是由clickhouse节点的数量决定的。如果配置了副本,读取操作可以从每个分片里选择一个可用的副本。如果副本不可用,会依次选择下个副本进行连接。该机制利于系统的可用性。

internal_replication:默认为false,写数据操作会将数据写入所有的副本,设置为true,写操作只会选择一个正常的副本写入数据,数据的同步在后台自动进行。

日志采集方案

当用户浏览网站触发对应的接口时,日志采集接口根据配合的log4j将用户浏览信息写入对应的目录中,然后通过Flume监控对应的日志目录,将用户日志数据采集到Kafka topic “KAFKA-USER-LOG-DATA”中。

#设置source名称
a.sources = r1
#设置channel的名称
a.channels = c1
#设置sink的名称
a.sinks = k1

# For each one of the sources, the type is defined
#设置source类型为TAILDIR,监控目录下的文件
#Taildir Source可实时监控目录一批文件,并记录每个文件最新消费位置,agent进程重启后不会有重复消费的问题
a.sources.r1.type = TAILDIR
#文件的组,可以定义多种
a.sources.r1.filegroups = f1
#第一组监控的是对应文件夹中的什么文件:.log文件
a.sources.r1.filegroups.f1 = /software/lakehouselogs/userbrowse/.*log

# The channel can be defined as follows.
#设置source的channel名称
a.sources.r1.channels = c1
a.sources.r1.max-line-length = 1000000
#a.sources.r1.eventSize = 512000000

# Each channel's type is defined.
#设置channel的类型
a.channels.c1.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
#设置channel道中最大可以存储的event数量
a.channels.c1.capacity = 1000
#每次最大从source获取或者发送到sink中的数据量
a.channels.c1.transcationCapacity=100

# Each sink's type must be defined
#设置Kafka接收器
a.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#设置Kafka的broker地址和端口号
a.sinks.k1.brokerList=node1:9092,node2:9092,node3:9092
#设置Kafka的Topic
a.sinks.k1.topic=KAFKA-USER-LOG-DATA
#设置序列化方式
a.sinks.k1.serializer.class=kafka.serializer.StringEncoder 
#Specify the channel the sink should use
#设置sink的channel名称
a.sinks.k1.channel = c1
在Kafka中创建对应的topic并监控
#进入Kafka路径,创建对应topic
[root@node1 ~]# cd /software/kafka_2.11-0.11.0.3/bin/
[root@node1 bin]# ./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-USER-LOG-DATA --partitions 3 --replication-factor 3

#监控Kafak topic 中的数据
[root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-USER-LOG-DATA

创建Iceberg-ODS层表

代码在执行之前需要在Hive中预先创建对应的Iceberg表,创建Icebreg表方式如下:在Hive中添加Iceberg表格式需要的包,启动HDFS集群,node1启动Hive metastore服务,在Hive客户端启动Hive添加Iceberg依赖包:

#在hive客户端node3节点加载两个jar包 add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar; add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;

创建Iceberg表:这里创建Iceberg表有“ODS_MEMBER_INFO”、“ODS_MEMBER_ADDRESS”、“ODS_USER_LOGIN”,创建语句如下:

#在Hive客户端执行以下建表语句
CREATE TABLE ODS_MEMBER_INFO (
id string, 
user_id string,
member_growth_score string,
member_level string, 
balance string, 
gmt_create string, 
gmt_modified string
 )STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_MEMBER_INFO/' TBLPROPERTIES ('iceberg.catalog'='location_based_table'); CREATE TABLE ODS_MEMBER_ADDRESS ( id string, user_id string, province string, city string, area string, address string, log string, lat string, phone_number string, consignee_name string, gmt_create string, gmt_modified string )STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_MEMBER_ADDRESS/' TBLPROPERTIES ('iceberg.catalog'='location_based_table'); CREATE TABLE ODS_USER_LOGIN ( id string, user_id string, ip string, login_tm string, logout_tm string )STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_USER_LOGIN/' TBLPROPERTIES ('iceberg.catalog'='location_based_table');

 代码测试,在Kafka中创建对应的Topic

#在Kafka 中创建 KAFKA-DWS-USER-LOGIN-WIDE-TOPIC topic ./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DWS-USER-LOGIN-WIDE-TOPIC --partitions 3 --replication-factor 3 #监控以上topic数据 [root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-DWS-USER-LOGIN-WIDE-TOPIC

将代码中消费Kafka数据改成从头开始消费

代码中Kafka Connector中属性“scan.startup.mode”设置为“earliest-offset”,从头开始消费数据。这里也可以不设置从头开始消费Kafka数据,而是直接启动实时向MySQL表中写入数据代码“RTMockDBData.java”代码,实时向MySQL对应的表中写入数据,这里需要启动maxwell监控数据,代码才能实时监控到写入MySQL的业务数据。

执行代码,查看对应的结果

以上代码执行后在,在对应的Kafka “KAFKA-DWS-USER-LOGIN-WIDE-TOPIC” topic中都有对应的数据。在Iceberg-DWD层中对应的表中也有数据。Kafka中结果如下:

 Iceberg-DWD层表”DWS_USER_LOGIN”中的数据如下:

 编写写入DM层业务代码

DM层主要是报表数据,针对实时业务将DM层设置在Clickhouse中,在此业务中DM层主要存储的是通过Flink读取Kafka “KAFKA-DWS-USER-LOGIN-WIDE-TOPIC” topic中的数据进行分析的结果,实时写入到Clickhouse中。

object ProcessUserLoginInfoToDM  def main(args: Array[String]): Unit =  //1.准备环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val tblEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) env.enableCheckpointing(5000) import org.apache.flink.streaming.api.scala._ /** * 2.创建 Kafka Connector,连接消费Kafka dwd中数据 *  * "gmt_create": "1645019077786", * "area": "淮阴区", * "address": "江苏省淮安市淮阴区渔沟镇淮西村", * "city": "淮安市", * "ip": "141.252.65.108", * "consignee_name": "苗优奇", * "gmt_modified": "1645019077786", * "member_level": "2", * "balance": "58444", * "province": "江苏省", * "user_id": "uid534024", * "member_points": "5700", * "phone_number": "17866060116", * "logout_tm": "2022-03-08 12:31:12", * "member_growth_score": "9832", * "login_tm": "2022-03-08 11:48:09" *  */ tblEnv.executeSql( """ |create table kafka_dws_user_login_wide_tbl ( | user_id string, | ip string, | gmt_create string, | login_tm string, | logout_tm string, | member_level string, | province string, | city string, | area string, | address string, | member_points string, | member_growth_score string |) with ( | 'connector' = 'kafka', | 'topic' = 'KAFKA-DWS-USER-LOGIN-WIDE-TOPIC', | 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092', | 'scan.startup.mode'='earliest-offset', --也可以指定 earliest-offset 、latest-offset | 'properties.group.id' = 'my-group-id', | 'format' = 'json' |) """.stripMargin) /** * 3.实时统计每个省份新增你会员数量及每个省份pv,uv * now() == current_timestamp 返回时间戳 timestamp 格式日期:2022-03-15T06:20:51.788 */ val dwsTbl:Table = tblEnv.sqlQuery( """ | select province,city,user_id,login_tm,gmt_create from kafka_dws_user_login_wide_tbl """.stripMargin) //4.将Row 类型数据转换成对象类型操作 val dwsDS: DataStream[UserLoginWideInfo] = tblEnv.toAppendStream[Row](dwsTbl) .filter(row=>row.getField(0)!=null) .map(row =>  val province: String = row.getField(0).toString val city: String = row.getField(1).toString val user_id: String = row.getField(2).toString val login_tm: String = row.getField(3).toString val gmt_create: String = row.getField(4).toString UserLoginWideInfo(user_id, null, DateUtil.getDateYYYYMMDDHHMMSS(gmt_create), login_tm, null, null, province, city, null, null, null, null, null) ) /** * 5.将以上结果写入到Clickhouse表 dm_user_login_info 表中 * create table dm_user_login_info( * dt String, * province String, * city String, * user_id String, * login_tm String, * gmt_create String * ) engine = MergeTree() order by dt; */ //准备向ClickHouse中插入数据的sql val insertIntoCkSql = "insert into dm_user_login_info (dt,province,city,user_id,login_tm,gmt_create) values (?,?,?,?,?,?)" val ckSink: SinkFunction[UserLoginWideInfo] = MyClickHouseUtil.clickhouseSink[UserLoginWideInfo](insertIntoCkSql, new JdbcStatementBuilder[UserLoginWideInfo]  override def accept(ps: PreparedStatement, userLoginWideInfo: UserLoginWideInfo): Unit =  ps.setString(1, DateUtil.getCurrentDateYYYYMMDD()) ps.setString(2, userLoginWideInfo.province) ps.setString(3, userLoginWideInfo.city) ps.setString(4, userLoginWideInfo.user_id) ps.setString(5, userLoginWideInfo.login_tm) ps.setString(6, userLoginWideInfo.gmt_create)  ) //6.针对数据加入sink dwsDS.addSink(ckSink) env.execute()  
创建Clickhouse-DM层表
代码在执行之前需要在Clickhouse中创建对应的DM层用户登录信息表dm_user_login_info,clickhouse建表语句如下:
#node1节点启动clickhouse [root@node1 bin]# service clickhouse-server start #node1节点进入clickhouse [root@node1 bin]# clickhouse-client -m #node1节点创建clickhouse-DM层表 create table dm_user_login_info( dt String, province String, city String, user_id String, login_tm String, gmt_create String ) engine = MergeTree() order by dt;

代码 

 将代码中消费Kafka数据改成从头开始消费

代码中Kafka Connector中属性“scan.startup.mode”设置为“earliest-offset”,从头开始消费数据。

这里也可以不设置从头开始消费Kafka数据,而是直接启动实时向MySQL表中写入数据代码“RTMockDBData.java”代码,实时向MySQL对应的表中写入数据,这里需要启动maxwell监控数据,代码才能实时监控到写入MySQL的业务数据。

执行代码,查看对应结果

以上代码执行后在,在Clickhouse-DM层中表“dm_user_login_info”中查看对应数据结果如下:

数据发布接口

通过Flink实时把结果数据写入Clickhouse-DM层中后,我们需要编写数据发布接口方便数据使用方调用数据结果进行可视化,数据发布接口项目为SpringBoot项目“LakeHouseDataPublish”,此Springboot接口支持mysql数据源与clickhouse数据源,mysql数据源方便离线数据展示,clickhouse数据源主要展示DM层实时结果数据。

此业务对应的接口为”localhost:8989/lakehouse/dataapi/getUserLoginInfos”,在window本地启动数据发布接口,启动之后浏览器输入以上接口即可查询对应数据结果。

五种大数据处理架构

五种大数据处理架构大数据是收集、整理、处理大容量数据集,并从中获得见解所需的非传统战略和技术的总称。虽然处理数据所需的计算能力或存

参考技术A 五种大数据处理架构
大数据是收集、整理、处理大容量数据集,并从中获得见解所需的非传统战略和技术的总称。虽然处理数据所需的计算能力或存储容量早已超过一台计算机的上限,但这种计算类型的普遍性、规模,以及价值在最近几年才经历了大规模扩展。
本文将介绍大数据系统一个最基本的组件:处理框架。处理框架负责对系统中的数据进行计算,例如处理从非易失存储中读取的数据,或处理刚刚摄入到系统中的数据。数据的计算则是指从大量单一数据点中提取信息和见解的过程。
下文将介绍这些框架:
· 仅批处理框架:
Apache Hadoop
· 仅流处理框架:
Apache Storm
Apache Samza
· 混合框架:
Apache Spark
Apache Flink
大数据处理框架是什么?
处理框架和处理引擎负责对数据系统中的数据进行计算。虽然“引擎”和“框架”之间的区别没有什么权威的定义,但大部分时候可以将前者定义为实际负责处理数据操作的组件,后者则可定义为承担类似作用的一系列组件。
例如Apache Hadoop可以看作一种以MapReduce作为默认处理引擎的处理框架。引擎和框架通常可以相互替换或同时使用。例如另一个框架Apache Spark可以纳入Hadoop并取代MapReduce。组件之间的这种互操作性是大数据系统灵活性如此之高的原因之一。
虽然负责处理生命周期内这一阶段数据的系统通常都很复杂,但从广义层面来看它们的目标是非常一致的:通过对数据执行操作提高理解能力,揭示出数据蕴含的模式,并针对复杂互动获得见解。
为了简化这些组件的讨论,我们会通过不同处理框架的设计意图,按照所处理的数据状态对其进行分类。一些系统可以用批处理方式处理数据,一些系统可以用流方式处理连续不断流入系统的数据。此外还有一些系统可以同时处理这两类数据。
在深入介绍不同实现的指标和结论之前,首先需要对不同处理类型的概念进行一个简单的介绍。
批处理系统
批处理在大数据世界有着悠久的历史。批处理主要操作大容量静态数据集,并在计算过程完成后返回结果。
批处理模式中使用的数据集通常符合下列特征…
· 有界:批处理数据集代表数据的有限集合
· 持久:数据通常始终存储在某种类型的持久存储位置中
· 大量:批处理操作通常是处理极为海量数据集的唯一方法
批处理非常适合需要访问全套记录才能完成的计算工作。例如在计算总数和平均数时,必须将数据集作为一个整体加以处理,而不能将其视作多条记录的集合。这些操作要求在计算进行过程中数据维持自己的状态。
需要处理大量数据的任务通常最适合用批处理操作进行处理。无论直接从持久存储设备处理数据集,或首先将数据集载入内存,批处理系统在设计过程中就充分考虑了数据的量,可提供充足的处理资源。由于批处理在应对大量持久数据方面的表现极为出色,因此经常被用于对历史数据进行分析。
大量数据的处理需要付出大量时间,因此批处理不适合对处理时间要求较高的场合。
Apache Hadoop
Apache Hadoop是一种专用于批处理的处理框架。Hadoop是首个在开源社区获得极大关注的大数据框架。基于谷歌有关海量数据处理所发表的多篇论文与经验的Hadoop重新实现了相关算法和组件堆栈,让大规模批处理技术变得更易用。
新版Hadoop包含多个组件,即多个层,通过配合使用可处理批数据:
· HDFS:HDFS是一种分布式文件系统层,可对集群节点间的存储和复制进行协调。HDFS确保了无法避免的节点故障发生后数据依然可用,可将其用作数据来源,可用于存储中间态的处理结果,并可存储计算的最终结果。
· YARN:YARN是Yet Another Resource Negotiator(另一个资源管理器)的缩写,可充当Hadoop堆栈的集群协调组件。该组件负责协调并管理底层资源和调度作业的运行。通过充当集群资源的接口,YARN使得用户能在Hadoop集群中使用比以往的迭代方式运行更多类型的工作负载。
· MapReduce:MapReduce是Hadoop的原生批处理引擎。
批处理模式
Hadoop的处理功能来自MapReduce引擎。MapReduce的处理技术符合使用键值对的map、shuffle、reduce算法要求。基本处理过程包括:
· 从HDFS文件系统读取数据集
· 将数据集拆分成小块并分配给所有可用节点
· 针对每个节点上的数据子集进行计算(计算的中间态结果会重新写入HDFS)
· 重新分配中间态结果并按照键进行分组
· 通过对每个节点计算的结果进行汇总和组合对每个键的值进行“Reducing”
· 将计算而来的最终结果重新写入 HDFS
优势和局限
由于这种方法严重依赖持久存储,每个任务需要多次执行读取和写入操作,因此速度相对较慢。但另一方面由于磁盘空间通常是服务器上最丰富的资源,这意味着MapReduce可以处理非常海量的数据集。同时也意味着相比其他类似技术,Hadoop的MapReduce通常可以在廉价硬件上运行,因为该技术并不需要将一切都存储在内存中。MapReduce具备极高的缩放潜力,生产环境中曾经出现过包含数万个节点的应用。
MapReduce的学习曲线较为陡峭,虽然Hadoop生态系统的其他周边技术可以大幅降低这一问题的影响,但通过Hadoop集群快速实现某些应用时依然需要注意这个问题。
围绕Hadoop已经形成了辽阔的生态系统,Hadoop集群本身也经常被用作其他软件的组成部件。很多其他处理框架和引擎通过与Hadoop集成也可以使用HDFS和YARN资源管理器。
总结
Apache Hadoop及其MapReduce处理引擎提供了一套久经考验的批处理模型,最适合处理对时间要求不高的非常大规模数据集。通过非常低成本的组件即可搭建完整功能的Hadoop集群,使得这一廉价且高效的处理技术可以灵活应用在很多案例中。与其他框架和引擎的兼容与集成能力使得Hadoop可以成为使用不同技术的多种工作负载处理平台的底层基础。
流处理系统
流处理系统会对随时进入系统的数据进行计算。相比批处理模式,这是一种截然不同的处理方式。流处理方式无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作。
· 流处理中的数据集是“无边界”的,这就产生了几个重要的影响:
· 完整数据集只能代表截至目前已经进入到系统中的数据总量。
· 工作数据集也许更相关,在特定时间只能代表某个单一数据项。
处理工作是基于事件的,除非明确停止否则没有“尽头”。处理结果立刻可用,并会随着新数据的抵达继续更新。
流处理系统可以处理几乎无限量的数据,但同一时间只能处理一条(真正的流处理)或很少量(微批处理,Micro-batch Processing)数据,不同记录间只维持最少量的状态。虽然大部分系统提供了用于维持某些状态的方法,但流处理主要针对副作用更少,更加功能性的处理(Functional processing)进行优化。
功能性操作主要侧重于状态或副作用有限的离散步骤。针对同一个数据执行同一个操作会或略其他因素产生相同的结果,此类处理非常适合流处理,因为不同项的状态通常是某些困难、限制,以及某些情况下不需要的结果的结合体。因此虽然某些类型的状态管理通常是可行的,但这些框架通常在不具备状态管理机制时更简单也更高效。
此类处理非常适合某些类型的工作负载。有近实时处理需求的任务很适合使用流处理模式。分析、服务器或应用程序错误日志,以及其他基于时间的衡量指标是最适合的类型,因为对这些领域的数据变化做出响应对于业务职能来说是极为关键的。流处理很适合用来处理必须对变动或峰值做出响应,并且关注一段时间内变化趋势的数据。
Apache Storm
Apache Storm是一种侧重于极低延迟的流处理框架,也许是要求近实时处理的工作负载的最佳选择。该技术可处理非常大量的数据,通过比其他解决方案更低的延迟提供结果。
流处理模式
Storm的流处理可对框架中名为Topology(拓扑)的DAG(Directed Acyclic Graph,有向无环图)进行编排。这些拓扑描述了当数据片段进入系统后,需要对每个传入的片段执行的不同转换或步骤。
拓扑包含:
· Stream:普通的数据流,这是一种会持续抵达系统的无边界数据。
· Spout:位于拓扑边缘的数据流来源,例如可以是API或查询等,从这里可以产生待处理的数据。
· Bolt:Bolt代表需要消耗流数据,对其应用操作,并将结果以流的形式进行输出的处理步骤。Bolt需要与每个Spout建立连接,随后相互连接以组成所有必要的处理。在拓扑的尾部,可以使用最终的Bolt输出作为相互连接的其他系统的输入。
Storm背后的想法是使用上述组件定义大量小型的离散操作,随后将多个组件组成所需拓扑。默认情况下Storm提供了“至少一次”的处理保证,这意味着可以确保每条消息至少可以被处理一次,但某些情况下如果遇到失败可能会处理多次。Storm无法确保可以按照特定顺序处理消息。
为了实现严格的一次处理,即有状态处理,可以使用一种名为Trident的抽象。严格来说不使用Trident的Storm通常可称之为Core Storm。Trident会对Storm的处理能力产生极大影响,会增加延迟,为处理提供状态,使用微批模式代替逐项处理的纯粹流处理模式。
为避免这些问题,通常建议Storm用户尽可能使用Core Storm。然而也要注意,Trident对内容严格的一次处理保证在某些情况下也比较有用,例如系统无法智能地处理重复消息时。如果需要在项之间维持状态,例如想要计算一个小时内有多少用户点击了某个链接,此时Trident将是你唯一的选择。尽管不能充分发挥框架与生俱来的优势,但Trident提高了Storm的灵活性。
Trident拓扑包含:
· 流批(Stream batch):这是指流数据的微批,可通过分块提供批处理语义。
· 操作(Operation):是指可以对数据执行的批处理过程。
优势和局限
目前来说Storm可能是近实时处理领域的最佳解决方案。该技术可以用极低延迟处理数据,可用于希望获得最低延迟的工作负载。如果处理速度直接影响用户体验,例如需要将处理结果直接提供给访客打开的网站页面,此时Storm将会是一个很好的选择。
Storm与Trident配合使得用户可以用微批代替纯粹的流处理。虽然借此用户可以获得更大灵活性打造更符合要求的工具,但同时这种做法会削弱该技术相比其他解决方案最大的优势。话虽如此,但多一种流处理方式总是好的。
Core Storm无法保证消息的处理顺序。Core Storm为消息提供了“至少一次”的处理保证,这意味着可以保证每条消息都能被处理,但也可能发生重复。Trident提供了严格的一次处理保证,可以在不同批之间提供顺序处理,但无法在一个批内部实现顺序处理。
在互操作性方面,Storm可与Hadoop的YARN资源管理器进行集成,因此可以很方便地融入现有Hadoop部署。除了支持大部分处理框架,Storm还可支持多种语言,为用户的拓扑定义提供了更多选择。
总结
对于延迟需求很高的纯粹的流处理工作负载,Storm可能是最适合的技术。该技术可以保证每条消息都被处理,可配合多种编程语言使用。由于Storm无法进行批处理,如果需要这些能力可能还需要使用其他软件。如果对严格的一次处理保证有比较高的要求,此时可考虑使用Trident。不过这种情况下其他流处理框架也许更适合。
Apache Samza
Apache Samza是一种与Apache Kafka消息系统紧密绑定的流处理框架。虽然Kafka可用于很多流处理系统,但按照设计,Samza可以更好地发挥Kafka独特的架构优势和保障。该技术可通过Kafka提供容错、缓冲,以及状态存储。
Samza可使用YARN作为资源管理器。这意味着默认情况下需要具备Hadoop集群(至少具备HDFS和YARN),但同时也意味着Samza可以直接使用YARN丰富的内建功能。
流处理模式
Samza依赖Kafka的语义定义流的处理方式。Kafka在处理数据时涉及下列概念:
· Topic(话题):进入Kafka系统的每个数据流可称之为一个话题。话题基本上是一种可供消耗方订阅的,由相关信息组成的数据流。
· Partition(分区):为了将一个话题分散至多个节点,Kafka会将传入的消息划分为多个分区。分区的划分将基于键(Key)进行,这样可以保证包含同一个键的每条消息可以划分至同一个分区。分区的顺序可获得保证。
· Broker(代理):组成Kafka集群的每个节点也叫做代理。
· Producer(生成方):任何向Kafka话题写入数据的组件可以叫做生成方。生成方可提供将话题划分为分区所需的键。
· Consumer(消耗方):任何从Kafka读取话题的组件可叫做消耗方。消耗方需要负责维持有关自己分支的信息,这样即可在失败后知道哪些记录已经被处理过了。
由于Kafka相当于永恒不变的日志,Samza也需要处理永恒不变的数据流。这意味着任何转换创建的新数据流都可被其他组件所使用,而不会对最初的数据流产生影响。
优势和局限
乍看之下,Samza对Kafka类查询系统的依赖似乎是一种限制,然而这也可以为系统提供一些独特的保证和功能,这些内容也是其他流处理系统不具备的。
例如Kafka已经提供了可以通过低延迟方式访问的数据存储副本,此外还可以为每个数据分区提供非常易用且低成本的多订阅者模型。所有输出内容,包括中间态的结果都可写入到Kafka,并可被下游步骤独立使用。
这种对Kafka的紧密依赖在很多方面类似于MapReduce引擎对HDFS的依赖。虽然在批处理的每个计算之间对HDFS的依赖导致了一些严重的性能问题,但也避免了流处理遇到的很多其他问题。
Samza与Kafka之间紧密的关系使得处理步骤本身可以非常松散地耦合在一起。无需事先协调,即可在输出的任何步骤中增加任意数量的订阅者,对于有多个团队需要访问类似数据的组织,这一特性非常有用。多个团队可以全部订阅进入系统的数据话题,或任意订阅其他团队对数据进行过某些处理后创建的话题。这一切并不会对数据库等负载密集型基础架构造成额外的压力。
直接写入Kafka还可避免回压(Backpressure)问题。回压是指当负载峰值导致数据流入速度超过组件实时处理能力的情况,这种情况可能导致处理工作停顿并可能丢失数据。按照设计,Kafka可以将数据保存很长时间,这意味着组件可以在方便的时候继续进行处理,并可直接重启动而无需担心造成任何后果。
Samza可以使用以本地键值存储方式实现的容错检查点系统存储数据。这样Samza即可获得“至少一次”的交付保障,但面对由于数据可能多次交付造成的失败,该技术无法对汇总后状态(例如计数)提供精确恢复。
Samza提供的高级抽象使其在很多方面比Storm等系统提供的基元(Primitive)更易于配合使用。目前Samza只支持JVM语言,这意味着它在语言支持方面不如Storm灵活。
总结
对于已经具备或易于实现Hadoop和Kafka的环境,Apache Samza是流处理工作负载一个很好的选择。Samza本身很适合有多个团队需要使用(但相互之间并不一定紧密协调)不同处理阶段的多个数据流的组织。Samza可大幅简化很多流处理工作,可实现低延迟的性能。如果部署需求与当前系统不兼容,也许并不适合使用,但如果需要极低延迟的处理,或对严格的一次处理语义有较高需求,此时依然适合考虑。
混合处理系统:批处理和流处理
一些处理框架可同时处理批处理和流处理工作负载。这些框架可以用相同或相关的组件和API处理两种类型的数据,借此让不同的处理需求得以简化。
如你所见,这一特性主要是由Spark和Flink实现的,下文将介绍这两种框架。实现这样的功能重点在于两种不同处理模式如何进行统一,以及要对固定和不固定数据集之间的关系进行何种假设。
虽然侧重于某一种处理类型的项目会更好地满足具体用例的要求,但混合框架意在提供一种数据处理的通用解决方案。这种框架不仅可以提供处理数据所需的方法,而且提供了自己的集成项、库、工具,可胜任图形分析、机器学习、交互式查询等多种任务。
Apache Spark
Apache Spark是一种包含流处理能力的下一代批处理框架。与Hadoop的MapReduce引擎基于各种相同原则开发而来的Spark主要侧重于通过完善的内存计算和处理优化机制加快批处理工作负载的运行速度。
Spark可作为独立集群部署(需要相应存储层的配合),或可与Hadoop集成并取代MapReduce引擎。
批处理模式
与MapReduce不同,Spark的数据处理工作全部在内存中进行,只在一开始将数据读入内存,以及将最终结果持久存储时需要与存储层交互。所有中间态的处理结果均存储在内存中。
虽然内存中处理方式可大幅改善性能,Spark在处理与磁盘有关的任务时速度也有很大提升,因为通过提前对整个任务集进行分析可以实现更完善的整体式优化。为此Spark可创建代表所需执行的全部操作,需要操作的数据,以及操作和数据之间关系的Directed Acyclic Graph(有向无环图),即DAG,借此处理器可以对任务进行更智能的协调。
为了实现内存中批计算,Spark会使用一种名为Resilient Distributed Dataset(弹性分布式数据集),即RDD的模型来处理数据。这是一种代表数据集,只位于内存中,永恒不变的结构。针对RDD执行的操作可生成新的RDD。每个RDD可通过世系(Lineage)回溯至父级RDD,并最终回溯至磁盘上的数据。Spark可通过RDD在无需将每个操作的结果写回磁盘的前提下实现容错。
流处理模式
流处理能力是由Spark Streaming实现的。Spark本身在设计上主要面向批处理工作负载,为了弥补引擎设计和流处理工作负载特征方面的差异,Spark实现了一种叫做微批(Micro-batch)*的概念。在具体策略方面该技术可以将数据流视作一系列非常小的“批”,借此即可通过批处理引擎的原生语义进行处理。
Spark Streaming会以亚秒级增量对流进行缓冲,随后这些缓冲会作为小规模的固定数据集进行批处理。这种方式的实际效果非常好,但相比真正的流处理框架在性能方面依然存在不足。
优势和局限
使用Spark而非Hadoop MapReduce的主要原因是速度。在内存计算策略和先进的DAG调度等机制的帮助下,Spark可以用更快速度处理相同的数据集。
Spark的另一个重要优势在于多样性。该产品可作为独立集群部署,或与现有Hadoop集群集成。该产品可运行批处理和流处理,运行一个集群即可处理不同类型的任务。
除了引擎自身的能力外,围绕Spark还建立了包含各种库的生态系统,可为机器学习、交互式查询等任务提供更好的支持。相比MapReduce,Spark任务更是“众所周知”地易于编写,因此可大幅提高生产力。
为流处理系统采用批处理的方法,需要对进入系统的数据进行缓冲。缓冲机制使得该技术可以处理非常大量的传入数据,提高整体吞吐率,但等待缓冲区清空也会导致延迟增高。这意味着Spark Streaming可能不适合处理对延迟有较高要求的工作负载。
由于内存通常比磁盘空间更贵,因此相比基于磁盘的系统,Spark成本更高。然而处理速度的提升意味着可以更快速完成任务,在需要按照小时数为资源付费的环境中,这一特性通常可以抵消增加的成本。
Spark内存计算这一设计的另一个后果是,如果部署在共享的集群中可能会遇到资源不足的问题。相比HadoopMapReduce,Spark的资源消耗更大,可能会对需要在同一时间使用集群的其他任务产生影响。从本质来看,Spark更不适合与Hadoop堆栈的其他组件共存一处。
总结
Spark是多样化工作负载处理任务的最佳选择。Spark批处理能力以更高内存占用为代价提供了无与伦比的速度优势。对于重视吞吐率而非延迟的工作负载,则比较适合使用Spark Streaming作为流处理解决方案。
Apache Flink
Apache Flink是一种可以处理批处理任务的流处理框架。该技术可将批处理数据视作具备有限边界的数据流,借此将批处理任务作为流处理的子集加以处理。为所有处理任务采取流处理为先的方法会产生一系列有趣的副作用。
这种流处理为先的方法也叫做Kappa架构,与之相对的是更加被广为人知的Lambda架构(该架构中使用批处理作为主要处理方法,使用流作为补充并提供早期未经提炼的结果)。Kappa架构中会对一切进行流处理,借此对模型进行简化,而这一切是在最近流处理引擎逐渐成熟后才可行的。
流处理模型
Flink的流处理模型在处理传入数据时会将每一项视作真正的数据流。Flink提供的DataStream API可用于处理无尽的数据流。Flink可配合使用的基本组件包括:
· Stream(流)是指在系统中流转的,永恒不变的无边界数据集
· Operator(操作方)是指针对数据流执行操作以产生其他数据流的功能
· Source(源)是指数据流进入系统的入口点
· Sink(槽)是指数据流离开Flink系统后进入到的位置,槽可以是数据库或到其他系统的连接器
为了在计算过程中遇到问题后能够恢复,流处理任务会在预定时间点创建快照。为了实现状态存储,Flink可配合多种状态后端系统使用,具体取决于所需实现的复杂度和持久性级别。
此外Flink的流处理能力还可以理解“事件时间”这一概念,这是指事件实际发生的时间,此外该功能还可以处理会话。这意味着可以通过某种有趣的方式确保执行顺序和分组。
批处理模型
Flink的批处理模型在很大程度上仅仅是对流处理模型的扩展。此时模型不再从持续流中读取数据,而是从持久存储中以流的形式读取有边界的数据集。Flink会对这些处理模型使用完全相同的运行时。
Flink可以对批处理工作负载实现一定的优化。例如由于批处理操作可通过持久存储加以支持,Flink可以不对批处理工作负载创建快照。数据依然可以恢复,但常规处理操作可以执行得更快。
另一个优化是对批处理任务进行分解,这样即可在需要的时候调用不同阶段和组件。借此Flink可以与集群的其他用户更好地共存。对任务提前进行分析使得Flink可以查看需要执行的所有操作、数据集的大小,以及下游需要执行的操作步骤,借此实现进一步的优化。
优势和局限
Flink目前是处理框架领域一个独特的技术。虽然Spark也可以执行批处理和流处理,但Spark的流处理采取的微批架构使其无法适用于很多用例。Flink流处理为先的方法可提供低延迟,高吞吐率,近乎逐项处理的能力。
Flink的很多组件是自行管理的。虽然这种做法较为罕见,但出于性能方面的原因,该技术可自行管理内存,无需依赖原生的Java垃圾回收机制。与Spark不同,待处理数据的特征发生变化后Flink无需手工优化和调整,并且该技术也可以自行处理数据分区和自动缓存等操作。
Flink会通过多种方式对工作进行分许进而优化任务。这种分析在部分程度上类似于SQL查询规划器对关系型数据库所做的优化,可针对特定任务确定最高效的实现方法。该技术还支持多阶段并行执行,同时可将受阻任务的数据集合在一起。对于迭代式任务,出于性能方面的考虑,Flink会尝试在存储数据的节点上执行相应的计算任务。此外还可进行“增量迭代”,或仅对数据中有改动的部分进行迭代。
在用户工具方面,Flink提供了基于Web的调度视图,借此可轻松管理任务并查看系统状态。用户也可以查看已提交任务的优化方案,借此了解任务最终是如何在集群中实现的。对于分析类任务,Flink提供了类似SQL的查询,图形化处理,以及机器学习库,此外还支持内存计算。
Flink能很好地与其他组件配合使用。如果配合Hadoop 堆栈使用,该技术可以很好地融入整个环境,在任何时候都只占用必要的资源。该技术可轻松地与YARN、HDFS和Kafka 集成。在兼容包的帮助下,Flink还可以运行为其他处理框架,例如Hadoop和Storm编写的任务。
目前Flink最大的局限之一在于这依然是一个非常“年幼”的项目。现实环境中该项目的大规模部署尚不如其他处理框架那么常见,对于Flink在缩放能力方面的局限目前也没有较为深入的研究。随着快速开发周期的推进和兼容包等功能的完善,当越来越多的组织开始尝试时,可能会出现越来越多的Flink部署
总结
Flink提供了低延迟流处理,同时可支持传统的批处理任务。Flink也许最适合有极高流处理需求,并有少量批处理任务的组织。该技术可兼容原生Storm和Hadoop程序,可在YARN管理的集群上运行,因此可以很方便地进行评估。快速进展的开发工作使其值得被大家关注。
结论
大数据系统可使用多种处理技术。
对于仅需要批处理的工作负载,如果对时间不敏感,比其他解决方案实现成本更低的Hadoop将会是一个好选择。
对于仅需要流处理的工作负载,Storm可支持更广泛的语言并实现极低延迟的处理,但默认配置可能产生重复结果并且无法保证顺序。Samza与YARN和Kafka紧密集成可提供更大灵活性,更易用的多团队使用,以及更简单的复制和状态管理。
对于混合型工作负载,Spark可提供高速批处理和微批处理模式的流处理。该技术的支持更完善,具备各种集成库和工具,可实现灵活的集成。Flink提供了真正的流处理并具备批处理能力,通过深度优化可运行针对其他平台编写的任务,提供低延迟的处理,但实际应用方面还为时过早。
最适合的解决方案主要取决于待处理数据的状态,对处理所需时间的需求,以及希望得到的结果。具体是使用全功能解决方案或主要侧重于某种项目的解决方案,这个问题需要慎重权衡。随着逐渐成熟并被广泛接受,在评估任何新出现的创新型解决方案时都需要考虑类似的问题。

以上是关于大数据架构的主要内容,如果未能解决你的问题,请参考以下文章

Flume在企业大数据仓库架构中位置及功能

详解大数据数据仓库分层架构

大数据数据仓库架构

数据仓库系列篇——唯品会大数据架构

阿里在线技术峰会李金波:企业大数据平台仓库架构建设思路

大数据企业大数据平台的数据仓库架构大数据和人工智能的关系