如何使用 Pyspark 组合两个 Dstream(类似于普通 RDD 上的 .zip)

Posted

技术标签:

【中文标题】如何使用 Pyspark 组合两个 Dstream(类似于普通 RDD 上的 .zip)【英文标题】:How to Combine two Dstreams using Pyspark (similar to .zip on normal RDD) 【发布时间】:2016-05-26 16:27:12 【问题描述】:

我知道我们可以在 pyspark 中组合(如 R 中的 cbind)两个 RDD,如下所示:

rdd3 = rdd1.zip(rdd2)

我想对 pyspark 中的两个 Dstream 执行相同的操作。是否有可能或任何替代方案?

事实上,我正在使用 MLlib 随机森林模型来预测使用火花流。 最后,我想将特征 Dstream 和预测 Dstream 结合在一起进行进一步的下游处理。

提前致谢。

-奥贝德

【问题讨论】:

【参考方案1】:

最后,我在下面使用。

诀窍是使用“native python map”和“spark spreaming transform”。 可能不是一种优雅的方式,但它有效:)。

def predictScore(texts, modelRF):
    predictions = texts.map( lambda txt :  (txt , getFeatures(txt)) ).\
     map(lambda (txt, features) : (txt ,(features.split(','))) ).\
     map( lambda (txt, features) : (txt, ([float(i) for i in features])) ).\
     transform( lambda  rdd: sc.parallelize(\
       map( lambda x,y:(x,y), modelRF.predict(rdd.map(lambda (x,y):y)).collect(),rdd.map(lambda (x,y):x).collect() )\
       )\
     )
    # in the transform operation: x=text and y=features
    # Return will be tuple of (score,'original text')
    return predictions

希望,它会帮助面临同样问题的人。 如果有人有更好的想法,请在此处发布。

-奥贝德

注意:我还在 spark 用户列表上提交了问题,并在那里发布了我的答案。

【讨论】:

你也可以在scala中编写解决方案吗?

以上是关于如何使用 Pyspark 组合两个 Dstream(类似于普通 RDD 上的 .zip)的主要内容,如果未能解决你的问题,请参考以下文章

如何使用pyspark流计算csv文件中的条目数

如何使用 PySpark 对两个 RDD 进行完全外连接?

如何将 Spark Streaming DStream 制作为 SQL 表

如何从 pyspark rdd 或分区中确定原始 s3 输入文件名

在 pyspark 中合并两个 RDD

如何使用pyspark将两列值组合到另一列?