如何在火花流中添加 2 行具有相同键(列值)的行?

Posted

技术标签:

【中文标题】如何在火花流中添加 2 行具有相同键(列值)的行?【英文标题】:How do I add 2 rows with same keys(column values) in spark streaming? 【发布时间】:2020-08-30 11:44:40 【问题描述】:

我的 hdfs 有这些数据流:

nyu,-0.0,1.36,0.64,1.3616,15131c3f-7ad9-489d-bbc4-0f99305b7db0
madrid,-0.494,1.506,0.0,-1.3616,15131c3f-7ad9-489d-bbc4-0f99305b7db0
cynthia,-0.085,1.4300000000000002,0.485,1.6916,15131c3f-7ad9-489d-bbc4-0f99305b7db0
rachel,-0.085,1.4300000000000002,0.485,1.6916,15131c3f-7ad9-489d-bbc4-0f99305b7db0
rachel,-0.0,1.322,0.6779999999999999,1.8496000000000001,15131c3f-7ad9-489d-bbc4-0f99305b7db0
haha,-0.0,0.921,1.079,1.5928,15131c3f-7ad9-489d-bbc4-0f99305b7db0
spain,-0.16499999999999998,1.419,0.417,1.6442999999999999,15131c3f-7ad9-489d-bbc4-0f99305b7db0
madrid,-0.076,1.608,0.317,1.334,15131c3f-7ad9-489d-bbc4-0f99305b7db0
spain,-0.142,1.363,0.497,1.8187,15131c3f-7ad9-489d-bbc4-0f99305b7db0
american,-0.028,1.888,0.084,0.8658,15131c3f-7ad9-489d-bbc4-0f99305b7db0
middleburi,-0.148,1.6880000000000002,0.164,0.5698000000000001,15131c3f-7ad9-489d-bbc4-0f99305b7db0
metro,-0.096,1.654,0.249,1.3209,15131c3f-7ad9-489d-bbc4-0f99305b7db0
simon,-0.047,1.797,0.155,1.2171,15131c3f-7ad9-489d-bbc4-0f99305b7db0
spain,-0.047,1.797,0.155,1.2171,15131c3f-7ad9-489d-bbc4-0f99305b7db0
korea,-0.047,1.797,0.155,1.2171,15131c3f-7ad9-489d-bbc4-0f99305b7db0
spain,-0.047,1.797,0.155,1.2171,15131c3f-7ad9-489d-bbc4-0f99305b7db0
anthoni,-0.047,1.855,0.097,0.9211,15131c3f-7ad9-489d-bbc4-0f99305b7db0
anderson,-0.047,1.855,0.097,0.9211,15131c3f-7ad9-489d-bbc4-0f99305b7db0
madrid,-0.047,1.855,0.097,0.9211,15131c3f-7ad9-489d-bbc4-0f99305b7db0
spain,-0.047,1.7349999999999999,0.217,1.6118999999999999,15131c3f-7ad9-489d-bbc4-0f99305b7db0
spain,-0.20700000000000002,1.6949999999999998,0.097,0.3662000000000001,15131c3f-7ad9-489d-bbc4-0f99305b7db0
america,-0.047,1.338,0.614,1.679,15131c3f-7ad9-489d-bbc4-0f99305b7db0

我想将具有相同单词(第一列)和文档编号(最后一列)的分数相加。

到目前为止,我有以下代码:

from pyspark.streaming import StreamingContext
import time
import pprint
from pyspark.sql import functions as F
ssc = StreamingContext(sc, 60)
lines = ssc.textFileStream("hdfs://thesis:9000/user/kush/data/")
data = lines.map(lambda x: x.split(','))
// trying to do the task here
m_Data = data.reduceByKey(lambda x,y: (x[1] + y[1], x[2] + y[2],x[3]+y[3],x[4]+y[4]))
m_Data.pprint()
ssc.start()
time.sleep(5)

这在 pyspark 流中怎么可能实现?

【问题讨论】:

您可以简单地按列分组并按分数汇总。如果这不是您的意思,我建议您为示例数据添加预期输出。 我不认为我可以通过 groupby 进行直播。我在 foreachRDD 中这样做吗? 流媒体本质上是微批处理(据我所知)。请将您的代码添加到问题中,然后尝试在此基础上构建。 我已经更新了问题。感谢您提前查看。 【参考方案1】:

要使用reduce by key,你实际上需要帮助spark确定key。我创建了一个名为 pair 的键/值 rdd。

关键字由单词和文档编号决定。 值是对应于分数的数据结构。分数也被转换为浮点数(或根据您的数据集您想要的任何其他值)以进行计算。

data = lines.map(lambda x: x.split(','))
pair = data.map(lambda x: ( (x[0],x[5]), (float(x[1]),float(x[2]),float(x[3]),float(x[4])) ))
aggregation = pair.reduceByKey(lambda x,y: ( x[0]+y[0], x[1]+y[1], x[2]+y[2], x[3]+y[3] ))
aggregation.pprint(20)

样本输出:

(('haha', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.0, 0.921, 1.079, 1.5928))
(('american', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.028, 1.888, 0.084, 0.8658))
(('madrid', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.617, 4.968999999999999, 0.41400000000000003, 0.8935000000000002))
(('middleburi', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.148, 1.6880000000000002, 0.164, 0.5698000000000001))
(('cynthia', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.085, 1.4300000000000002, 0.485, 1.6916))
(('metro', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.096, 1.654, 0.249, 1.3209))
(('korea', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.047, 1.797, 0.155, 1.2171))
(('anthoni', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.047, 1.855, 0.097, 0.9211))
(('anderson', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.047, 1.855, 0.097, 0.9211))
(('spain', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.6549999999999999, 9.806, 1.538, 7.8753))
(('nyu', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.0, 1.36, 0.64, 1.3616))
(('rachel', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.085, 2.7520000000000002, 1.1629999999999998, 3.5412))
(('simon', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.047, 1.797, 0.155, 1.2171))
(('america', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.047, 1.338, 0.614, 1.679))

在单独的说明中,键 k(单词/文档)在时间 t 出现,然后 30 分钟后出现第二个条目的情况将不包含在此范围内.

【讨论】:

以上是关于如何在火花流中添加 2 行具有相同键(列值)的行?的主要内容,如果未能解决你的问题,请参考以下文章

如何从单个表中选择多个列值相同的行

如何在MySql中返回具有相同列值的行

PostgreSQL,删除具有重新编号列值的行

连接具有相同值的行的列值(不同列的)

如何在火花中使用逗号分隔符将相同的列值连接到新列

Jquery:在包含特定列值的行之后插入新行