Kafka streams概览
Posted 三三札记
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka streams概览相关的知识,希望对你有一定的参考价值。
先回答题图的3个问题:
什么是流处理(stream processing)?
与批处理对应的一个名词
数据源是持续不断产生数据的,而不是定期产生数据
对持续不断产生的数据持续处理即为流处理
Why kafka streams?
storm,spark等常用流处理工具倾向于基于kafka队列实现数据中转
kafka streams与kafka队列集成度最高,新特性最新被集成,比如不丢不重的特性
kafka streams作为商业公司confluent的主要卖点,有持续产品完善的预期及商业支持的可能
相对其他流处理框架易于运用
kafka streams 有什么特点?
streams是kafka队列的一个客户端应用,无runtime支持,多个streams需要应用自己管理
streams应用随队列的partition策略而rebalance
提供相对易用的DSL和更灵活的Processor两套API,Confluent平台还提供KSQL构建streams
一些概念:
Topology:定义stream中的各个组件及协同关系,分为不同的node,包括:
source, 从kafka topics中获取数据并传给porocessor
processor, 从上一个node中接收数据并处理数据,可以继续传给下一个processor,也可以传给sink
sink,从processor中获取数据并写入topics,
构建好了Topology之后,传给KafkaStreams,启动后就能按照这个Topology运行了。
StreamsBuilder,high-level DSL strem构建器,stream()返回KStream<k,v>,table()返回KTable<k,v>,
build() 返回一个Topology,用于传递给KafkaStreams, 可以通过Consumed参数指定一个自定义的参数,比如Serde等。Produced用于一个stream.to()或者throught()到另外一个stream或者table时指定参数。
KStream, 接口,一个key对应多个带有历史记录value的kv结构, 比如K人买了I1,I2两个东西,就是<K,I1>,<K,I2>两条记录进入stream中。
A KStream can be transformed record by record, joined with another KStream, KTable, GlobalKTable, or can be aggregated into a KTable
flatMapValues(ValuMapper(v, vr)) , 将value从v转换为vr,vr是一个Iterable接口的类, 底层实现是KStreamImpl ,调用Processor API进行处理。
count(), groupBy()之后可以count,支持 Materialized.as() 指定一个store,用于存储count返回的值 。
KTable,一个key只有一个最新状态的value的kv结构
KafkaStreams, kafka streams处理的客户端,内部有 KafkaProducer and KafkaConsumer ,通过KafkaStreams.start()启动一个流处理,可以多线程,通过shut down handler处理ctrl+c,调用close(),通过CountDownLatch控制。KafkaStreams构建时需要用到Topology和Properties
Processor API(PAPI):process(),transform()等,streams处理的底层接口,自定义程度更高,所写的代码更多。
时间概念:分为
event time:数据源产生数据的时间
processing time:数据被流处理程序处理的时间
ingestion time:数据存往队列的时间
用哪个时间通过kafka broker的配置来指定,可以分不同topic设置,
状态:中间状态通过store保存,通过interactive query查询。stateful operators操作之后一般需要保存状态,比如join(),aggregate()等
一致性保障:通过processing.guarantee配置,默认是at_least_once ,可以改为exactly_once
容错机制:partition本身有高可用和多份备份,state store也可以配置多备份,同时有topic changelog,如果没有多备份的statestore,将需要从changelog中恢复state store,耗时较长,可以考虑在streams中配置num.standby.replicas
StreamsConfig,通过properties配置。
application.id,同一个应用所有实例共用一个id,id名可以标上版本号,升级版本时如果不复用之前的数据,可以用不同的版本号以示区分。application.id默认会作为consumer和producer的client.id的前缀,同时作为consumer的group.id,作为state.dir的名字,kafka内部的topic名字前缀
stateless DSL:
branch() ,将一个stream按不同条件拆分多个子stream, 按条件顺序进行匹配,匹配上了就进入,条件可以不互斥
filter(),根据条件过滤
filterNot(),根据条件删除一些值
flatMap(),从一个key,value产生0到多个k,v对
flatMapValues(),从一个k,v产生多个k,v对, key保持不变
forEach(),Terminal operation,不返回stream,对stream每个元素做print等操作。
groupByKey(),按key分组,返回KGroupedStream,key不改变
groupBy(),指定分组条件,不按key分组用这个操作
map(),一对一的映射操作,k,v都可以改变
mapValues(),仅改变value
peek(),功能与forEach()相同,在stream中使用的话,处理结果没法通过kafka保存
stateful DSL:
aggregate(),
需要指定初始化累加值() ->0L
累加的方法: (key,value,aggregate) -> aggregate+value
如果涉及类型转换,通过Materialize.as().withValueSerde()指定相应的类型
windowedBy(),指定时间窗口,key变成了Windowed<>,stateStore使用WindowStore<>
reduce(),组合(combine) groupbykey之后的values,和aggregate不同的是,结果的类型不能改变,所以如果value是Long型,stream需要通过mapValues()转为Long型,然后进行reduce运算。reduce不需要设置初始化值。
join
使用KTable做join要求被join的两个对象满足co-partition,即有相同的partition数量,分区策略也要求相同,即相同的key必须在同一个partition编号中。如果不满足co-partition的要求,则可以对分区较少的一边重新写到一个新的stream或者table中去。
使用GlobalKTable对co-partition没有要求
stream之间的join需要指定time window
join的逻辑是基于key相等,然后对两个value进行操作。
Record cache
通过cache.max.bytes.buffering配置
如果不打开cache,每一次stateful的操作都会体现在KTable中,如果打开了cache,则中间的stateful操作结果可能不会被记录,默认的cache是打开的。
windowing
tumbling window,滚动窗口,窗口大小与前进的窗口大小正好相同的hopping window,窗口不重叠。通过TimeWindows.of()构造
hopping window,窗口大小固定,时间步进大小与窗口大小可以不一致。通过TimeWindows.of(windowSizeMs).advanceBy(advanceMs) 构造
sliding window,用于join()操作,通过JoinWindows.of()构造。与数据的timdestatmp有关系。
session window,通过SessionWindows.with(mills)构造,mills指定的是空闲时间,超过空闲时间的数据会放到一个新的session window中,在session活动期间内的数据都放在同一个session中,所以session window的窗口大小不是固定的。
interactive query
基于state store的查询接口,默认的state store是rocksdb
apache 上面的文档不完整,example代码得到confluentinc的github去找。
以上是关于Kafka streams概览的主要内容,如果未能解决你的问题,请参考以下文章
Streaming30分钟概览Spark Streaming 实时计算
Flink Kafka Stream 相对于 Spark Kafka Stream 的优势? Flink 上的 Kafka Stream 呢? [关闭]
Akka Stream Kafka vs Kafka Streams
Spark 系列(十六)—— Spark Streaming 整合 Kafka