如何在 Spark-Streaming 的 DStream 中使用“for”循环进行转换和输出?

Posted

技术标签:

【中文标题】如何在 Spark-Streaming 的 DStream 中使用“for”循环进行转换和输出?【英文标题】:How can I use 'for' loop to do Transformation and Output in Spark-Streaming's DStream? 【发布时间】:2016-11-20 16:56:49 【问题描述】:

我是 Spark 的新手,我使用我定义的类生成 1000 个不同的实例(这些实例中的函数相同,但详细函数的参数不同)。sampler=generateClass() 然后我需要映射这些实例的函数到我的 Stream。(测试,只需使用 10 和 2 个实例)

s=[]
for i in range(10):        
    s.append(mappedStream.map(lambda x: sampler[i].insert(x)).reduce(min))

uStream=ssc.union(s[0],s[1],s[2],s[3],s[4],s[5],s[6],s[7],s[8],s[9])
uStream.pprint()

但它的输出只是 10 个相同的键值对,似乎这些代码只是将我的数据映射到第一个实例,然后重复 10 次。

(85829323L, [2, 1])
(85829323L, [2, 1])
(85829323L, [2, 1])
(85829323L, [2, 1])
....

那我试试

myStream1=mappedStream.map(lambda x: sampler[0].insert(x)).reduce(min)
myStream2=mappedStream.map(lambda x: sampler[1].insert(x)).reduce(min)
ssc.union(myStream1,myStream2).pprint()

输出是正确的:

(85829323L, [2, 1])
(99580454L, [4, 1])

为什么会这样?我该如何处理?非常感谢你。

【问题讨论】:

【参考方案1】:

发生这种情况是因为 python lambda 是惰性求值的,并且当您对 s[0] 调用操作时,会使用最后一个 i 参数来计算(在您的情况下,9 是最后一个循环值)。

您可以使用函数生成器模式来“强制”使用适当的i,例如:

def call_sampler(i):
    return lambda x: sampler[i].insert(x)

s=[]
for i in range(10):        
    s.append(mappedStream.map(call_sampler(i)).reduce(min))

uStream=ssc.union(s[0],s[1],s[2],s[3],s[4],s[5],s[6],s[7],s[8],s[9])
uStream.pprint()

【讨论】:

以上是关于如何在 Spark-Streaming 的 DStream 中使用“for”循环进行转换和输出?的主要内容,如果未能解决你的问题,请参考以下文章

java spark-streaming接收TCP/Kafka数据

Spark-Streaming 记录比较

spark-streaming scala:如何将字符串数组传递给过滤器?

spark-streaming对接kafka的两种方式

流式计算助力实时数据处理spark-streaming入门实战

spark-streaming first insight