Pipe with semicolon data read using pySpark

Posted: 2020-12-15 13:44:36

Question: Read each file listed in a lookup file and add Location, Country and State columns to every record.

Step 1:
for line in lines:
    SourceDf = sqlContext.read.format("csv").option("delimiter", "|").load(line)
    SourceDf = SourceDf.withColumn("Location", lit("us")) \
        .withColumn("Country", lit("Richmnd")) \
        .withColumn("State", lit("NY"))
Step 2: loop over every column of the DataFrame above and apply a split, but I only end up with two columns in KeyValueDF.
for col_num in SourceDf.columns:
    InterDF = pyspark.sql.functions.split(SourceDf[col_num], ":")
    KeyValueDF = SourceDf.withColumn("Column_Name", InterDF.getItem(0)) \
        .withColumn("Column_value", InterDF.getItem(1))
In Step 1 the data is split on the pipe and 60 columns are created. In Step 2 I want to split the output of Step 1 again on the semicolon.
Can anyone please advise how to get the expected result?
File format:
ABC:"MobileData"|XYZ:"TableData"|ZXC:"MacData"|MNB:"WindowData"
ABC:"value1" |XYZ:"value2" |ZXC:"value3" |MNB:"value4"
ABC: "valueA" |XYZ:"ValueB" |ZXC:"valueC" |MNB:"valueD"|POI:"valueE"
ABC:"value11" |XYZ:"value12" |ZXC:"value13" |MNB:"value14"
ABC:"value1A" |XYZ:"value2A" |ZXC:"value3A"
Expected result:

ABC        | XYZ       | ZXC      | MNB        | POI
MobileData | TableData | MacData  | WindowData |
value1     | value2    | value3   | value4     |
valueA     | ValueB    | valueC   | valueD     | valueE
value11    | value12   | value13  | value14    |
value1A    | value2A   | value3A  |            |
Comments:
Please provide more details and show what you have already tried to solve the problem yourself.

I have tried almost every function, but I am not able to split the data on the semicolon. Please help me.

Can you post a few more lines of data? Are the column names repeated, or will they vary?

The column names repeat; there are only a few different scenarios, and about 90% of the files will have all columns repeated.

Answer 1:

I modified the solution given here to get the output you need.
I would do it like this.
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext, SQLContext
from pyspark.sql import functions as F
import sys

sc = SparkContext('local')
sqlContext = SQLContext(sc)

input_file = "../data/datafile.csv"

# Read every raw line into a single column (_c0); the data contains no commas,
# so the default CSV delimiter leaves each line intact.
initial_df = sqlContext.read.format("csv").csv(input_file)
initial_df.show(n=100, truncate=False)

# Split each line on the pipe into an array of key:value strings.
inter_df = initial_df.withColumn("array", F.split(initial_df['_c0'], r'\|'))
inter_df.show(n=100, truncate=False)

# UDF: turn an array of key:value strings into a key -> value map,
# stripping the double quotes around the values.
def create_dict(input_string):
    result_list = {}
    for ele in input_string:
        internal_ele = ele.strip()
        internal_ele = internal_ele.split(":")
        internal_ele = [ele.strip() for ele in internal_ele]
        result_list[internal_ele[0]] = internal_ele[1].replace('"', "")
    return result_list

create_dict_udf = F.udf(create_dict, MapType(keyType=StringType(), valueType=StringType()))

inter_df = inter_df.withColumn("dictionary", create_dict_udf(F.col("array")))
inter_df.show(n=100, truncate=False)

# Collect the distinct keys across all rows and turn each key into its own column.
keys_df = inter_df.select(F.explode(F.map_keys(F.col("dictionary")))).distinct()
keys = list(map(lambda row: row[0], keys_df.collect()))
key_cols = list(map(lambda f: F.col("dictionary").getItem(f).alias(str(f)), keys))
inter_df.select(key_cols).show()
The output is as follows.
+------------------------------------------------------------------------+
|_c0 |
+------------------------------------------------------------------------+
|ABC:"MobileData"|XYZ:"TableData"|ZXC:"MacData"|MNB:"WindowData" |
|ABC:"value1" |XYZ:"value2" |ZXC:"value3" |MNB:"value4" |
|ABC: "valueA" |XYZ:"ValueB" |ZXC:"valueC" |MNB:"valueD"|POI:"valueE"|
|ABC:"value11" |XYZ:"value12" |ZXC:"value13" |MNB:"value14" |
|ABC:"value1A" |XYZ:"value2A" |ZXC:"value3A" |
+------------------------------------------------------------------------+
+------------------------------------------------------------------------+------------------------------------------------------------------------------+
|_c0 |array |
+------------------------------------------------------------------------+------------------------------------------------------------------------------+
|ABC:"MobileData"|XYZ:"TableData"|ZXC:"MacData"|MNB:"WindowData" |[ABC:"MobileData", XYZ:"TableData", ZXC:"MacData", MNB:"WindowData"] |
|ABC:"value1" |XYZ:"value2" |ZXC:"value3" |MNB:"value4" |[ABC:"value1" , XYZ:"value2" , ZXC:"value3" , MNB:"value4"] |
|ABC: "valueA" |XYZ:"ValueB" |ZXC:"valueC" |MNB:"valueD"|POI:"valueE"|[ABC: "valueA" , XYZ:"ValueB" , ZXC:"valueC" , MNB:"valueD", POI:"valueE"]|
|ABC:"value11" |XYZ:"value12" |ZXC:"value13" |MNB:"value14" |[ABC:"value11" , XYZ:"value12" , ZXC:"value13" , MNB:"value14"] |
|ABC:"value1A" |XYZ:"value2A" |ZXC:"value3A" |[ABC:"value1A" , XYZ:"value2A" , ZXC:"value3A"] |
+------------------------------------------------------------------------+------------------------------------------------------------------------------+
+------------------------------------------------------------------------+------------------------------------------------------------------------------+---------------------------------------------------------------------------+
|_c0 |array |dictionary |
+------------------------------------------------------------------------+------------------------------------------------------------------------------+---------------------------------------------------------------------------+
|ABC:"MobileData"|XYZ:"TableData"|ZXC:"MacData"|MNB:"WindowData" |[ABC:"MobileData", XYZ:"TableData", ZXC:"MacData", MNB:"WindowData"] |[MNB -> WindowData, XYZ -> TableData, ABC -> MobileData, ZXC -> MacData] |
|ABC:"value1" |XYZ:"value2" |ZXC:"value3" |MNB:"value4" |[ABC:"value1" , XYZ:"value2" , ZXC:"value3" , MNB:"value4"] |[MNB -> value4, XYZ -> value2, ABC -> value1, ZXC -> value3] |
|ABC: "valueA" |XYZ:"ValueB" |ZXC:"valueC" |MNB:"valueD"|POI:"valueE"|[ABC: "valueA" , XYZ:"ValueB" , ZXC:"valueC" , MNB:"valueD", POI:"valueE"]|[MNB -> valueD, XYZ -> ValueB, ABC -> valueA, POI -> valueE, ZXC -> valueC]|
|ABC:"value11" |XYZ:"value12" |ZXC:"value13" |MNB:"value14" |[ABC:"value11" , XYZ:"value12" , ZXC:"value13" , MNB:"value14"] |[MNB -> value14, XYZ -> value12, ABC -> value11, ZXC -> value13] |
|ABC:"value1A" |XYZ:"value2A" |ZXC:"value3A" |[ABC:"value1A" , XYZ:"value2A" , ZXC:"value3A"] |[XYZ -> value2A, ABC -> value1A, ZXC -> value3A] |
+------------------------------------------------------------------------+------------------------------------------------------------------------------+---------------------------------------------------------------------------+
+------+-------+----------+---------+----------+
| POI| ZXC| MNB| XYZ| ABC|
+------+-------+----------+---------+----------+
| null|MacData|WindowData|TableData|MobileData|
| null| value3| value4| value2| value1|
|valueE| valueC| valueD| ValueB| valueA|
| null|value13| value14| value12| value11|
| null|value3A| null| value2A| value1A|
+------+-------+----------+---------+----------+
Comments:
You could read each whole line as a single column (using spark.read.text) and use the built-in function str_to_map to build the map; you only need to supply two regular expressions for the delimiters. See the similar example @987654322@.
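A minimal sketch of that suggestion, assuming a Spark version whose SQL layer exposes str_to_map through F.expr (the file path and column names follow the answer above and are only illustrative; per the comment, the delimiters are regular expressions, so the pipe is escaped):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local").appName("str_to_map_sketch").getOrCreate()

# Read every raw line into a single string column named "value".
raw_df = spark.read.text("../data/datafile.csv")

# str_to_map(text, pairDelim, keyValueDelim): split pairs on "|" and
# split key from value on ":", producing a map<string,string> column.
kv_df = raw_df.select(F.expr(r"str_to_map(value, '\\|', ':')").alias("kv"))
kv_df.show(truncate=False)

From here the map column can be pivoted into per-key columns exactly as in the answer above (explode map_keys, collect the distinct keys, then getItem each one); the quotes and padding spaces inside the values would still need a trim/regexp_replace pass.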
I am getting an exception, list index out of range. Could you suggest how I should modify the code?
Can you post the full stack trace? I think your data may be malformed, and you need to handle cases where a key is present but the value is not.
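A hedged sketch of what such a guard inside create_dict could look like (this is not the commenter's actual fix, just one way to avoid the index error when a value is missing):

def create_dict(input_string):
    result_list = {}
    for ele in input_string:
        # Split on the first colon only, so colons inside values survive.
        parts = [p.strip() for p in ele.strip().split(":", 1)]
        if len(parts) == 2 and parts[0]:
            result_list[parts[0]] = parts[1].replace('"', "")
        elif len(parts) == 1 and parts[0]:
            # Key present but value missing: keep the key with an empty value
            # instead of raising "list index out of range".
            result_list[parts[0]] = ""
    return result_list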
I updated create_dict with an if/else condition around the split operation and it works now. From here, how can I select specific columns from inter_df? It fails when I try to take only selected columns.
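For that last question, a hedged guess at what selecting only a few keys from the map column could look like (the key list is illustrative; keys missing from a row simply come back as null):

wanted_keys = ["ABC", "XYZ", "POI"]  # hypothetical subset of keys
selected_df = inter_df.select(
    [F.col("dictionary").getItem(k).alias(k) for k in wanted_keys]
)
selected_df.show()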