使用Apache Spark实现python功能
Posted
技术标签:
【中文标题】使用Apache Spark实现python功能【英文标题】:Use Apache Spark to implement the python function 【发布时间】:2015-12-19 06:28:12 【问题描述】:我有一个要在 Spark 中实现的 python 代码,但是我无法为在 Spark 1.1 版本中实现的 RDD 获得正确的逻辑。这段代码在 Python 中完美运行,但我想用这段代码在 Spark 中实现。
import lxml.etree
import csv
sc = SparkContext
data = sc.textFile("pain001.xml")
rdd = sc.parallelize(data)
# compile xpath selectors for ele ment text
selectors = ('GrpHdr/MsgId', 'GrpHdr/CreDtTm') # etc...
xpath = [lxml.etree.XPath('/text()'.format(s)) for s in selectors]
# open result csv file
with open('pain.csv', 'w') as paincsv:
writer = csv.writer(paincsv)
# read file with 1 'CstmrCdtTrfInitn' record per line
with open(rdd) as painxml:
# process each record
for index, line in enumerate(painxml):
if not line.strip(): # allow empty lines
continue
try:
# each line is an xml doc
pain001 = lxml.etree.fromstring(line)
# move to the customer elem
elem = pain001.find('CstmrCdtTrfInitn')
# select each value and write to csv
writer.writerow([xp(elem)[0].strip() for xp in xpath])
except Exception, e:
# give a hint where things go bad
sys.stderr.write("Error line , ".format(index, str(e)))
raise
I am getting error as RDD not iteratable
-
我想将此代码实现为函数并在 Spark 中实现为独立程序
我希望使用 python 模块在 HDFS 和 Spark 中的本地模式中处理输入文件。
感谢对问题的回应。
【问题讨论】:
【参考方案1】:您得到的错误信息非常丰富,当您执行with open(rdd) as painxml:
之后,您尝试在RDD
上使用iterate
,就好像它是python 中的普通List
或Tuple
一样,并且RDD
不是 iterable
,此外,如果您阅读 textFile 文档,您会注意到它返回一个 RDD
。
我认为您遇到的问题是您正试图以经典的方式实现这一目标,并且您必须在MapReduce
范式内处理它,如果您真的是Apache Spark
的新手,您可以审核这门课程@ 987654322@,此外,我建议您将 spark 的版本更新为 1.5 或 1.6(即将推出)。
仅作为一个小例子(但不使用 xmls):
导入所需文件
import re
import csv
读取输入文件
content = sc.textFile("../test")
content.collect()
# Out[8]: [u'1st record-1', u'2nd record-2', u'3rd record-3', u'4th record-4']
Map
RDD
操作每一行
# Map it and convert it to tuples
rdd = content.map(lambda s: tuple(re.split("-+",s)))
rdd.collect()
# Out[9]: [(u'1st record', u'1'),
# (u'2nd record', u'2'),
# (u'3rd record', u'3'),
# (u'4th record', u'4')]
写你的数据
with open("../test.csv", "w") as fw:
writer = csv.writer(fw)
for r1 in rdd.toLocalIterator():
writer.writerow(r1)
看看...
$ cat test.csv
1st record,1
2nd record,2
3rd record,3
4th record,4
注意:如果你想阅读xml
和Apache Spark
,GitHub 中有一些库,例如spark-xml;你也可以发现这个问题很有趣xml processing in spark。
【讨论】:
感谢 Alberto,但是您是否介意分享一个可以使用 python 在 spark 中实现的逻辑 @user5697452 我加了一个例子但是没有使用xml,但是路径在那里!以上是关于使用Apache Spark实现python功能的主要内容,如果未能解决你的问题,请参考以下文章
Apache Scala/Python Spark 2.4.4:按年份分组数据以生成/分析新功能