Pipe with semicolon data read using pySpark

Posted: 2020-12-15 13:44:36

Question:

I read the file paths from a lookup file, load each file, and add Location, Country, and State columns to every record.

Step 1:

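from pyspark.sql.functions import lit

for line in lines:
    # Each entry in lines is a file path taken from the lookup file.
    SourceDf = sqlContext.read.format("csv").option("delimiter", "|").load(line)
    # withColumn returns a new DataFrame, so the result has to be assigned.
    SourceDf = SourceDf.withColumn("Location", lit("us"))\
        .withColumn("Country", lit("Richmnd"))\
        .withColumn("State", lit("NY"))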

Step 2: I loop over every column of the DataFrame above and split it, but KeyValueDF ends up with only two columns.

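import pyspark.sql.functions

for col_num in SourceDf.columns:
    InterDF = pyspark.sql.functions.split(SourceDf[col_num], ":")
    # KeyValueDF is rebuilt from SourceDf on every iteration, so only the last column's split survives.
    KeyValueDF = SourceDf.withColumn("Column_Name", InterDF.getItem(0))\
                 .withColumn("Column_value", InterDF.getItem(1))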

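In step 1, splitting the data on the pipe creates 60 columns. In step 2, I want to split the output of step 1 again on the ":" separator (the "semicolon" of the title).

Can anyone help me get the expected result?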

File format:
ABC:"MobileData"|XYZ:"TableData"|ZXC:"MacData"|MNB:"WindowData"
ABC:"value1"    |XYZ:"value2"   |ZXC:"value3" |MNB:"value4"
ABC: "valueA"   |XYZ:"ValueB"   |ZXC:"valueC" |MNB:"valueD"|POI:"valueE"
ABC:"value11"    |XYZ:"value12"   |ZXC:"value13" |MNB:"value14"
ABC:"value1A"    |XYZ:"value2A"   |ZXC:"value3A"

Expected result:
ABC        | XYZ       | ZXC     | MNB        | POI
MobileData | TableData | MacData | WindowData |
value1     | value2    | value3  | value4     |
valueA     | ValueB    | valueC  | valueD     | valueE
value11    | value12   | value13 | value14    |
value1A    | value2A   | value3A |            |
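
To make the target transformation concrete, here is a minimal plain-Python sketch (no Spark involved, and not part of the original post). It assumes every token looks like KEY:"value", and the helper names parse_line and to_table are hypothetical. It only illustrates the logic that any Spark solution has to reproduce: split on the pipe, split each token on the first colon, then take the union of all keys as the columns.

```python
def parse_line(line):
    """Parse one 'KEY:"value"|KEY:"value"' line into a dict."""
    record = {}
    for token in line.split("|"):
        # partition splits on the first ':' only, so values may contain colons
        key, _, value = token.partition(":")
        record[key.strip()] = value.strip().strip('"')
    return record


def to_table(lines):
    """Return (header, rows); keys missing from a line become None."""
    records = [parse_line(line) for line in lines]
    header = []
    for record in records:
        for key in record:
            if key not in header:
                header.append(key)  # keep first-seen column order
    rows = [[record.get(key) for key in header] for record in records]
    return header, rows


sample = [
    'ABC:"MobileData"|XYZ:"TableData"|ZXC:"MacData"|MNB:"WindowData"',
    'ABC: "valueA"   |XYZ:"ValueB"   |ZXC:"valueC" |MNB:"valueD"|POI:"valueE"',
    'ABC:"value1A"    |XYZ:"value2A"   |ZXC:"value3A"',
]
header, rows = to_table(sample)
print(header)
for row in rows:
    print(row)
```

Because the set of keys is only known after all lines have been seen, the columns cannot be fixed up front, which is why splitting the file on the pipe into positional columns does not lead directly to this layout.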
    

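Comments:

Please provide more details and show what you have tried in order to solve the problem yourself.
I have tried almost every function, but I cannot split the data on the semicolon. Please help me.
Can you post a few more rows of data? Are the column names repeated, or do they vary?
The column names repeat. There are very few scenarios where they differ; in 90% of cases I will get only the repeated columns.

Answer 1: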

I adapted the solution given here to get the output you need.

This is how I would do it.

from pyspark import SparkContext, SQLContext
from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType

sc = SparkContext('local')
sqlContext = SQLContext(sc)


input_file = "../data/datafile.csv"

# As long as the data contains no commas, the CSV reader loads each line into a single column, _c0.
initial_df = sqlContext.read.csv(input_file)

initial_df.show(n=100, truncate=False)

# F.split takes a regular expression, so the pipe has to be escaped.
inter_df = initial_df.withColumn("array", F.split(initial_df['_c0'], r'\|'))

inter_df.show(n=100, truncate=False)


def create_dict(input_string):
    # Turn ['ABC:"value1"', 'XYZ:"value2"', ...] into {'ABC': 'value1', 'XYZ': 'value2', ...}.
    result_list = {}
    for ele in input_string:
        internal_ele = ele.strip()
        internal_ele = internal_ele.split(":")
        internal_ele = [part.strip() for part in internal_ele]
        # Assumes every token has both a key and a value.
        result_list[internal_ele[0]] = internal_ele[1].replace('"', "")
    return result_list

create_dict_udf = F.udf(create_dict, MapType(keyType=StringType(), valueType=StringType()))


inter_df = inter_df.withColumn("dictionary", create_dict_udf(F.col("array")))

inter_df.show(n=100, truncate=False)

# Collect the distinct keys across all rows; each key becomes one output column.
keys_df = inter_df.select(F.explode(F.map_keys(F.col("dictionary")))).distinct()
keys = list(map(lambda row: row[0], keys_df.collect()))
key_cols = list(map(lambda f: F.col("dictionary").getItem(f).alias(str(f)), keys))

inter_df.select(key_cols).show()

The output is as follows.

+------------------------------------------------------------------------+
|_c0                                                                     |
+------------------------------------------------------------------------+
|ABC:"MobileData"|XYZ:"TableData"|ZXC:"MacData"|MNB:"WindowData"         |
|ABC:"value1"    |XYZ:"value2"   |ZXC:"value3" |MNB:"value4"             |
|ABC: "valueA"   |XYZ:"ValueB"   |ZXC:"valueC" |MNB:"valueD"|POI:"valueE"|
|ABC:"value11"    |XYZ:"value12"   |ZXC:"value13" |MNB:"value14"         |
|ABC:"value1A"    |XYZ:"value2A"   |ZXC:"value3A"                        |
+------------------------------------------------------------------------+

+------------------------------------------------------------------------+------------------------------------------------------------------------------+
|_c0                                                                     |array                                                                         |
+------------------------------------------------------------------------+------------------------------------------------------------------------------+
|ABC:"MobileData"|XYZ:"TableData"|ZXC:"MacData"|MNB:"WindowData"         |[ABC:"MobileData", XYZ:"TableData", ZXC:"MacData", MNB:"WindowData"]          |
|ABC:"value1"    |XYZ:"value2"   |ZXC:"value3" |MNB:"value4"             |[ABC:"value1"    , XYZ:"value2"   , ZXC:"value3" , MNB:"value4"]              |
|ABC: "valueA"   |XYZ:"ValueB"   |ZXC:"valueC" |MNB:"valueD"|POI:"valueE"|[ABC: "valueA"   , XYZ:"ValueB"   , ZXC:"valueC" , MNB:"valueD", POI:"valueE"]|
|ABC:"value11"    |XYZ:"value12"   |ZXC:"value13" |MNB:"value14"         |[ABC:"value11"    , XYZ:"value12"   , ZXC:"value13" , MNB:"value14"]          |
|ABC:"value1A"    |XYZ:"value2A"   |ZXC:"value3A"                        |[ABC:"value1A"    , XYZ:"value2A"   , ZXC:"value3A"]                          |
+------------------------------------------------------------------------+------------------------------------------------------------------------------+

+------------------------------------------------------------------------+------------------------------------------------------------------------------+---------------------------------------------------------------------------+
|_c0                                                                     |array                                                                         |dictionary                                                                 |
+------------------------------------------------------------------------+------------------------------------------------------------------------------+---------------------------------------------------------------------------+
|ABC:"MobileData"|XYZ:"TableData"|ZXC:"MacData"|MNB:"WindowData"         |[ABC:"MobileData", XYZ:"TableData", ZXC:"MacData", MNB:"WindowData"]          |[MNB -> WindowData, XYZ -> TableData, ABC -> MobileData, ZXC -> MacData]   |
|ABC:"value1"    |XYZ:"value2"   |ZXC:"value3" |MNB:"value4"             |[ABC:"value1"    , XYZ:"value2"   , ZXC:"value3" , MNB:"value4"]              |[MNB -> value4, XYZ -> value2, ABC -> value1, ZXC -> value3]               |
|ABC: "valueA"   |XYZ:"ValueB"   |ZXC:"valueC" |MNB:"valueD"|POI:"valueE"|[ABC: "valueA"   , XYZ:"ValueB"   , ZXC:"valueC" , MNB:"valueD", POI:"valueE"]|[MNB -> valueD, XYZ -> ValueB, ABC -> valueA, POI -> valueE, ZXC -> valueC]|
|ABC:"value11"    |XYZ:"value12"   |ZXC:"value13" |MNB:"value14"         |[ABC:"value11"    , XYZ:"value12"   , ZXC:"value13" , MNB:"value14"]          |[MNB -> value14, XYZ -> value12, ABC -> value11, ZXC -> value13]           |
|ABC:"value1A"    |XYZ:"value2A"   |ZXC:"value3A"                        |[ABC:"value1A"    , XYZ:"value2A"   , ZXC:"value3A"]                          |[XYZ -> value2A, ABC -> value1A, ZXC -> value3A]                           |
+------------------------------------------------------------------------+------------------------------------------------------------------------------+---------------------------------------------------------------------------+

+------+-------+----------+---------+----------+
|   POI|    ZXC|       MNB|      XYZ|       ABC|
+------+-------+----------+---------+----------+
|  null|MacData|WindowData|TableData|MobileData|
|  null| value3|    value4|   value2|    value1|
|valueE| valueC|    valueD|   ValueB|    valueA|
|  null|value13|   value14|  value12|   value11|
|  null|value3A|      null|  value2A|   value1A|
+------+-------+----------+---------+----------+
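
create_dict above indexes internal_ele[1] unconditionally, so a token that has a key but no ":" raises IndexError, and a value that itself contains ":" is cut short. The following is a minimal sketch of a more tolerant variant, not the answerer's code. The name create_dict_safe is hypothetical, and the policy it applies (drop tokens without a key, map a missing or empty value to None) is an assumption that should be adjusted to the actual data.

```python
def create_dict_safe(tokens):
    """Build {key: value} from tokens like 'ABC:"value1"', tolerating malformed input."""
    result = {}
    for token in tokens or []:  # a null array arrives as None
        # partition splits on the first ':' only, so values may contain colons
        key, sep, value = token.partition(":")
        key = key.strip()
        if not key:
            continue  # assumption: tokens with no key are dropped
        value = value.strip().replace('"', "")
        # assumption: a key with no ':' or with an empty value maps to None
        result[key] = value if sep and value else None
    return result


print(create_dict_safe(['ABC:"value1"    ', 'XYZ', 'ZXC:', ' ', 'URL:"http://x"']))
```

It can be registered in the same way as create_dict, by passing it to F.udf with the same MapType(StringType(), StringType()) return type.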

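Comments:

You can read each whole line as a single column (using spark.read.text) and build the map with the built-in function str_to_map; you only need to supply two regular expressions as the delimiters.
I am getting an exception such as "list index out of range". Can you suggest how I should modify the code?
Can you post the full stack trace? I think your data may be malformed; you need to handle cases such as a key being present without a value.
I updated create_dict with an if/else condition based on the result of the split, and it works now. How do I select specific columns from inter_df? It fails when I try to pick selected columns.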
