pyspark 从逗号分隔值列表中创建多行

Posted

技术标签:

【中文标题】pyspark 从逗号分隔值列表中创建多行【英文标题】:pyspark create multiple rows from a list of comma separated values 【发布时间】:2018-05-22 02:05:36 【问题描述】:

在pyspark sqlcontext sql中,已经编写了获取文本然后重新格式化的代码 但是这样的事情是个问题

在数据框中有这样的东西 代码在哪里

hash_tags_fun = udf(lambda t: re.findall('(#[^#]\w3,)', t))

hash_tags_in_tweets_df.registerTempTable("hash_tags_table")
hash_tags_result = sqlContext.sql("SELECT text FROM hash_tags_table")
hash_tags_list = hash_tags_result.withColumn('text', hash_tags_fun('text'))
hash_tags_list.show(3)
+-------------------+
|               text|
+-------------------+
|  [#shutUpAndDANCE]|
|  [#SHINee, #AMBER]|
|[#JR50, #flipagram]|
+-------------------+

我需要类似的东西

+-------------------+
|               text|
+-------------------+
|    #shutUpAndDANCE|
|            #SHINee|
|             #AMBER|
|              #JR50|
|         #flipagram|
+-------------------+
hash_tags_list.withColumn("text", explode("text")) has given an error saying

AnalysisException: u"cannot resolve 'explode(text)' due to data type 不匹配:函数 explode 的输入应该是数组或映射类型,而不是 细绳;; \n'项目 [explode(text#24) AS text#68]\n+- 分析障碍\n +- 项目 [(text#9) AS text#24]\n +- 项目 [text#9]\n +- 子查询别名 hash_tags_table\n +- 项目 [text#9]\n +- 过滤文本#9 LIKE %#%\n +- 子查询别名 twt\n +- SubqueryAlias 推文\n +- 关系[country#6,id#7,place#8,text#9,user#10] json\n"

【问题讨论】:

查看此链接:***.com/questions/50378312/… Explode in PySpark的可能重复 你有一个字符串列。您将不得不删除括号,然后以逗号分隔。然后你就可以爆发了。您可以使用pyspark.sql.functions.regexp_replace()pyspark.sql.functions.split() 【参考方案1】:

扩展至my comment:

您的列看起来像一个数组,但它实际上是一个字符串 - 这就是您对 explode() 的调用不起作用的原因。您必须先将列转换为数组。

这将涉及删除前导和尾随方括号并拆分逗号字符。

先去掉前后括号,可以使用pyspark.sql.functions.regexp_replace()

from pyspark.sql.functions import regexp_replace, split
df = hash_tags_list.select(regexp_replace("text", r"(^\[)|(\]$)", "").alias("text"))
df.show()
#+-----------------+
#|             text|
#+-----------------+
#|  #shutUpAndDANCE|
#|  #SHINee, #AMBER|
#|#JR50, #flipagram|
#+-----------------+

现在以逗号分隔,后跟一个空格:

df = df.select(split("text", ", ").alias("text"))
df.show()
#+-------------------+
#|               text|
#+-------------------+
#|  [#shutUpAndDANCE]|
#|  [#SHINee, #AMBER]|
#|[#JR50, #flipagram]|
#+-------------------+

你会注意到这个打印和你开始的一模一样,但是当我们检查架构时,我们发现这些实际上是字符串数组:

df.printSchema()
#root
# |-- text: array (nullable = true)
# |    |-- element: string (containsNull = true)

将此与您原始 DataFrame 的架构进行比较:

hash_tags_list.printSchema()
#root
# |-- text: string (nullable = true)

将数据作为数组,现在可以调用explode()

from pyspark.sql.functions import explode
df = df.select(explode("text").alias("hashtags"))
df.show()
#+---------------+
#|       hashtags|
#+---------------+
#|#shutUpAndDANCE|
#|        #SHINee|
#|         #AMBER|
#|          #JR50|
#|     #flipagram|
#+---------------+

【讨论】:

以上是关于pyspark 从逗号分隔值列表中创建多行的主要内容,如果未能解决你的问题,请参考以下文章

你将如何从字符串列表中创建一个逗号分隔的字符串?

Python:试图从一个寻找可被 3 整除的数字的 for 循环中创建一个逗号分隔的列表

Python:从查询中创建逗号分隔的字符串

多行到一个唯一的逗号分隔值[重复]

Google表格中的笛卡尔/交叉连接以获取逗号分隔值

从列表中创建一个 pyspark 数据框列,其中列表的长度与数据框的行数相同