如何从pyspark中的文件中匹配/提取多行模式

Posted

技术标签:

【中文标题】如何从pyspark中的文件中匹配/提取多行模式【英文标题】:How to match/extract multi-line pattern from file in pysark 【发布时间】:2019-08-14 17:58:11 【问题描述】:

我有一个巨大的 rdf 三元组(主谓宾)文件,如下图所示。它提取粗体项目并具有以下输出的目标

  Item_Id | quantityAmount | quantityUnit | rank
    -----------------------------------------------
      Q31      24954         Meter       BestRank
      Q25       582         Kilometer    NormalRank  

我想提取符合以下模式的行

给主题一个指针 (<Q31> <prop/P1082> <Pointer_Q31-87RF> .)

指针有排名(<Pointer_Q31-87RF> <rank> <BestRank>) 和 valuePointer (<Pointer_Q31-87RF> <prop/Pointer_value/P1082> <value/cebcf9>)

valuePointer 依次指向其 Amount (<value/cebcf9> <quantityAmount> "24954") 和 Unit (<value/cebcf9> <quantityUnit> <Meter>)

正常的方法是逐行读取文件并提取上述每个模式(使用 sc.textFile('inFile').flatMap(lambda x: extractFunc(x)) ,然后通过不同的连接将它们组合起来这样它将提供上表。 有没有更好的方法来解决这个问题?我包括下面的文件示例。

<Q31> <prop/P1082> <Pointer_Q31-87RF> .
<Pointer_Q31-87RF> <rank> <BestRank> .
<Pointer_Q31-87RF> <prop/Pointer_P1082> "+24954"^^<2001/XMLSchema#decimal> .
<Pointer_Q31-87RF> <prop/Pointer_value/P1082> <value/cebcf9> .
<value/cebcf9> <syntax-ns#type> <QuantityValue> .
<value/cebcf9> <quantityAmount> 24954
<value/cebcf9> <quantityUnit> <Meter> .
<Q25> <prop/P1082> <Pointer_Q25-8E6C> .
<Pointer_Q25-8E6C> <rank> <NormalRank> .
<Pointer_Q25-8E6C> <prop/Pointer_P1082> "+24954”
<Pointer_Q25-8E6C> <prop/Pointer_value/P1082> <value/cebcf9> .
<value/cebcf9> <syntax-ns#type> <QuantityValue> .
<value/cebcf9> <quantityAmount> "582" .
<value/cebcf9> <quantityUnit> <Kilometer> .

【问题讨论】:

检查***.com/questions/31227363/…。例如,在您的示例数据中,您可以通过设置 delimiter = '\n\n' 以段落模式读取数据,以便在同一个 RDD 元素中读取所有相关行。 我的错,有\n\n,我这样做是为了提高可读性。 你能用\n&lt;Q作为分隔符吗? 订单不保证 【参考方案1】:

如果你可以使用\n&lt;Q作为分隔符来创建RDD元素,那么解析数据块就变成了一个纯python任务。下面我创建了一个函数(基于您的示例)来使用正则表达式解析块文本并将 cols 信息检索到 Row 对象中(您可能必须调整正则表达式以反映实际的数据模式,即区分大小写、额外的空格等) :

对于每个 RDD 元素,用“\n”(行模式)分割 然后对于每一行,由&gt; &lt; 拆分成一个列表y 我们可以通过检查y[1]y[2]找到rankquantityUnit,通过检查y[1,quantityAmount ]Item_id 通过检查 y[0]

通过迭代所有必填字段创建 Row 对象,将缺失字段的值设置为 None

from pyspark.sql import Row
import re

# skipped the code to initialize SparkSession

# field names to retrieve
cols = ['Item_Id', 'quantityAmount', 'quantityUnit', 'rank']

def parse_rdd_element(x, cols):
    try:
        row = 
        for e in x.split('\n'):
            y = e.split('> <')
            if len(y) < 2:
                continue
            if y[1] in ['rank', 'quantityUnit']:
                row[y[1]] = y[2].split(">")[0]
            else:
                m = re.match(r'^quantityAmount>\D*(\d+)', y[1])
                if m:
                    row['quantityAmount'] = m.group(1)
                    continue
                m = re.match('^(?:<Q)?(\d+)', y[0])
                if m:
                    row['Item_Id'] = 'Q' + m.group(1)
        # if row is not EMPTY, set None to missing field
        return Row(**dict([ (k, row[k]) if k in row else (k, None) for k in cols])) if row else None
    except:
        return None

使用带有\n&lt;Q 作为分隔符的newAPIHadoopFile() 设置RDD:

rdd = spark.sparkContext.newAPIHadoopFile(
    '/path/to/file',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
    conf='textinputformat.record.delimiter': '\n<Q'
)

使用map函数将RDD元素解析为Row对象

rdd.map(lambda x: parse_rdd_element(x[1], cols)).collect()
#[Row(Item_Id=u'Q31', quantityAmount=u'24954', quantityUnit=u'Meter', rank=u'BestRank'),
# Row(Item_Id=u'Q25', quantityAmount=u'582', quantityUnit=u'Kilometer', rank=u'NormalRank')]

将上面的RDD转成dataframe

df = rdd.map(lambda x: parse_rdd_element(x[1], cols)).filter(bool).toDF()
df.show()
+-------+--------------+------------+----------+
|Item_Id|quantityAmount|quantityUnit|      rank|
+-------+--------------+------------+----------+
|    Q31|         24954|       Meter|  BestRank|
|    Q25|           582|   Kilometer|NormalRank|
+-------+--------------+------------+----------+

一些注意事项:

为了获得更好的性能,在将所有正则表达式模式传递给 parse_rdd_element() 函数之前,请使用 re.compile() 预编译所有正则表达式模式。

如果\n&lt;Q之间可能有空格/制表符,多个块将被添加到同一个RDD元素中,只需将RDD元素用\n\s+&lt;Q分割,并将map()替换为@ 987654339@.

参考:creating spark data structure from multiline record

【讨论】:

我喜欢这种方法,但正如我之前在 cmets 中所说,我无法知道上面显示的顺序是否始终保留在大文件中。 嗨,@user1848018,order 是什么意思?您可以使用\n&lt;Q\n\s+&lt;Q 实际拆分文本文件并将相关行保留在同一个RDD 元素中吗?如果是这样,那么剩下的所有问题都只是解析文本。除非您可以添加所有典型的文本布局,否则我真的无能为力。顺便提一句。当前方法不依赖这些字段的顺序,只要它们在自己的行中即可。 按顺序我的意思是,如果 Q31> . @user1848018,我只看到了一个丢失的&lt;,这是什么意思?或者某些特定的子字符串必须在文本匹配中显示?顺便提一句。与&lt;Q31&gt; 相关的行是否可能与与&lt;Q25&gt; 相关的行混淆,如果这是事实,那么将需要一种完全不同的方法。 顺便说一句。只要您可以使用 \n&lt;Q 之类的分隔符将文本文件与 same 元素中的所有 相关文本 分开。例如,一个 RDD 元素可能包含多个文本块,即 ,它们可能来自缺少的 &lt; 或额外的空格。我们可以稍后使用 re.split() 拆分 RDD,然后运行 ​​flatMap()

以上是关于如何从pyspark中的文件中匹配/提取多行模式的主要内容,如果未能解决你的问题,请参考以下文章

如何使用模式匹配从 pyspark 数据框中删除行?

从 Pyspark 中的嵌套 Json-String 列中提取模式

如何在读取前根据定义的模式读取 pyspark 中的镶木地板文件?

从文件夹中的所有文本文件中提取与模式匹配的行到单个输出文件

在python中,如何通过匹配原始列表中的字符串模式从字符串列表中提取子列表

如何匹配模式并从多行字符串中删除整行