如何找到每个 DStream 的 RDD 中所有值的总和?

Posted

技术标签:

【中文标题】如何找到每个 DStream 的 RDD 中所有值的总和?【英文标题】:How to find the sum of all values in RDDs per DStream? 【发布时间】:2016-03-24 17:55:11 【问题描述】:

我正在使用 Spark Streaming 从 kafka 连续读取数据并执行一些统计信息。我每秒都在流式传输。

所以我有 一秒钟的批次(dstreams)。此 dstream 中的每个 RDD 都包含一个 JSON。

这就是我的 dstream:

kafkaStream = KafkaUtils.createDirectStream(stream, ['livedata'], "metadata.broker.list": 'localhost:9092')
raw = kafkaStream.map(lambda kafkaS: kafkaS[1])
clean = raw.map(lambda xs:json.loads(xs))

我的 clean dstream 中的一个 RDD 如下所示:

u'epochseconds': 1458841451, u'protocol': 6, u'source_ip': u'192.168.1.124', \
u'destination_ip': u'149.154.167.120', u'datetime': u'2016-03-24 17:44:11', \
u'length': 1589, u'partitionkey': u'partitionkey', u'packetcount': 10,\
u'source_port': 43375, u'destination_port': 443

我在每个 DStream 中有大约 30-150 个这样的 RDD。

现在,我要做的是,获取每个 DStream 中“长度”的总和或说“数据包计数”。也就是说,

rdd1.length + rdd2.length + ... + LastRDDInTheOneSecondBatch.length

我尝试了什么:

add=clean.map(lambda xs: (xs['length'],1)).reduceByKey(lambda a, b: a+b)

我得到了什么:

频率而不是总和。

(17, 6)
(6, 24)

我应该怎么做才能得到总和而不是键的频率?

【问题讨论】:

【参考方案1】:

那是因为你使用 'length' 的值作为键,试试这个:

add=clean.map(lambda xs: ('Lenght',xs['length'])).reduceByKey(lambda a, b: a+b)

您必须为所有对(键,值)设置相同的键。该值可以是字段长度或其他要聚合的字段...

【讨论】:

工作,谢谢!只是一个额外的问题,我想将另外 2 个参数从 clean 添加到 add 中,比如 ('partitionkey', 'timestamp') 这个以及刚刚计算的 'length' 参数。我该怎么做?

以上是关于如何找到每个 DStream 的 RDD 中所有值的总和?的主要内容,如果未能解决你的问题,请参考以下文章

SparkStreaming DStream转换

深入理解Spark Streaming

如何使用 Pyspark 组合两个 Dstream(类似于普通 RDD 上的 .zip)

如何从持续的 RDD 构造 DStream?

如何保存 Spark Java Dstream RDD

Spark DStream 转换