在 PySpark 中使用拆分功能

Posted

技术标签:

【中文标题】在 PySpark 中使用拆分功能【英文标题】:Using split function in PySpark 【发布时间】:2018-05-03 21:06:52 【问题描述】:

我正在尝试从一个非常大的日志文件中搜索特定行。我可以搜索该行。

现在使用该行空间我想创建一个数据框,我无法做到这一点。我试过下面的代码,但无法实现。

from pyspark import SparkConf,SparkContext
from pyspark import  SQLContext
from pyspark.sql.types import *
from pyspark.sql import *

conf=SparkConf().setMaster("local").setAppName("invparsing")
sc=SparkContext(conf=conf)
sql=SQLContext(sc)
def f(x) :print(x)

data_frame_schema=StructType([
    StructField("Typeof",StringType()),
    #StructField("Produt_mod",StringType()),
    #StructField("Col2",StringType()),
    #StructField("Col3",StringType()),
    #StructField("Col4",StringType()),
    #StructField("Col5",StringType()),
])
path="C:/rk/IBMS/inv.log"

lines=sc.textFile(path)
NodeStr=lines.filter(lambda x:'Node :RBS6301' in x).map(lambda x:x.split(" +"))
NodeStr.foreach(f)
Nodedf=sql.createDataFrame(NodeStr,data_frame_schema)
Nodedf.show(truncate=False)

现在,我在这里得到输出 - 只有一个字符串。 O 想根据空间分割值。

[u'Node: RBS6301         XP10521/26 R30F L17A.4-6 (C17.0_LSV_PS4)']
+-------------------------------------------------------------+
|Typesof                                                      |  
+-------------------------------------------------------------+ 
|Node: RBS6301         XP10521/26   R30F   L17A.4-6   (C17.0_LSV_PS4)
+-------------------------------------------------------------+

预期输出:

Typeof      Produt_mod  Col2          Col3    Col4        COL5 
Node     RBS6301       XP10521/26    R30F    L17A.4-6    C17.0_LSV_PS4

【问题讨论】:

【参考方案1】:

你犯的第一个错误在这里:

lambda x:x.split(" +")

str.split 采用常量字符串而不是正则表达式。要在空格上拆分,您应该省略分隔符

lines = sc.parallelize(["Node: RBS6301         XP10521/26 R30F L17A.4-6 (C17.0_LSV_PS4)"])

lines.map(lambda s: s.split()).first()
# ['Node:', 'RBS6301', 'XP10521/26', 'R30F', 'L17A.4-6', '(C17.0_LSV_PS4)']

完成后,您可以过滤并转换为DataFrame

df = lines.map(lambda s: s.split()).filter(lambda x: len(x) == 6).toDF(
    ["col1", "col2", "col3", "col4", "col5", "col6"]
)
df.show()
# +-----+-------+----------+----+--------+---------------+
# | col1|   col2|      col3|col4|    col5|           col6|
# +-----+-------+----------+----+--------+---------------+
# |Node:|RBS6301|XP10521/26|R30F|L17A.4-6|(C17.0_LSV_PS4)|
# +-----+-------+----------+----+--------+---------------+

filter:

df[df["col2"] == "RBS6301"].show()
# +-----+-------+----------+----+--------+---------------+
# | col1|   col2|      col3|col4|    col5|           col6|
# +-----+-------+----------+----+--------+---------------+
# |Node:|RBS6301|XP10521/26|R30F|L17A.4-6|(C17.0_LSV_PS4)|
# +-----+-------+----------+----+--------+---------------+

【讨论】:

这只是为了重现。如果您的文件只包含那一行,textFile 的结果将是相同的。 @user9613381 我通过使用:'NodeStr=lines.filter(lambda x:'Node :RBS6301' in x).map(lambda x:x.split( " +"))' 之后告诉我如何使用您的代码,或者有另一种方法可以在 pyspark 中搜索特定字符串并从中创建数据框 .... 请再次阅读我的回答。 lambda x:x.split(" +") 完全是错误的部分 :) 现在使用这个我得到了我预期的行:NodeStr = lines.filter(lambda x: 'Node: RBS6301' in x)...现在告诉我先生我将如何走得更远...如果我将下一行放在这个 Test_lines = sc.parallelize(NodeStr) 它不起作用..我的问题是我有一个日志文件,我想从中选择一个特定的行,并且我想用该行创建数据框.. .

以上是关于在 PySpark 中使用拆分功能的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:文件文本拆分后无法打印

在 PySpark 数据框中拆分字符串

在点处拆分 PySpark 数据框列

如何在 PySpark 中拆分数据框列

如何使用逗号分隔值拆分列并存储在 PySpark Dataframe 中的数组中?如下所示

如何将向量拆分为列 - 使用 PySpark