reduceByKey PySpark 中的列表列表
Posted
技术标签:
【中文标题】reduceByKey PySpark 中的列表列表【英文标题】:reduceByKey a list of lists in PySpark 【发布时间】:2019-01-21 00:23:14 【问题描述】:我是 pyspark 的新手,到目前为止,当您习惯使用 pandas 等库时,很难理解它的工作方式。但这似乎是大数据的必经之路。
对于我目前的 ETL 工作,我有以下要素:
这是我的rdd:
[
[
('SMSG', 'BKT'), ('SQNR', '00000004'), ('STNQ', '06'), ('TRNN', '000001'), ('SMSG', 'BKS'), ('SQNR', '00000005'), ('STNQ', '24'), ('DAIS', '171231'), ('TRNN', '000001'), ....
],
[
('SMSG', 'BKT'), ('SQNR', '00000024'), ('STNQ', '06'), ('TRNN', '000002'), ('NRID', ' '), ('TREC', '020'), ('TRNN', '000002'), ('NRID', ' '), ('TACN', '001'), ('CARF', ' '), ...
],
...
]
行数据是一个固定大小的文本文件。
我现在要做的是对列表的每个单元格进行分组。
最终结果应该是:
[
[
('SMSG_1', 'BKT'),('SMSG_2','BKS'),('SQNR_1', '00000004'),('SQNR_2', '00000005'),('STNQ_1','06'),('STNQ_2','24'),('TRNN', '000001'),()('DAIS', '171231'),...
],
[
('SMSG', 'BKT'),('SQNR', '00000024'),('STNQ','06'),('TRNN', '000002'),('NRID', ' '), ('TREC', '020'), ('TACN', '001'), ('CARF', ' '),...
],
...
]
基本上规则如下:
1- 如果键相同且值也相同,则删除重复项。
2- 如果键相同而值不同,重命名列并添加后缀为“_Number”,其中 Number 可以替换为该键的迭代次数。
我的代码开始如下:
def addBKT():
...
def prepareTrans():
...
if __name__ == '__main__':
input_folder = '/Users/admin/Documents/Training/FR20180101HOT'
rdd = sc.wholeTextFiles(input_folder).map(lambda x: x[1].split("BKT"))
rdd = rdd.flatMap(prepareTrans).map(addBKT).map(lambda x: x.split("\n")).map(hot_to_flat_file_v2)
print(rdd.take(1))
打印件给了我(如前所述)以下元组列表。我只取了 1 个子列表,但完整的 rdd 有大约 2000 个元组子列表:
[
[
('SMSG', 'BKT'), ('SQNR', '00000004'), ('STNQ', '06'), ('TRNN', '000001'), ('SMSG', 'BKS'), ('SQNR', '00000005'), ('STNQ', '24'), ('DAIS', '171231'), ('TRNN', '000001'), ....
]
]
我首先尝试将嵌套列表减少如下:
rdd = rdd.flatMap(lambda x:x).reduceByKey(list)
我期待结果是一个没有重复的新列表列表,对于具有不同值的元组,将它们全部分组在同一个键下。但是,我无法做到这一点。
作为第二步,我计划将具有多个值的元组转换为新的元组对,就像我在分组元组中获得的值一样:即 ('Key', ['Value1', 'Value2']) 变为 ( 'Key_1', 'Value1'),('Key_2', 'Value2')
最后,所有这些转换的输出是将最终的 RDD 转换为 DataFrame 并以 parquet 格式存储。
我真的希望过去有人做过类似的事情。我花了很多时间尝试这样做,但我无法做到,也无法在网上找到任何示例。
感谢您的帮助。
【问题讨论】:
【参考方案1】:由于您是 Spark 新手,您可能不知道 Spark Dataframe。与 RDD 相比,Dataframe 是一个先进的概念。在这里,我使用 Pyspark Dataframe 解决了您的问题。看看这个,不要犹豫,学习 spark Dataframe。
rdd1 = sc.parallelize([("SMSG", "BKT"), ("SMSG", "BKT"), ("SMSG", "BKS"), ('SQNR', '00000004'), ('SQNR', '00000005') ])
rddToDF = rdd1.toDF(["C1", "C2"])
+----+--------+
| C1| C2|
+----+--------+
|SMSG| BKT|
|SMSG| BKT|
|SMSG| BKS|
|SQNR|00000004|
|SQNR|00000005|
+----+--------+
DfRmDup = rddToDF.drop_duplicates() #Removing duplicates from Dataframe
DfRmDup.show()
+----+--------+
| C1| C2|
+----+--------+
|SQNR|00000004|
|SMSG| BKT|
|SQNR|00000005|
|SMSG| BKS|
+----+--------+
rank = DfRmDup.withColumn("rank", dense_rank().over(Window.partitionBy("C1").orderBy(asc("C2"))))
rank.show()
+----+--------+----+
| C1| C2|rank|
+----+--------+----+
|SQNR|00000004| 1|
|SQNR|00000005| 2|
|SMSG| BKS| 1|
|SMSG| BKT| 2|
+----+--------+----+
rank.withColumn("C1", concat(col("C1"), lit("_"), col("rank"))).drop("rank").show()
+------+--------+
| C1| C2|
+------+--------+
|SQNR_1|00000004|
|SQNR_2|00000005|
|SMSG_1| BKS|
|SMSG_2| BKT|
+------+--------+
#Converting back to RDD
rank.withColumn("C1", concat(col("C1"), lit("_"), col("rank"))).drop("rank").rdd.map(lambda x: (x[0],x[1])).collect()
[('SQNR_1', '00000004'),
('SQNR_2', '00000005'),
('SMSG_1', 'BKS'),
('SMSG_2', 'BKT')]
【讨论】:
嗨,Pradeep,非常感谢,它几乎解决了我的问题,我唯一缺少的部分是我有一个列表列表,每个列表都是我最终 DataFrame 中的一行。即:[[("SMSG", "BKT"), ('SQNR', '00000004')],[("SMSG", "BKS"), ('SQNR', '00000006')]] 应该是结果:+--------+--------+ |短信| SQNR| +--------+--------+ |BKT |00000004| |BKS |00000005| +--------+--------+ 我在创建 DF 时遇到的另一个问题是我的一些值不是 str。这是一个问题吗?因为我过去有一些错误,我认为 pyspark 期待一个模式来创建 DF。我应该给 toDF 一个包含每列类型的模式吗?如果是的话,我事先并不知道所有的专栏。每个列表可能有不同的列。在 pandas 中,我可以动态添加列。 @MehdiMansouri,试试他们在 ans 部分中提到的这种方式。 ***.com/questions/47259621/… 嗨,Pradeep,我尝试了链接中提供的解决方案。数据框创建成功,谢谢。【参考方案2】:非常感谢您提供的链接,我遵循了提供的解决方案。数据框已成功创建,这很棒。
input_folder = '/Users/admin/Documents/Training/FR20180101HOT'
rdd_split = sc.wholeTextFiles(input_folder).map(lambda x: x[1].split("BKT"))
rdd_trans = rdd_split.flatMap(prepareTrans).map(addBKT).map(lambda x: x.split("\n")).map(hot_to_flat_file_v2)
#rdd_group = rdd_trans.map(lambda x : x[i] for i in range(len(x))).reduceByKey(lambda x, y: str(x) + ','+ str(y))
df = spark.read.options(inferSchema="true").csv(rdd_trans)
print(df.show(1))
印刷品向我展示了类似的东西:
+--------+-------+--------+------------+--------+------+--------+----------+----...
| _c0| _c1| _c2| _c3| _c4| _c5| _c6| _c7| _c8| _c9| _c10| _c11| _c12| _c13| _c14| _c15| _c16| _c17| _c18| _c19| _c20| _c21| _c22| _c23| _c24| _c25| _c26| _c27| _c28| _c29| _c30| _c31| _c32| _c33| _c34| _c35| _c36| _c37| _c38| _c39| _c40| _c41| _c42| _c43| _c44| _c45| _c46| _c47| _c48| _c49| _c50| _c51| _c52| _c53| _c54| _c55| _c56| _c57| _c58| _c59| _c60| _c61| _c62| _c63| _c64| _c65| _c66| _c67| _c68| _c69| _c70| _c71| _c72| _c73| _c74| _c75| _c76| _c77| _c78| _c79| _c80| _c81| _c82| _c83| _c84| _c85| _c86| _c87| _c88| _c89| _c90| _c91| _c92| _c93| _c94| _c95| _c96| _c97| _c98| _c99| _c100| _c101| _c102| _c103| _c104| _c105| _c106| _c107| _c108| _c109| _c110| _c111| _c112| _c113| _c114|_c115| _c116|_c117| _c118|_c119| _c120| _c121| _c122| _c123| _c124| _c125| _c126| _c127| _c128| _c129| _c130| _c131| _c132|_c133| _c134| _c135| _c136| _c137| _c138| _c139| _c140| _c141| _c142| _c143| _c144| _c145| _c146| _c147| _c148| _c149| _c150|_c151| _c152|_c153| _c154|_c155| _c156| _c157| _c158| _c159| _c160| _c161| _c162|_c163| _c164| _c165| _c166| _c167| _c168|_c169| _c170| _c171| _c172| _c173| _c174| _c175| _c176| _c177| _c178| _c179| _c180| _c181| _c182| _c183| _c184| _c185| _c186|_c187| _c188|_c189| _c190|_c191| _c192| _c193| _c194| _c195| _c196| _c197| _c198| _c199| _c200| _c201| _c202| _c203| _c204|_c205| _c206| _c207| _c208| _c209| _c210| _c211| _c212| _c213| _c214| _c215| _c216| _c217| _c218| _c219| _c220| _c221| _c222|_c223| _c224|_c225| _c226|_c227| _c228| _c229| _c230| _c231| _c232| _c233| _c234| _c235| _c236| _c237| _c238| _c239| _c240|_c241| _c242| _c243| _c244| _c245| _c246| _c247| _c248| _c249| _c250| _c251| _c252| _c253| _c254| _c255| _c256| _c257| _c258|_c259| _c260|_c261| _c262|_c263| _c264| _c265| _c266| _c267| _c268| _c269| _c270|_c271| _c272| _c273| _c274|_c275| _c276|_c277| _c278| _c279| _c280| _c281| _c282| _c283| _c284| _c285| _c286| _c287| _c288| _c289| _c290| _c291| _c292| _c293| _c294|_c295| _c296| _c297| _c298| _c299| _c300| _c301| _c302|_c303| _c304| _c305| _c306| _c307| _c308|_c309| _c310| _c311| _c312|_c313| _c314|_c315| _c316|_c317| _c318| _c319| _c320| _c321| _c322| _c323| _c324| _c325| _c326| _c327| _c328| _c329| _c330| _c331| _c332| _c333| _c334|_c335| _c336| _c337| _c338| _c339| _c340| _c341| _c342| _c343| _c344| _c345| _c346| _c347| _c348| _c349| _c350| _c351| _c352| _c353| _c354| _c355| _c356| _c357| _c358| _c359| _c360|_c361| _c362|_c363| _c364|_c365| _c366| _c367| _c368| _c369| _c370| _c371| _c372| _c373| _c374| _c375| _c376|_c377| _c378| _c379| _c380| _c381| _c382| _c383| _c384| _c385| _c386| _c387| _c388| _c389| _c390| _c391| _c392| _c393| _c394| _c395| _c396| _c397| _c398| _c399| _c400| _c401| _c402| _c403| _c404| _c405| _c406| _c407| _c408| _c409| _c410|_c411| _c412|_c413| _c414|_c415| _c416| _c417| _c418| _c419| _c420| _c421| _c422| _c423| _c424| _c425| _c426|_c427| _c428| _c429| _c430| _c431| _c432| _c433| _c434| _c435| _c436| _c437| _c438| _c439| _c440| _c441| _c442| _c443| _c444| _c445| _c446| _c447| _c448| _c449| _c450| _c451| _c452| _c453| _c454| _c455| _c456| _c457| _c458| _c459| _c460|_c461| _c462|_c463| _c464|_c465| _c466| _c467| _c468| _c469| _c470| _c471| _c472| _c473| _c474| _c475| _c476|_c477| _c478| _c479| _c480| _c481| _c482| _c483| _c484| _c485| _c486| _c487| _c488| _c489| _c490| _c491| _c492| _c493| _c494| _c495| _c496| _c497| _c498| _c499| _c500| _c501| _c502| _c503| _c504| _c505| _c506| _c507| _c508| _c509| _c510|_c511| _c512|_c513| _c514|_c515| _c516| _c517| _c518| _c519| _c520| _c521| _c522| _c523| _c524| _c525| _c526|_c527| _c528| _c529| _c530| _c531| _c532| _c533| _c534| _c535| _c536| _c537| _c538| _c539| _c540| _c541| _c542| _c543| _c544| _c545| _c546| _c547| _c548| _c549| _c550| _c551| _c552| _c553| _c554| _c555| _c556| _c557| _c558| _c559| _c560|_c561| _c562| _c563| _c564|_c565| _c566| _c567| _c568| _c569| _c570| _c571| _c572|_c573| _c574| _c575| _c576|_c577| _c578|_c579| _c580| _c581| _c582| _c583| _c584| _c585| _c586| _c587| _c588| _c589| _c590| _c591| _c592| _c593| _c594| _c595| _c596|_c597| _c598| _c599| _c600| _c601| _c602| _c603| _c604| _c605| _c606| _c607| _c608| _c609| _c610| _c611| _c612| _c613| _c614| _c615| _c616| _c617| _c618| _c619| _c620|_c621| _c622|_c623| _c624| _c625| _c626| _c627| _c628| _c629| _c630| _c631| _c632| _c633| _c634| _c635| _c636| _c637| _c638| _c639| _c640|_c641| _c642|_c643| _c644| _c645| _c646| _c647| _c648| _c649| _c650| _c651| _c652| _c653| _c654| _c655| _c656| _c657| _c658| _c659| _c660|_c661| _c662|_c663| _c664| _c665| _c666| _c667| _c668| _c669| _c670| _c671| _c672| _c673| _c674| _c675| _c676| _c677| _c678| _c679| _c680| _c681| _c682| _c683| _c684| _c685| _c686| _c687| _c688| _c689| _c690| _c691| _c692| _c693| _c694| _c695| _c696|_c697| _c698| _c699| _c700| _c701|
+--------+-------+--------+------------+--------+------+--------+----------+-------...
|[('SMSG'| 'BKT')| ('SQNR'| '00000004')| ('STNQ'| '06')| ('TRNN'| '000001')| ('NRID'| ' ')| ('TREC'| '020')| ('TACN'| '001')| ('CARF'| ' ')| ('CSTF'| ' ...| ('RPSI'| 'SABR')| ('ESAC'| ' ')| ('DISI'| ' ')| ('NRMI'| ' ')| ('NRCT'| ' ')| ('AREI'| ' ')| ('RESD'| ' ...| ('SMSG'| 'BKS')| ('SQNR'| '00000005')| ('STNQ'| '24')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('CPUI'| 'FFFF')| ('CJCP'| ' ')| ('AGTN'| '20212146')| ('RFIC'| ' ')| ('TOUR'| ' ')| ('TRNC'| 'TKTT')| ('TODC'| 'CDGCDG ')| ('PNRR'| 'IKQOWZ/AA ')| ('TIIS'| '0000')| ('RESD'| ' ...| ('SMSG'| 'BKS')| ('SQNR'| '00000006')| ('STNQ'| '30')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('COBL'| 225.0)| ('NTFA'| 0.0)| ('TMFT_1'| 'YR ')| ('TMFA_1'| 300.0)| ('TMFT_2'| 'FR ')| ('TMFA_2'| 20.81)| ('TMFT_3'| 'QX ')| ('TMFA_3'| 27.91)| ('TDAM'| 712.92)| ('RESD'| ' ')| ('CUTP'| 'EUR2')| ('SMSG'| 'BKS')| ('SQNR'| '00000007')| ('STNQ'| '30')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('COBL'| 0.0)| ('NTFA'| 0.0)| ('TMFT_1'| 'IZ ')| ('TMFA_1'| 4.51)| ('TMFT_2'| 'YC ')| ('TMFA_2'| 9.22)| ('TMFT_3'| 'XY ')| ('TMFA_3'| 11.74)| ('TDAM'| 0.0)| ('RESD'| ' ')| ('CUTP'| 'EUR2')| ('SMSG'| 'BKS')| ('SQNR'| '00000008')| ('STNQ'| '30')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('COBL'| 0.0)| ('NTFA'| 0.0)| ('TMFT_1'| 'XA ')| ('TMFA_1'| 6.64)| ('TMFT_2'| 'AY ')| ('TMFA_2'| 9.4)| ('TMFT_3'| 'WD ')| ('TMFA_3'| 29.33)| ('TDAM'| 0.0)| ('RESD'| ' ')| ('CUTP'| 'EUR2')| ('SMSG'| 'BKS')| ('SQNR'| '00000009')| ('STNQ'| '30')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('COBL'| 0.0)| ('NTFA'| 0.0)| ('TMFT_1'| 'EK ')| ('TMFA_1'| 18.89)| ('TMFT_2'| 'EL ')| ('TMFA_2'| 4.19)| ('TMFT_3'| 'HG ')| ('TMFA_3'| 16.76)| ('TDAM'| 0.0)| ('RESD'| ' ')| ('CUTP'| 'EUR2')| ('SMSG'| 'BKS')| ('SQNR'| '00000010')| ('STNQ'| '30')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('COBL'| 0.0)| ('NTFA'| 0.0)| ('TMFT_1'| 'JT ')| ('TMFA_1'| 2.52)| ('TMFT_2'| 'UC ')| ('TMFA_2'| 6.72)| ('TMFT_3'| 'QK ')| ('TMFA_3'| 16.76)| ('TDAM'| 0.0)| ('RESD'| ' ')| ('CUTP'| 'EUR2')| ('SMSG'| 'BKS')| ('SQNR'| '00000011')| ('STNQ'| '30')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('COBL'| 0.0)| ('NTFA'| 0.0)| ('TMFT_1'| 'XF ')| ('TMFA_1'| 2.52)| ('TMFT_2'| 'XFCLT3 ')| ('TMFA_2'| 0.0)| ('TMFT_3'| ' ')| ('TMFA_3'| 0.0)| ('TDAM'| 0.0)| ('RESD'| ' ')| ('CUTP'| 'EUR2')| ('SMSG'| 'BKS')| ('SQNR'| '00000012')| ('STNQ'| '39')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('STAT'| 'I ')| ('COTP'| ' ')| ('CORT'| '00000')| ('COAM'| 0.0)| ('SPTP'| ' ')| ('SPRT'| '00000')| ('SPAM'| 0.0)| ('EFRT'| '00000')| ('EFCO'| 0.0)| ('APBC'| 0.0)| ('RDII'| ' ')| ('RESD'| ' ...| ('CUTP'| 'EUR2')| ('SMSG'| 'BKS')| ('SQNR'| '00000013')| ('STNQ'| '46')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('ORIT'| ' ')| ('ORIL'| ' ')| ('ORID'| ' ')| ('ORIA'| '00000000')| ('ENRS'| 'NONREF/RESTRICT...| ('RESD'| ' ')| ('SMSG'| 'BKI')| ('SQNR'| '00000014')| ('STNQ'| '63')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('SEGI'| '1')| ('STPO'| 'X')| ('NBDA'| '22APR')| ('NADA'| '22APR')| ('ORAC'| 'CDG ')| ('DSTC'| 'MIA ')| ('CARR'| 'AA ')| ('CABI'| ' ')| ('FTNR'| ' 63 ')| ('RBKD'| 'O ')| ('FTDA'| '22APR')| ('FTDT'| '1155 ')| ('FBST'| 'OK')| ('FBAL'| '1PC')| ('FBTD'| 'OLN0DMN3 ')| ('FFRF'| ' ...| ('FCPT'| ' ')| ('RESD'| ' ')| ('SMSG'| 'BKI')| ('SQNR'| '00000015')| ('STNQ'| '63')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('SEGI'| '2')| ('STPO'| 'O')| ('NBDA'| '22APR')| ('NADA'| '22APR')| ('ORAC'| 'MIA ')| ('DSTC'| 'MBJ ')| ('CARR'| 'AA ')| ('CABI'| ' ')| ('FTNR'| '1515 ')| ('RBKD'| 'O ')| ('FTDA'| '22APR')| ('FTDT'| '1801 ')| ('FBST'| 'OK')| ('FBAL'| '1PC')| ('FBTD'| 'OLN0DMN3 ')| ('FFRF'| ' ...| ('FCPT'| ' ')| ('RESD'| ' ')| ('SMSG'| 'BKI')| ('SQNR'| '00000016')| ('STNQ'| '63')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('SEGI'| '3')| ('STPO'| 'X')| ('NBDA'| '29APR')| ('NADA'| '29APR')| ('ORAC'| 'MBJ ')| ('DSTC'| 'CLT ')| ('CARR'| 'AA ')| ('CABI'| ' ')| ('FTNR'| ' 844 ')| ('RBKD'| 'O ')| ('FTDA'| '29APR')| ('FTDT'| '1059 ')| ('FBST'| 'OK')| ('FBAL'| '1PC')| ('FBTD'| 'OLN0DMN3 ')| ('FFRF'| ' ...| ('FCPT'| ' ')| ('RESD'| ' ')| ('SMSG'| 'BKI')| ('SQNR'| '00000017')| ('STNQ'| '63')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('SEGI'| '4')| ('STPO'| ' ')| ('NBDA'| '29APR')| ('NADA'| '29APR')| ('ORAC'| 'CLT ')| ('DSTC'| 'CDG ')| ('CARR'| 'AA ')| ('CABI'| ' ')| ('FTNR'| ' 786 ')| ('RBKD'| 'O ')| ('FTDA'| '29APR')| ('FTDT'| '1630 ')| ('FBST'| 'OK')| ('FBAL'| '1PC')| ('FBTD'| 'OLN0DMN3 ')| ('FFRF'| ' ...| ('FCPT'| ' ')| ('RESD'| ' ')| ('SMSG'| 'BAR')| ('SQNR'| '00000018')| ('STNQ'| '64')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('FARE'| 'EUR 225.00')| ('TKMI'| '/')| ('EQFR'| ' ')| ('TOTL'| 'EUR 712.92')| ('SASI'| '0011')| ('FCMI'| '0')| ('BAID'| ' ')| ('BEOT'| ' ')| ('FCPI'| '0')| ('AENT'| ' ')| ('RESD'| ' ...| ('SMSG'| 'BAR')| ('SQNR'| '00000019')| ('STNQ'| '65')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('PXNM'| ' ...| ('PXDA'| ' ...| ('DOBR'| '02APR68')| ('PXTP'| ' ')| ('RESD'| ' ')| ('SMSG'| 'BAR')| ('SQNR'| '00000020')| ('STNQ'| '66')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('FPSN'| '1')| ('FPIN'| 'AA132193 ...| ('RESD'| ' ...| ('SMSG'| 'BKF')| ('SQNR'| '00000021')| ('STNQ'| '81')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('FRCS'| '1')| ('FRCA'| 'PAR AA X/MIA AA...| ('RESD'| ' ')| ('SMSG'| 'BKF')| ('SQNR'| '00000022')| ('STNQ'| '81')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('FRCS'| '2')| ('FRCA'| '1IZ9.22YC11.74X...| ('RESD'| ' ')| ('SMSG'| 'BKP')| ('SQNR'| '00000023')| ('STNQ'| '84')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('FPTP'| 'CA ')| ('FPAM'| 712.92)| ('FPAC'| ' ...| ('EXDA'| ' ')| ('EXPC'| ' ')| ('APLC'| ' ')| ('INVN'| ' ')| ('INVD'| '000000')| ('REMT'| 712.92)| ('CVVR'| ' ')| ('RESD'| ' ...| ('CUTP'| 'EUR2')]|
+--------+-------+--------+------------+--------+------+--------+----------+-------...
我想我仍然需要遍历每一对列,用第一列第一行的值重命名第二列,最后删除每对列的所有第一列。
或者是否可以添加更多选项:
df = spark.read.options(inferSchema="true").csv(rdd_trans)
要获得准确正确的数据帧结构?它将避免更多的处理时间(我的目标是比熊猫版本更快)
与此同时,我尝试这样做:
df.write.parquet("/Users/admin/Documents/Training/FR20180101HOT.parquet")
但出现错误:
Py4JJavaError: An error occurred while calling o447851.parquet.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:196)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
...
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8220.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8220.0 (TID 12712, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows.
...
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
...
由于文本限制,我无法输入所有错误消息,但这似乎与内存问题有关。
我数了一下df:
print(df.count())
15723
这等于我的 pandas 版本(其他不使用 pyspark 的 python 代码)中的行数,所以它得到了正确的行数。但是,在熊猫中,我可以毫无问题地提取实木复合地板。
【讨论】:
【参考方案3】:您可以针对您的情况尝试 regexp_replace。 查看下面的示例,
df1.withColumn("c0", regexp_replace("_c0", "[()']", "")).withColumn("c1", regexp_replace("_c1", "\)", "")).show()
+----+---+---+---+
| _c0|_c1| c0| c1|
+----+---+---+---+
|('a'| 2)| a| 2|
|('b'| 4)| b| 4|
|('c'| 6)| c| 6|
+----+---+---+---+
【讨论】:
以上是关于reduceByKey PySpark 中的列表列表的主要内容,如果未能解决你的问题,请参考以下文章
IndexError:在pyspark shell上使用reduceByKey操作时列出索引超出范围