从 Pyspark 在 HDFS 中保存文件

Posted

技术标签:

【中文标题】从 Pyspark 在 HDFS 中保存文件【英文标题】:Save a file in HDFS from Pyspark 【发布时间】:2017-06-10 01:32:28 【问题描述】:

我在Hive 中有一个空表,我的意思是该表中没有记录。

使用这个空表,我在pyspark 中创建了一个data frame

df = sqlContext.table("testing.123_test")

我已将此data frame 注册为

中的临时表
df.registerTempTable('mytempTable')

date=datetime.now().strftime('%Y-%m-%d %H:%M:%S')

在此表中,我有一个名为 id 的列。

现在我想像下面这样查询临时表

min_id = sqlContext.sql("select nvl(min(id),0) as minval from mytempTable").collect()[0].asDict()['minval']

max_id = sqlContext.sql("select nvl(max(id),0) as maxval from mytempTable").collect()[0].asDict()['maxval']

现在我想将datemin_idmax_id 保存到HDFS 中的文件中

我做了如下:

from pyspark.sql import functions as f

(sqlContext.table("myTempTable").select(f.concat_ws(",", f.first(f.lit(date)), f.min("id"), f.max("id"))).coalesce(1).write.format("text").mode("append").save("/tmp/fooo"))

现在,当我检查 HDFS 中的文件时,它会显示所有 NULL 值。

HDFS中的文件输出如下。

NULL,NULL,NULL

我想要的是

Date,0,0

Here date is the current timestamp

我怎样才能实现我想要的。

【问题讨论】:

我不明白你想在这里做什么。为什么要读取一个空表,然后将Date,0,0 写入 HDFS。能否请您详细说明一下? @philantrovert 在这里我试图将一些表的数据保存在HDFS 的目录中,如果表有记录,我可以这样做,但如果表是空的,那么我将面临以上场景 这很有趣。我知道来自***.com/a/44315328/3415409 的代码 不应保留空表。您只需要检查数据帧的计数。所以老实说,我看不出你在这里要做什么 @eliasah 你说的是对的,但是当我下次运行这个脚本时,这个表可能有数据。因此,如果我们有空表,它应该像我的要求一样使用HDFS 文件 【参考方案1】:

这是在 scala 中,但您应该能够轻松地将其复制到 Python。 这里需要的函数是na.fill函数。你必须在下面的代码中用 Python 字典替换 Scala Maps:

这就是你的 DF 的样子:

scala> nullDF.show
+----+----+----+
|date|   x|   y|
+----+----+----+
|null|null|null|
+----+----+----+

// You have already done this using Python's datetime functions
val format = new java.text.SimpleDateFormat("dd/MM/YYYY HH:mm:ss")
val curr_timestamp = format.format(new java.util.Date())

//Use na fill to replace null values
//Column names as keys in map
//And values are what you want to replace NULL with

val df = nullDF.na.fill(scala.collection.immutable.Map(
         "date" -> ) ,
         "x" -> "0" ,
         "y" -> "0" ) )

这应该给你

+-------------------+---+---+
|               date|  x|  y|
+-------------------+---+---+
|10/06/2017 12:10:20|  0|  0|
+-------------------+---+---+

【讨论】:

以上是关于从 Pyspark 在 HDFS 中保存文件的主要内容,如果未能解决你的问题,请参考以下文章

使用 pyspark 更改分隔符并将其保存为 HDFS 中的文本文件

将数据作为文本文件从 spark 保存到 hdfs

在 pyspark 中执行 NLTK

如何从 pyspark 数据框中更快地保存 csv 文件?

如何在 Hadoop 上运行 pySpark

如何在pyspark中更改DataFrame的hdfs块大小