使用Apache Spark实现python功能

Posted

技术标签:

【中文标题】使用Apache Spark实现python功能【英文标题】:Use Apache Spark to implement the python function 【发布时间】:2015-12-19 06:28:12 【问题描述】:

我有一个要在 Spark 中实现的 python 代码,但是我无法为在 Spark 1.1 版本中实现的 RDD 获得正确的逻辑。这段代码在 Python 中完美运行,但我想用这段代码在 Spark 中实现。

import lxml.etree
import csv

sc = SparkContext
data = sc.textFile("pain001.xml")
rdd = sc.parallelize(data)
# compile xpath selectors for ele ment text
selectors = ('GrpHdr/MsgId', 'GrpHdr/CreDtTm') # etc...
xpath = [lxml.etree.XPath('/text()'.format(s)) for s in selectors]

# open result csv file
with open('pain.csv', 'w') as paincsv:
    writer = csv.writer(paincsv)
    # read file with 1 'CstmrCdtTrfInitn' record per line
    with open(rdd) as painxml:
        # process each record
        for index, line in enumerate(painxml):
            if not line.strip(): # allow empty lines
                continue
            try:
                # each line is an xml doc
                pain001 = lxml.etree.fromstring(line)
                # move to the customer elem
                elem = pain001.find('CstmrCdtTrfInitn')
                # select each value and write to csv
                writer.writerow([xp(elem)[0].strip() for xp in xpath])
            except Exception, e:
                # give a hint where things go bad
                sys.stderr.write("Error line , ".format(index, str(e)))
                raise  

I am getting error as RDD not iteratable
    我想将此代码实现为函数并在 Spark 中实现为独立程序 我希望使用 python 模块在 HDFS 和 Spark 中的本地模式中处理输入文件。

感谢对问题的回应。

【问题讨论】:

【参考方案1】:

您得到的错误信息非常丰富,当您执行with open(rdd) as painxml: 之后,您尝试在RDD 上使用iterate,就好像它是python 中的普通ListTuple 一样,并且RDD 不是 iterable,此外,如果您阅读 textFile 文档,您会注意到它返回一个 RDD

我认为您遇到的问题是您正试图以经典的方式实现这一目标,并且您必须在MapReduce 范式内处理它,如果您真的是Apache Spark 的新手,您可以审核这门课程@ 987654322@,此外,我建议您将 spark 的版本更新为 1.5 或 1.6(即将推出)。

仅作为一个小例子(但不使用 xmls):

    导入所需文件

    import re
    import csv
    

    读取输入文件

    content = sc.textFile("../test")
    content.collect()
    # Out[8]: [u'1st record-1', u'2nd record-2', u'3rd record-3', u'4th record-4']
    

    MapRDD 操作每一行

    # Map it and convert it to tuples
    rdd = content.map(lambda s: tuple(re.split("-+",s)))
    rdd.collect()
    # Out[9]: [(u'1st record', u'1'),
    #          (u'2nd record', u'2'),
    #          (u'3rd record', u'3'),
    #          (u'4th record', u'4')]
    

    写你的数据

    with open("../test.csv", "w") as fw:
        writer = csv.writer(fw)
    
        for r1 in rdd.toLocalIterator():
            writer.writerow(r1)
    

    看看...

    $ cat test.csv
    1st record,1
    2nd record,2
    3rd record,3
    4th record,4
    

注意:如果你想阅读xmlApache Spark,GitHub 中有一些库,例如spark-xml;你也可以发现这个问题很有趣xml processing in spark。

【讨论】:

感谢 Alberto,但是您是否介意分享一个可以使用 python 在 spark 中实现的逻辑 @user5697452 我加了一个例子但是没有使用xml,但是路径在那里!

以上是关于使用Apache Spark实现python功能的主要内容,如果未能解决你的问题,请参考以下文章

在Apache Spark中使用UDF

Apache Scala/Python Spark 2.4.4:按年份分组数据以生成/分析新功能

为啥 Apache Spark 的功能不并行?

Spark GraphX图计算代码实现,源码分析

Apache Spark (scala) + python/R 数据分析工作流程

Apache Spark 使用的 python 版本