Kafka streams概览

Posted 三三札记

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka streams概览相关的知识,希望对你有一定的参考价值。

先回答题图的3个问题:

  • 什么是流处理(stream processing)? 

    • 与批处理对应的一个名词

    •  数据源是持续不断产生数据的,而不是定期产生数据

    •  对持续不断产生的数据持续处理即为流处理

  • Why kafka streams?

    • storm,spark等常用流处理工具倾向于基于kafka队列实现数据中转

    • kafka streams与kafka队列集成度最高,新特性最新被集成,比如不丢不重的特性

    • kafka streams作为商业公司confluent的主要卖点,有持续产品完善的预期及商业支持的可能

    • 相对其他流处理框架易于运用

  • kafka streams 有什么特点? 

    • streams是kafka队列的一个客户端应用,无runtime支持,多个streams需要应用自己管理

    • streams应用随队列的partition策略而rebalance

    • 提供相对易用的DSL和更灵活的Processor两套API,Confluent平台还提供KSQL构建streams



一些概念:

  • Topology:定义stream中的各个组件及协同关系,分为不同的node,包括:

    source, 从kafka topics中获取数据并传给porocessor

    processor, 从上一个node中接收数据并处理数据,可以继续传给下一个processor,也可以传给sink

    sink,从processor中获取数据并写入topics,

构建好了Topology之后,传给KafkaStreams,启动后就能按照这个Topology运行了。

  • StreamsBuilder,high-level DSL strem构建器,stream()返回KStream<k,v>,table()返回KTable<k,v>,

    build() 返回一个Topology,用于传递给KafkaStreams, 可以通过Consumed参数指定一个自定义的参数,比如Serde等。Produced用于一个stream.to()或者throught()到另外一个stream或者table时指定参数。

  • KStream, 接口,一个key对应多个带有历史记录value的kv结构, 比如K人买了I1,I2两个东西,就是<K,I1>,<K,I2>两条记录进入stream中。

   A KStream can be transformed record by record, joined with another KStream, KTable, GlobalKTable, or can be aggregated into a KTable

    • flatMapValues(ValuMapper(v, vr)) , 将value从v转换为vr,vr是一个Iterable接口的类, 底层实现是KStreamImpl ,调用Processor API进行处理。

    • count(), groupBy()之后可以count,支持 Materialized.as() 指定一个store,用于存储count返回的值 。

  • KTable,一个key只有一个最新状态的value的kv结构

  • KafkaStreams, kafka streams处理的客户端,内部有 KafkaProducer and KafkaConsumer ,通过KafkaStreams.start()启动一个流处理,可以多线程,通过shut down handler处理ctrl+c,调用close(),通过CountDownLatch控制。KafkaStreams构建时需要用到Topology和Properties

  • Processor API(PAPI):process(),transform()等,streams处理的底层接口,自定义程度更高,所写的代码更多。

  • 时间概念:分为

    • event time:数据源产生数据的时间

    • processing time:数据被流处理程序处理的时间

    • ingestion time:数据存往队列的时间

                用哪个时间通过kafka broker的配置来指定,可以分不同topic设置,

  • 状态:中间状态通过store保存,通过interactive query查询。stateful operators操作之后一般需要保存状态,比如join(),aggregate()等

  • 一致性保障:通过processing.guarantee配置,默认是at_least_once ,可以改为exactly_once

  • 容错机制:partition本身有高可用和多份备份,state store也可以配置多备份,同时有topic changelog,如果没有多备份的statestore,将需要从changelog中恢复state store,耗时较长,可以考虑在streams中配置num.standby.replicas

  • StreamsConfig,通过properties配置。

    • application.id,同一个应用所有实例共用一个id,id名可以标上版本号,升级版本时如果不复用之前的数据,可以用不同的版本号以示区分。application.id默认会作为consumer和producer的client.id的前缀,同时作为consumer的group.id,作为state.dir的名字,kafka内部的topic名字前缀

  • stateless DSL:

    • branch() ,将一个stream按不同条件拆分多个子stream, 按条件顺序进行匹配,匹配上了就进入,条件可以不互斥

    • filter(),根据条件过滤

    • filterNot(),根据条件删除一些值

    • flatMap(),从一个key,value产生0到多个k,v对

    • flatMapValues(),从一个k,v产生多个k,v对, key保持不变

    • forEach(),Terminal operation,不返回stream,对stream每个元素做print等操作。

    • groupByKey(),按key分组,返回KGroupedStream,key不改变

    • groupBy(),指定分组条件,不按key分组用这个操作

    • map(),一对一的映射操作,k,v都可以改变

    • mapValues(),仅改变value

    • peek(),功能与forEach()相同,在stream中使用的话,处理结果没法通过kafka保存

  • stateful DSL:

    • aggregate(),

      • 需要指定初始化累加值() ->0L

      • 累加的方法: (key,value,aggregate) -> aggregate+value

      • 如果涉及类型转换,通过Materialize.as().withValueSerde()指定相应的类型

    • windowedBy(),指定时间窗口,key变成了Windowed<>,stateStore使用WindowStore<>

    • reduce(),组合(combine) groupbykey之后的values,和aggregate不同的是,结果的类型不能改变,所以如果value是Long型,stream需要通过mapValues()转为Long型,然后进行reduce运算。reduce不需要设置初始化值。

    • join

      • 使用KTable做join要求被join的两个对象满足co-partition,即有相同的partition数量,分区策略也要求相同,即相同的key必须在同一个partition编号中。如果不满足co-partition的要求,则可以对分区较少的一边重新写到一个新的stream或者table中去。

      • 使用GlobalKTable对co-partition没有要求

      • stream之间的join需要指定time window

      • join的逻辑是基于key相等,然后对两个value进行操作。

  • Record cache

    • 通过cache.max.bytes.buffering配置

    • 如果不打开cache,每一次stateful的操作都会体现在KTable中,如果打开了cache,则中间的stateful操作结果可能不会被记录,默认的cache是打开的。

  • windowing

    • tumbling window,滚动窗口,窗口大小与前进的窗口大小正好相同的hopping window,窗口不重叠。通过TimeWindows.of()构造

    • hopping window,窗口大小固定,时间步进大小与窗口大小可以不一致。通过TimeWindows.of(windowSizeMs).advanceBy(advanceMs) 构造

    • sliding window,用于join()操作,通过JoinWindows.of()构造。与数据的timdestatmp有关系。

    • session window,通过SessionWindows.with(mills)构造,mills指定的是空闲时间,超过空闲时间的数据会放到一个新的session window中,在session活动期间内的数据都放在同一个session中,所以session window的窗口大小不是固定的。

  • interactive query

    • 基于state store的查询接口,默认的state store是rocksdb

    • apache 上面的文档不完整,example代码得到confluentinc的github去找。


以上是关于Kafka streams概览的主要内容,如果未能解决你的问题,请参考以下文章

Streaming30分钟概览Spark Streaming 实时计算

Flink Kafka Stream 相对于 Spark Kafka Stream 的优势? Flink 上的 Kafka Stream 呢? [关闭]

Akka Stream Kafka vs Kafka Streams

Spark 系列(十六)—— Spark Streaming 整合 Kafka

Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一

Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一