reduceByKey PySpark 中的列表列表

Posted

技术标签:

【中文标题】reduceByKey PySpark 中的列表列表【英文标题】:reduceByKey a list of lists in PySpark 【发布时间】:2019-01-21 00:23:14 【问题描述】:

我是 pyspark 的新手,到目前为止,当您习惯使用 pandas 等库时,很难理解它的工作方式。但这似乎是大数据的必经之路。

对于我目前的 ETL 工作,我有以下要素:

这是我的rdd:

[
    [
    ('SMSG', 'BKT'), ('SQNR', '00000004'), ('STNQ', '06'), ('TRNN', '000001'), ('SMSG', 'BKS'), ('SQNR', '00000005'), ('STNQ', '24'), ('DAIS', '171231'), ('TRNN', '000001'), ....
    ], 
    [
    ('SMSG', 'BKT'), ('SQNR', '00000024'), ('STNQ', '06'), ('TRNN', '000002'), ('NRID', '  '), ('TREC', '020'), ('TRNN', '000002'), ('NRID', '  '), ('TACN', '001'), ('CARF', '          '), ... 
    ],
    ...
]

行数据是一个固定大小的文本文件。

我现在要做的是对列表的每个单元格进行分组。

最终结果应该是:

[
    [
    ('SMSG_1', 'BKT'),('SMSG_2','BKS'),('SQNR_1', '00000004'),('SQNR_2', '00000005'),('STNQ_1','06'),('STNQ_2','24'),('TRNN', '000001'),()('DAIS', '171231'),...
    ],
    [
    ('SMSG', 'BKT'),('SQNR', '00000024'),('STNQ','06'),('TRNN', '000002'),('NRID', '  '), ('TREC', '020'), ('TACN', '001'), ('CARF', '          '),...
    ],
    ...
]

基本上规则如下:

1- 如果键相同且值也相同,则删除重复项。

2- 如果键相同而值不同,重命名列并添加后缀为“_Number”,其中 Number 可以替换为该键的迭代次数。

我的代码开始如下:

def addBKT():
...
def prepareTrans():
...
if __name__ == '__main__':
    input_folder = '/Users/admin/Documents/Training/FR20180101HOT' 
    rdd = sc.wholeTextFiles(input_folder).map(lambda x: x[1].split("BKT"))
    rdd = rdd.flatMap(prepareTrans).map(addBKT).map(lambda x: x.split("\n")).map(hot_to_flat_file_v2)
    print(rdd.take(1))

打印件给了我(如前所述)以下元组列表。我只取了 1 个子列表,但完整的 rdd 有大约 2000 个元组子列表:

[
    [
    ('SMSG', 'BKT'), ('SQNR', '00000004'), ('STNQ', '06'), ('TRNN', '000001'), ('SMSG', 'BKS'), ('SQNR', '00000005'), ('STNQ', '24'), ('DAIS', '171231'), ('TRNN', '000001'), ....
    ]
]

我首先尝试将嵌套列表减少如下:

rdd = rdd.flatMap(lambda x:x).reduceByKey(list)

我期待结果是一个没有重复的新列表列表,对于具有不同值的元组,将它们全部分组在同一个键下。但是,我无法做到这一点。

作为第二步,我计划将具有多个值的元组转换为新的元组对,就像我在分组元组中获得的值一样:即 ('Key', ['Value1', 'Value2']) 变为 ( 'Key_1', 'Value1'),('Key_2', 'Value2')

最后,所有这些转换的输出是将最终的 RDD 转换为 DataFrame 并以 parquet 格式存储。

我真的希望过去有人做过类似的事情。我花了很多时间尝试这样做,但我无法做到,也无法在网上找到任何示例。

感谢您的帮助。

【问题讨论】:

【参考方案1】:

由于您是 Spark 新手,您可能不知道 Spark Dataframe。与 RDD 相比,Dataframe 是一个先进的概念。在这里,我使用 Pyspark Dataframe 解决了您的问题。看看这个,不要犹豫,学习 spark Dataframe。

rdd1 = sc.parallelize([("SMSG", "BKT"), ("SMSG", "BKT"), ("SMSG", "BKS"), ('SQNR', '00000004'), ('SQNR', '00000005') ])
rddToDF = rdd1.toDF(["C1", "C2"])
+----+--------+
|  C1|      C2|
+----+--------+
|SMSG|     BKT|
|SMSG|     BKT|
|SMSG|     BKS|
|SQNR|00000004|
|SQNR|00000005|
+----+--------+

DfRmDup = rddToDF.drop_duplicates() #Removing duplicates from Dataframe
DfRmDup.show()
+----+--------+
|  C1|      C2|
+----+--------+
|SQNR|00000004|
|SMSG|     BKT|
|SQNR|00000005|
|SMSG|     BKS|
+----+--------+

rank = DfRmDup.withColumn("rank", dense_rank().over(Window.partitionBy("C1").orderBy(asc("C2"))))
rank.show()
+----+--------+----+
|  C1|      C2|rank|
+----+--------+----+
|SQNR|00000004|   1|
|SQNR|00000005|   2|
|SMSG|     BKS|   1|
|SMSG|     BKT|   2|
+----+--------+----+

rank.withColumn("C1", concat(col("C1"), lit("_"), col("rank"))).drop("rank").show()
+------+--------+
|    C1|      C2|
+------+--------+
|SQNR_1|00000004|
|SQNR_2|00000005|
|SMSG_1|     BKS|
|SMSG_2|     BKT|
+------+--------+

#Converting back to RDD
rank.withColumn("C1", concat(col("C1"), lit("_"), col("rank"))).drop("rank").rdd.map(lambda x: (x[0],x[1])).collect()

[('SQNR_1', '00000004'),
 ('SQNR_2', '00000005'),
 ('SMSG_1', 'BKS'),
 ('SMSG_2', 'BKT')]

【讨论】:

嗨,Pradeep,非常感谢,它几乎解决了我的问题,我唯一缺少的部分是我有一个列表列表,每个列表都是我最终 DataFrame 中的一行。即:[[("SMSG", "BKT"), ('SQNR', '00000004')],[("SMSG", "BKS"), ('SQNR', '00000006')]] 应该是结果:+--------+--------+ |短信| SQNR| +--------+--------+ |BKT |00000004| |BKS |00000005| +--------+--------+ 我在创建 DF 时遇到的另一个问题是我的一些值不是 str。这是一个问题吗?因为我过去有一些错误,我认为 pyspark 期待一个模式来创建 DF。我应该给 toDF 一个包含每列类型的模式吗?如果是的话,我事先并不知道所有的专栏。每个列表可能有不同的列。在 pandas 中,我可以动态添加列。 @MehdiMansouri,试试他们在 ans 部分中提到的这种方式。 ***.com/questions/47259621/… 嗨,Pradeep,我尝试了链接中提供的解决方案。数据框创建成功,谢谢。【参考方案2】:

非常感谢您提供的链接,我遵循了提供的解决方案。数据框已成功创建,这很棒。

    input_folder = '/Users/admin/Documents/Training/FR20180101HOT' 
    rdd_split = sc.wholeTextFiles(input_folder).map(lambda x: x[1].split("BKT"))
    rdd_trans = rdd_split.flatMap(prepareTrans).map(addBKT).map(lambda x: x.split("\n")).map(hot_to_flat_file_v2)
    #rdd_group = rdd_trans.map(lambda x : x[i] for i in range(len(x))).reduceByKey(lambda x, y: str(x) + ','+ str(y))   
    df = spark.read.options(inferSchema="true").csv(rdd_trans)
    print(df.show(1)) 

印刷品向我展示了类似的东西:

+--------+-------+--------+------------+--------+------+--------+----------+----...

|     _c0|    _c1|     _c2|         _c3|     _c4|   _c5|     _c6|       _c7|     _c8|   _c9|    _c10|   _c11|    _c12|   _c13|    _c14|          _c15|    _c16|                _c17|    _c18|    _c19|    _c20|              _c21|    _c22| _c23|    _c24| _c25|    _c26| _c27|    _c28| _c29|    _c30|                _c31|    _c32|   _c33|    _c34|        _c35|    _c36|  _c37|    _c38|      _c39|    _c40|      _c41|    _c42|              _c43|    _c44| _c45|    _c46|    _c47|    _c48|   _c49|    _c50|        _c51|    _c52| _c53|    _c54|               _c55|    _c56|    _c57|    _c58|          _c59|    _c60|             _c61|    _c62|    _c63|    _c64|                _c65|    _c66|   _c67|    _c68|        _c69|    _c70|  _c71|    _c72|      _c73|    _c74|      _c75|    _c76|              _c77|    _c78| _c79|    _c80|   _c81|    _c82| _c83|      _c84|        _c85|      _c86|   _c87|      _c88|        _c89|      _c90|   _c91|      _c92|        _c93|      _c94|   _c95|    _c96|    _c97|    _c98|  _c99|   _c100|   _c101|   _c102|  _c103|   _c104|       _c105|   _c106| _c107|   _c108|     _c109|   _c110|     _c111|   _c112|             _c113|   _c114|_c115|   _c116|_c117|   _c118|_c119|     _c120|       _c121|     _c122| _c123|     _c124|       _c125|     _c126| _c127|     _c128|       _c129|     _c130|  _c131|   _c132|_c133|   _c134| _c135|   _c136|   _c137|   _c138|  _c139|   _c140|       _c141|   _c142| _c143|   _c144|     _c145|   _c146|     _c147|   _c148|             _c149|   _c150|_c151|   _c152|_c153|   _c154|_c155|     _c156|       _c157|     _c158| _c159|     _c160|       _c161|     _c162|_c163|     _c164|       _c165|     _c166|  _c167|   _c168|_c169|   _c170| _c171|   _c172|   _c173|   _c174|  _c175|   _c176|       _c177|   _c178| _c179|   _c180|     _c181|   _c182|     _c183|   _c184|             _c185|   _c186|_c187|   _c188|_c189|   _c190|_c191|     _c192|       _c193|     _c194|  _c195|     _c196|       _c197|     _c198| _c199|     _c200|       _c201|     _c202|  _c203|   _c204|_c205|   _c206| _c207|   _c208|   _c209|   _c210|  _c211|   _c212|       _c213|   _c214| _c215|   _c216|     _c217|   _c218|     _c219|   _c220|             _c221|   _c222|_c223|   _c224|_c225|   _c226|_c227|     _c228|       _c229|     _c230| _c231|     _c232|       _c233|     _c234| _c235|     _c236|       _c237|     _c238|  _c239|   _c240|_c241|   _c242| _c243|   _c244|   _c245|   _c246|  _c247|   _c248|       _c249|   _c250| _c251|   _c252|     _c253|   _c254|     _c255|   _c256|             _c257|   _c258|_c259|   _c260|_c261|   _c262|_c263|     _c264|       _c265|     _c266| _c267|     _c268|       _c269|     _c270|_c271|     _c272|       _c273|     _c274|_c275|   _c276|_c277|   _c278| _c279|   _c280|   _c281|   _c282|  _c283|   _c284|       _c285|   _c286| _c287|   _c288|     _c289|   _c290|     _c291|   _c292|             _c293|   _c294|_c295|   _c296|  _c297|   _c298|     _c299|   _c300|    _c301|   _c302|_c303|   _c304|     _c305|   _c306|    _c307|   _c308|_c309|   _c310|    _c311|   _c312|_c313|   _c314|_c315|   _c316|_c317|   _c318|               _c319|   _c320|   _c321|   _c322|  _c323|   _c324|       _c325|   _c326| _c327|   _c328|     _c329|   _c330|     _c331|   _c332|             _c333|   _c334|_c335|   _c336|             _c337|   _c338|  _c339|   _c340|      _c341|   _c342|       _c343|   _c344|               _c345|   _c346|              _c347|   _c348|  _c349|   _c350|       _c351|   _c352| _c353|   _c354|     _c355|   _c356|     _c357|   _c358|             _c359|   _c360|_c361|   _c362|_c363|   _c364|_c365|   _c366|    _c367|   _c368|    _c369|   _c370|    _c371|   _c372|    _c373|   _c374|  _c375|   _c376|_c377|   _c378|    _c379|   _c380| _c381|   _c382|    _c383|   _c384|    _c385|   _c386| _c387|   _c388|  _c389|   _c390|              _c391|   _c392|               _c393|   _c394|  _c395|   _c396|         _c397|   _c398|  _c399|   _c400|       _c401|   _c402| _c403|   _c404|     _c405|   _c406|     _c407|   _c408|             _c409|   _c410|_c411|   _c412|_c413|   _c414|_c415|   _c416|    _c417|   _c418|    _c419|   _c420|    _c421|   _c422|    _c423|   _c424|  _c425|   _c426|_c427|   _c428|    _c429|   _c430| _c431|   _c432|    _c433|   _c434|    _c435|   _c436| _c437|   _c438|  _c439|   _c440|              _c441|   _c442|               _c443|   _c444|  _c445|   _c446|         _c447|   _c448|  _c449|   _c450|       _c451|   _c452| _c453|   _c454|     _c455|   _c456|     _c457|   _c458|             _c459|   _c460|_c461|   _c462|_c463|   _c464|_c465|   _c466|    _c467|   _c468|    _c469|   _c470|    _c471|   _c472|    _c473|   _c474|  _c475|   _c476|_c477|   _c478|    _c479|   _c480| _c481|   _c482|    _c483|   _c484|    _c485|   _c486| _c487|   _c488|  _c489|   _c490|              _c491|   _c492|               _c493|   _c494|  _c495|   _c496|         _c497|   _c498|  _c499|   _c500|       _c501|   _c502| _c503|   _c504|     _c505|   _c506|     _c507|   _c508|             _c509|   _c510|_c511|   _c512|_c513|   _c514|_c515|   _c516|    _c517|   _c518|    _c519|   _c520|    _c521|   _c522|    _c523|   _c524|  _c525|   _c526|_c527|   _c528|    _c529|   _c530| _c531|   _c532|    _c533|   _c534|    _c535|   _c536| _c537|   _c538|  _c539|   _c540|              _c541|   _c542|               _c543|   _c544|  _c545|   _c546|         _c547|   _c548|  _c549|   _c550|       _c551|   _c552| _c553|   _c554|     _c555|   _c556|     _c557|   _c558|             _c559|   _c560|_c561|   _c562|           _c563|   _c564|_c565|   _c566|           _c567|   _c568|           _c569|   _c570|   _c571|   _c572|_c573|   _c574|     _c575|   _c576|_c577|   _c578|_c579|   _c580|       _c581|   _c582|               _c583|   _c584|  _c585|   _c586|       _c587|   _c588| _c589|   _c590|     _c591|   _c592|     _c593|   _c594|             _c595|   _c596|_c597|   _c598|               _c599|   _c600|               _c601|   _c602|      _c603|   _c604|  _c605|   _c606|       _c607|   _c608|  _c609|   _c610|       _c611|   _c612| _c613|   _c614|     _c615|   _c616|     _c617|   _c618|             _c619|   _c620|_c621|   _c622|_c623|   _c624|               _c625|   _c626|               _c627|   _c628|  _c629|   _c630|       _c631|   _c632| _c633|   _c634|     _c635|   _c636|     _c637|   _c638|             _c639|   _c640|_c641|   _c642|_c643|   _c644|               _c645|   _c646|       _c647|   _c648|  _c649|   _c650|       _c651|   _c652| _c653|   _c654|     _c655|   _c656|     _c657|   _c658|             _c659|   _c660|_c661|   _c662|_c663|   _c664|               _c665|   _c666|       _c667|   _c668|  _c669|   _c670|       _c671|   _c672| _c673|   _c674|     _c675|   _c676|     _c677|   _c678|         _c679|   _c680|   _c681|   _c682|               _c683|   _c684|   _c685|   _c686| _c687|   _c688|     _c689|   _c690|             _c691|   _c692|     _c693|   _c694|   _c695|   _c696|_c697|   _c698|               _c699|   _c700|    _c701|
+--------+-------+--------+------------+--------+------+--------+----------+-------...

|[('SMSG'| 'BKT')| ('SQNR'| '00000004')| ('STNQ'| '06')| ('TRNN'| '000001')| ('NRID'| '  ')| ('TREC'| '020')| ('TACN'| '001')| ('CARF'| '          ')| ('CSTF'| '               ...| ('RPSI'| 'SABR')| ('ESAC'| '              ')| ('DISI'| ' ')| ('NRMI'| ' ')| ('NRCT'| ' ')| ('AREI'| ' ')| ('RESD'| '               ...| ('SMSG'| 'BKS')| ('SQNR'| '00000005')| ('STNQ'| '24')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('CPUI'| 'FFFF')| ('CJCP'| '   ')| ('AGTN'| '20212146')| ('RFIC'| ' ')| ('TOUR'| '               ')| ('TRNC'| 'TKTT')| ('TODC'| 'CDGCDG    ')| ('PNRR'| 'IKQOWZ/AA    ')| ('TIIS'| '0000')| ('RESD'| '               ...| ('SMSG'| 'BKS')| ('SQNR'| '00000006')| ('STNQ'| '30')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('COBL'| 225.0)| ('NTFA'| 0.0)| ('TMFT_1'| 'YR      ')| ('TMFA_1'| 300.0)| ('TMFT_2'| 'FR      ')| ('TMFA_2'| 20.81)| ('TMFT_3'| 'QX      ')| ('TMFA_3'| 27.91)| ('TDAM'| 712.92)| ('RESD'| '  ')| ('CUTP'| 'EUR2')| ('SMSG'| 'BKS')| ('SQNR'| '00000007')| ('STNQ'| '30')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('COBL'| 0.0)| ('NTFA'| 0.0)| ('TMFT_1'| 'IZ      ')| ('TMFA_1'| 4.51)| ('TMFT_2'| 'YC      ')| ('TMFA_2'| 9.22)| ('TMFT_3'| 'XY      ')| ('TMFA_3'| 11.74)| ('TDAM'| 0.0)| ('RESD'| '  ')| ('CUTP'| 'EUR2')| ('SMSG'| 'BKS')| ('SQNR'| '00000008')| ('STNQ'| '30')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('COBL'| 0.0)| ('NTFA'| 0.0)| ('TMFT_1'| 'XA      ')| ('TMFA_1'| 6.64)| ('TMFT_2'| 'AY      ')| ('TMFA_2'| 9.4)| ('TMFT_3'| 'WD      ')| ('TMFA_3'| 29.33)| ('TDAM'| 0.0)| ('RESD'| '  ')| ('CUTP'| 'EUR2')| ('SMSG'| 'BKS')| ('SQNR'| '00000009')| ('STNQ'| '30')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('COBL'| 0.0)| ('NTFA'| 0.0)| ('TMFT_1'| 'EK      ')| ('TMFA_1'| 18.89)| ('TMFT_2'| 'EL      ')| ('TMFA_2'| 4.19)| ('TMFT_3'| 'HG      ')| ('TMFA_3'| 16.76)| ('TDAM'| 0.0)| ('RESD'| '  ')| ('CUTP'| 'EUR2')| ('SMSG'| 'BKS')| ('SQNR'| '00000010')| ('STNQ'| '30')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('COBL'| 0.0)| ('NTFA'| 0.0)| ('TMFT_1'| 'JT      ')| ('TMFA_1'| 2.52)| ('TMFT_2'| 'UC      ')| ('TMFA_2'| 6.72)| ('TMFT_3'| 'QK      ')| ('TMFA_3'| 16.76)| ('TDAM'| 0.0)| ('RESD'| '  ')| ('CUTP'| 'EUR2')| ('SMSG'| 'BKS')| ('SQNR'| '00000011')| ('STNQ'| '30')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('COBL'| 0.0)| ('NTFA'| 0.0)| ('TMFT_1'| 'XF      ')| ('TMFA_1'| 2.52)| ('TMFT_2'| 'XFCLT3  ')| ('TMFA_2'| 0.0)| ('TMFT_3'| '        ')| ('TMFA_3'| 0.0)| ('TDAM'| 0.0)| ('RESD'| '  ')| ('CUTP'| 'EUR2')| ('SMSG'| 'BKS')| ('SQNR'| '00000012')| ('STNQ'| '39')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('STAT'| 'I  ')| ('COTP'| '      ')| ('CORT'| '00000')| ('COAM'| 0.0)| ('SPTP'| '      ')| ('SPRT'| '00000')| ('SPAM'| 0.0)| ('EFRT'| '00000')| ('EFCO'| 0.0)| ('APBC'| 0.0)| ('RDII'| ' ')| ('RESD'| '               ...| ('CUTP'| 'EUR2')| ('SMSG'| 'BKS')| ('SQNR'| '00000013')| ('STNQ'| '46')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('ORIT'| '              ')| ('ORIL'| '   ')| ('ORID'| '       ')| ('ORIA'| '00000000')| ('ENRS'| 'NONREF/RESTRICT...| ('RESD'| '               ')| ('SMSG'| 'BKI')| ('SQNR'| '00000014')| ('STNQ'| '63')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('SEGI'| '1')| ('STPO'| 'X')| ('NBDA'| '22APR')| ('NADA'| '22APR')| ('ORAC'| 'CDG  ')| ('DSTC'| 'MIA  ')| ('CARR'| 'AA ')| ('CABI'| ' ')| ('FTNR'| '  63 ')| ('RBKD'| 'O ')| ('FTDA'| '22APR')| ('FTDT'| '1155 ')| ('FBST'| 'OK')| ('FBAL'| '1PC')| ('FBTD'| 'OLN0DMN3       ')| ('FFRF'| '               ...| ('FCPT'| '   ')| ('RESD'| '          ')| ('SMSG'| 'BKI')| ('SQNR'| '00000015')| ('STNQ'| '63')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('SEGI'| '2')| ('STPO'| 'O')| ('NBDA'| '22APR')| ('NADA'| '22APR')| ('ORAC'| 'MIA  ')| ('DSTC'| 'MBJ  ')| ('CARR'| 'AA ')| ('CABI'| ' ')| ('FTNR'| '1515 ')| ('RBKD'| 'O ')| ('FTDA'| '22APR')| ('FTDT'| '1801 ')| ('FBST'| 'OK')| ('FBAL'| '1PC')| ('FBTD'| 'OLN0DMN3       ')| ('FFRF'| '               ...| ('FCPT'| '   ')| ('RESD'| '          ')| ('SMSG'| 'BKI')| ('SQNR'| '00000016')| ('STNQ'| '63')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('SEGI'| '3')| ('STPO'| 'X')| ('NBDA'| '29APR')| ('NADA'| '29APR')| ('ORAC'| 'MBJ  ')| ('DSTC'| 'CLT  ')| ('CARR'| 'AA ')| ('CABI'| ' ')| ('FTNR'| ' 844 ')| ('RBKD'| 'O ')| ('FTDA'| '29APR')| ('FTDT'| '1059 ')| ('FBST'| 'OK')| ('FBAL'| '1PC')| ('FBTD'| 'OLN0DMN3       ')| ('FFRF'| '               ...| ('FCPT'| '   ')| ('RESD'| '          ')| ('SMSG'| 'BKI')| ('SQNR'| '00000017')| ('STNQ'| '63')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('SEGI'| '4')| ('STPO'| ' ')| ('NBDA'| '29APR')| ('NADA'| '29APR')| ('ORAC'| 'CLT  ')| ('DSTC'| 'CDG  ')| ('CARR'| 'AA ')| ('CABI'| ' ')| ('FTNR'| ' 786 ')| ('RBKD'| 'O ')| ('FTDA'| '29APR')| ('FTDT'| '1630 ')| ('FBST'| 'OK')| ('FBAL'| '1PC')| ('FBTD'| 'OLN0DMN3       ')| ('FFRF'| '               ...| ('FCPT'| '   ')| ('RESD'| '          ')| ('SMSG'| 'BAR')| ('SQNR'| '00000018')| ('STNQ'| '64')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('FARE'| 'EUR   225.00')| ('TKMI'| '/')| ('EQFR'| '            ')| ('TOTL'| 'EUR   712.92')| ('SASI'| '0011')| ('FCMI'| '0')| ('BAID'| '      ')| ('BEOT'| ' ')| ('FCPI'| '0')| ('AENT'| '        ')| ('RESD'| '               ...| ('SMSG'| 'BAR')| ('SQNR'| '00000019')| ('STNQ'| '65')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('PXNM'| '               ...| ('PXDA'| '               ...| ('DOBR'| '02APR68')| ('PXTP'| '   ')| ('RESD'| '        ')| ('SMSG'| 'BAR')| ('SQNR'| '00000020')| ('STNQ'| '66')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('FPSN'| '1')| ('FPIN'| 'AA132193       ...| ('RESD'| '               ...| ('SMSG'| 'BKF')| ('SQNR'| '00000021')| ('STNQ'| '81')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('FRCS'| '1')| ('FRCA'| 'PAR AA X/MIA AA...| ('RESD'| '        ')| ('SMSG'| 'BKF')| ('SQNR'| '00000022')| ('STNQ'| '81')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('TDNR'| '0015131574285 ')| ('CDGT'| '2')| ('FRCS'| '2')| ('FRCA'| '1IZ9.22YC11.74X...| ('RESD'| '        ')| ('SMSG'| 'BKP')| ('SQNR'| '00000023')| ('STNQ'| '84')| ('DAIS'| '171231')| ('TRNN'| '000001')| ('FPTP'| 'CA        ')| ('FPAM'| 712.92)| ('FPAC'| '               ...| ('EXDA'| '    ')| ('EXPC'| '  ')| ('APLC'| '      ')| ('INVN'| '              ')| ('INVD'| '000000')| ('REMT'| 712.92)| ('CVVR'| ' ')| ('RESD'| '               ...| ('CUTP'| 'EUR2')]|
+--------+-------+--------+------------+--------+------+--------+----------+-------...

我想我仍然需要遍历每一对列,用第一列第一行的值重命名第二列,最后删除每对列的所有第一列。

或者是否可以添加更多选项:

df = spark.read.options(inferSchema="true").csv(rdd_trans)

要获得准确正确的数据帧结构?它将避免更多的处理时间(我的目标是比熊猫版本更快)

与此同时,我尝试这样做:

df.write.parquet("/Users/admin/Documents/Training/FR20180101HOT.parquet")

但出现错误:

Py4JJavaError: An error occurred while calling o447851.parquet.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:196)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
    ...
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8220.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8220.0 (TID 12712, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows.

...
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
...

由于文本限制,我无法输入所有错误消息,但这似乎与内存问题有关。

我数了一下df:

print(df.count())

15723

这等于我的 pandas 版本(其他不使用 pyspark 的 python 代码)中的行数,所以它得到了正确的行数。但是,在熊猫中,我可以毫无问题地提取实木复合地板。

【讨论】:

【参考方案3】:

您可以针对您的情况尝试 regexp_replace。 查看下面的示例,

df1.withColumn("c0", regexp_replace("_c0", "[()']", "")).withColumn("c1", regexp_replace("_c1", "\)", "")).show()

+----+---+---+---+
| _c0|_c1| c0| c1|
+----+---+---+---+
|('a'| 2)|  a|  2|
|('b'| 4)|  b|  4|
|('c'| 6)|  c|  6|
+----+---+---+---+

【讨论】:

以上是关于reduceByKey PySpark 中的列表列表的主要内容,如果未能解决你的问题,请参考以下文章

IndexError:在pyspark shell上使用reduceByKey操作时列出索引超出范围

PySpark reduceByKey 对多个值

如何在 PySpark 中使用自定义行分组来 reduceByKey?

Pyspark:检查数组类型列是不是包含列表中的值[重复]

PySpark reduceByKey 只有一个键

如何在pyspark中使用reduceByKey作为多键和单值[重复]