flink流计算随笔
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink流计算随笔相关的知识,希望对你有一定的参考价值。
Windows
聚合事件(例如计数、和)在流上的工作方式与批处理不同。例如,不可能计算流中的所有元素,因为流通常是无限的(×××的)。相反,流上的聚合(计数、和等)是由窗口 windows限定作用域的,例如“过去5分钟的计数”或“最后100个元素的总和”。
Windows可以是时间驱动(示例:每30秒)或数据驱动(示例:每100个元素)。一个典型的方法是区分不同类型的窗口,比如翻筋斗窗口(没有重叠)、滑动窗口(有重叠)和会话窗口(中间有一个不活跃的间隙)。
Time
当提到流程序中的时间(例如定义窗口)时,可以指不同的时间概念:
事件时间Event Time 是创建事件的时间。它通常由事件中的时间戳描述,例如由生产传感器或生产服务附加的时间戳。Flink通过timestamp assigners(时间戳指定人)访问事件时间戳。
摄入时间Ingestion time 是事件进入源操作符的Flink数据流的时间。
处理时间Processing Time是执行基于时间的操作的每个操作符的本地时间。
有状态操作(Stateful Operations)
虽然数据流中的许多操作一次只查看一个单独的事件(例如事件解析器),但是一些操作记住了跨多个事件的信息(例如窗口操作符)。这些操作称为有状态操作。
有状态操作的状态被维护在可以认为是嵌入式键/值存储中。状态与有状态操作符读取的流一起被严格地分区和分布。因此,在keyBy()函数之后,只能在键控流上访问键/值状态,并且只能访问与当前事件的键相关联的值。对齐流和状态的键确保所有的状态更新都是本地操作,保证一致性而不增加事务开销。这种对齐还允许Flink透明地重新分配状态和调整流分区。
容错检查点Checkpoints for Fault Tolerance
Flink通过流回放和检查点的组合实现了容错。检查点与每个输入流中的特定点以及每个操作符的对应状态相关。通过恢复操作符的状态并从检查点重新播放事件,流数据流可以在检查点恢复,同时保持一致性(准确地说是一次处理语义)。
检查点间隔是在执行期间用恢复时间(需要重放的事件数量)来权衡容错开销的一种方法。
关于容错的内部描述提供了关于Flink如何管理检查点和相关主题的更多信息。有关启用和配置检查点的详细信息在检查点API文档中。
批处理流Batch on Streamin
Flink执行批处理程序作为流程序的特殊情况,其中流是有界的(有限的元素数量)。数据集在内部被视为数据流。因此,上述概念同样适用于批处理程序,也适用于流程序,但有少数例外:
批处理程序的容错不使用检查点。恢复通过完全重放流来实现。这是可能的,因为输入是有界的。这将使成本更多地用于恢复,但使常规处理更便宜,因为它避免了检查点。
数据集API中的有状态操作使用简化的内存/核心外数据结构,而不是键/值索引。
DataSet API引入了特殊的synchronized(基于超步的)迭代,这只能在有界的流上实现。
Flink中的DataStream程序是在数据流上实现转换的常规程序(例如,过滤、更新状态、定义窗口、聚合)。数据流最初是从各种来源(例如,消息队列、套接字流、文件)创建的。结果通过sink返回,它可以将数据写入文件或写入标准输出(例如命令行终端)。Flink程序在各种上下文中运行,独立运行,或嵌入到其他程序中。执行可以在本地JVM中执行,也可以在许多机器的集群中执行。
下面的程序是一个完整的流窗口单词计数应用程序的工作示例,它在5秒的窗口中对来自web套接字的单词进行计数。您可以复制并粘贴代码在本地运行它。
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object WindowWordCount {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)
val counts = text.flatMap { _.toLowerCase.split("\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
counts.print()
env.execute("Window Stream WordCount")
}
}
要运行示例程序,首先从终端启动netcat的输入流:
只需键入一些单词,然后按下回车键就可以得到一个新词。这些将是单词计数程序的输入。如果你想看到数大于1,输入相同的单词一遍又一遍地在5秒(如果你不能快速敲键盘,增加窗口大小的5秒内?)
Table API & SQL
Apache Flink具有两个用于统一流和批处理的关系API——Table API和SQL。Table API是Scala和Java的语言集成查询API,允许从关系操作符(如选择、筛选和以非常直观的方式连接)中组合查询。Flink的SQL支持基于实现SQL标准的Apache Calcite。无论输入是批输入(数据集)还是流输入(DataStream),任何接口中指定的查询都具有相同的语义并指定相同的结果。
Table API和SQL接口以及Flink的DataStream和DataSet API紧密集成在一起。您可以很容易地在所有api和基于这些api的库之间切换。例如,您可以使用CEP库从数据流中提取模式,然后使用表API分析模式,或者在对预处理数据运行Gelly图形算法之前,您可以使用SQL查询扫描、过滤和聚合批处理表。
请注意,Table API和SQL的特性还不完整,正在积极开发中。不是所有的操作都被[Table API, SQL]和[stream, batch]输入的每个组合所支持。
SQL标准的Apache Calcite
statement:
setStatement
| resetStatement
| explain
| describe
| insert
| update
| merge
| delete
| query
setStatement:
[ ALTER ( SYSTEM | SESSION ) ] SET identifier ‘=‘ expression
resetStatement:
[ ALTER ( SYSTEM | SESSION ) ] RESET identifier
| [ ALTER ( SYSTEM | SESSION ) ] RESET ALL
explain:
EXPLAIN PLAN
[ WITH TYPE | WITH IMPLEMENTATION | WITHOUT IMPLEMENTATION ]
[ EXCLUDING ATTRIBUTES | INCLUDING [ ALL ] ATTRIBUTES ]
[ AS JSON | AS XML ]
FOR ( query | insert | update | merge | delete )
describe:
DESCRIBE DATABASE databaseName
| DESCRIBE CATALOG [ databaseName . ] catalogName
| DESCRIBE SCHEMA [ [ databaseName . ] catalogName ] . schemaName
| DESCRIBE [ TABLE ] [ [ [ databaseName . ] catalogName . ] schemaName . ] tableName [ columnName ]
| DESCRIBE [ STATEMENT ] ( query | insert | update | merge | delete )
insert:
( INSERT | UPSERT ) INTO tablePrimary
[ ‘(‘ column [, column ]* ‘)‘ ]
query
update:
UPDATE tablePrimary
SET assign [, assign ]*
[ WHERE booleanExpression ]
assign:
identifier ‘=‘ expression
merge:
MERGE INTO tablePrimary [ [ AS ] alias ]
USING tablePrimary
ON booleanExpression
[ WHEN MATCHED THEN UPDATE SET assign [, assign ]* ]
[ WHEN NOT MATCHED THEN INSERT VALUES ‘(‘ value [ , value ]* ‘)‘ ]
delete:
DELETE FROM tablePrimary [ [ AS ] alias ]
[ WHERE booleanExpression ]
query:
values
| WITH withItem [ , withItem ]* query
| {
select
| selectWithoutFrom
| query UNION [ ALL | DISTINCT ] query
| query EXCEPT [ ALL | DISTINCT ] query
| query MINUS [ ALL | DISTINCT ] query
| query INTERSECT [ ALL | DISTINCT ] query
}
[ ORDER BY orderItem [, orderItem ]* ]
[ LIMIT [ start, ] { count | ALL } ]
[ OFFSET start { ROW | ROWS } ]
[ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY ]
withItem:
name
[ ‘(‘ column [, column ]* ‘)‘ ]
AS ‘(‘ query ‘)‘
orderItem:
expression [ ASC | DESC ] [ NULLS FIRST | NULLS LAST ]
select:
SELECT [ STREAM ] [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
FROM tableExpression
[ WHERE booleanExpression ]
[ GROUP BY { groupItem [, groupItem ]* } ]
[ HAVING booleanExpression ]
[ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
selectWithoutFrom:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
projectItem:
expression [ [ AS ] columnAlias ]
| tableAlias . *
tableExpression:
tableReference [, tableReference ]*
| tableExpression [ NATURAL ] [ ( LEFT | RIGHT | FULL ) [ OUTER ] ] JOIN tableExpression [ joinCondition ]
| tableExpression CROSS JOIN tableExpression
| tableExpression [ CROSS | OUTER ] APPLY tableExpression
joinCondition:
ON booleanExpression
| USING ‘(‘ column [, column ]* ‘)‘
tableReference:
tablePrimary
[ matchRecognize ]
[ [ AS ] alias [ ‘(‘ columnAlias [, columnAlias ]* ‘)‘ ] ]
tablePrimary:
[ [ catalogName . ] schemaName . ] tableName
‘(‘ TABLE [ [ catalogName . ] schemaName . ] tableName ‘)‘
| tablePrimary [ EXTEND ] ‘(‘ columnDecl [, columnDecl ]* ‘)‘
| [ LATERAL ] ‘(‘ query ‘)‘
| UNNEST ‘(‘ expression ‘)‘ [ WITH ORDINALITY ]
| [ LATERAL ] TABLE ‘(‘ [ SPECIFIC ] functionName ‘(‘ expression [, expression ]* ‘)‘ ‘)‘
columnDecl:
column type [ NOT NULL ]
values:
VALUES expression [, expression ]*
groupItem:
expression
| ‘(‘ ‘)‘
| ‘(‘ expression [, expression ]* ‘)‘
| CUBE ‘(‘ expression [, expression ]* ‘)‘
| ROLLUP ‘(‘ expression [, expression ]* ‘)‘
| GROUPING SETS ‘(‘ groupItem [, groupItem ]* ‘)‘
windowRef:
windowName
| windowSpec
windowSpec:
[ windowName ]
‘(‘
[ ORDER BY orderItem [, orderItem ]* ]
[ PARTITION BY expression [, expression ]* ]
[
RANGE numericOrIntervalExpression { PRECEDING | FOLLOWING }
| ROWS numericExpression { PRECEDING | FOLLOWING }
]
‘)‘
在insert中,如果insert或UPSERT语句没有指定目标列的列表,查询的列数必须与目标表相同,除非是在某些一致性级别。
在merge中,至少有一个匹配时和未匹配时的子句必须出现。
tablePrimary可能只包含特定符合性级别的扩展子句;在这些相同的一致性级别中,insert中的任何列都可以被columnDecl替换,其效果类似于将其包含在EXTEND子句中。
在orderItem中,如果表达式是正整数n,它表示SELECT子句中的第n项。
在查询中,count和start可以是无符号整型字面值,也可以是值为整型的动态参数。
aggregate聚合查询是包含GROUP BY或HAVING子句或SELECT子句中的聚合函数的查询。在SELECT中,具有和ORDER BY子句的聚合查询中,所有表达式都必须是当前组中的常量(即,按照group BY子句或常量的定义对常量进行分组)、聚合函数或常量与聚合函数的组合。聚合和分组函数只能出现在聚合查询中,而且只能出现在SELECT、HAVING或ORDER BY子句中。
标量子查询是用作表达式的子查询。如果子查询不返回行,则该值为空;如果它返回多个行,则为错误。
IN、EXISTS和scalar子查询可以出现在表达式的任何地方(例如SELECT子句、where子句、ON子句连接或聚合函数的参数)。
一个IN、EXISTS或scalar子查询可能相互关联;也就是说,它可以引用包含查询的FROM子句中的表。
selectWithoutFrom等价于值,但不是标准SQL,只允许在某些符合级别中使用。
MINUS相当于EXCEPT,但不是标准SQL,只允许在某些一致性级别上使用。
交叉应用和外部应用只允许在某些符合级别。
“限制开始,计数”相当于“限制计数偏移开始”,但只允许在某些符合级别。“LIMIT start, count” is equivalent to “LIMIT count OFFSET start” but is only allowed in certain conformance levels.
以上是关于flink流计算随笔的主要内容,如果未能解决你的问题,请参考以下文章