将行列表保存到 pyspark 中的 Hive 表

Posted

技术标签:

【中文标题】将行列表保存到 pyspark 中的 Hive 表【英文标题】:saving a list of rows to a Hive table in pyspark 【发布时间】:2016-04-27 23:30:02 【问题描述】:

我有一个 pyspark 应用程序。我将一个 hive 表复制到我的 hdfs 目录中,并且在 python 中我 sqlContext.sql 对该表进行了查询。现在这个变量是一个我称之为rows的数据框。我需要随机打乱rows,所以我必须将它们转换为行列表rows_list = rows.collect()。然后我shuffle(rows_list) 将列表重新排列到位。我取了我需要的随机行数量x

for r in range(x): allrows2add.append(rows_list[r]) 现在我想将 allrows2add 保存为一个 hive 表或附加一个现有的 hive 表(以更容易做的为准)。问题是我不能这样做:

all_df = sc.parallelize(allrows2add).toDF() 不能这样做,无法推断架构 ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling

没有放入整个架构。 rows 的架构有 117 列,所以我不想把它们打出来。有没有办法提取rows 的架构来帮助我制作 allrows2add 数据框或以某种方式保存为配置单元表? 我可以 rows.printSchema() 但不确定如何将其转换为模式格式作为变量传递 toDF() 而无需解析所有文本

谢谢

添加循环信息

#Table is a List of Rows from small Hive table I loaded using
#query = "SELECT * FROM Table"
#Table = sqlContext.sql(query).collect()

for i in range(len(Table)):

    rows = sqlContext.sql(qry)
    val1 = Table[i][0]
    val2 = Table[i][1]
    count = Table[i][2]
    x = 100 - count

#hivetemp is a table that I copied from Hive to my hfs using:
#create external table IF NOT EXISTS hive temp LIKE hivetableIwant2copy LOCATION "/user/name/hiveBackup";
#INSERT OVERWRITE TABLE hivetemp SELECT * FROM hivetableIwant2copy;

    query = "SELECT * FROM hivetemp WHERE col1<>\""+val1+"\" AND col2 ==\""+val2+"\" ORDER BY RAND() LIMIT "+str(x)

    rows = sqlContext.sql(query)
    rows = rows.withColumn("col4", lit(10))
    rows = rows.withColumn("col5", lit(some_string))
#writing to parquet is heck slow AND I can't work with pandas due to the library not installed on the server
    rows.saveAsParquetFile("rows"+str(i)+".parquet")
#tried this before and heck slow also
    #rows_list = rows.collect()
    #shuffle(rows_list)

【问题讨论】:

【参考方案1】:

当无法推断架构时,通常是有原因的。 toDFcreateDataFrame 函数的语法糖,默认情况下它只使用前 100 行(despite the docs 说它只使用第一行)来确定模式应该是什么。要改变这一点,您可以提高采样率以查看更大百分比的数据:

df = rdd.toDF(sampleRatio=0.2)
# or...
df = sqlContext.createDataFrame(rdd, samplingRatio=0.2)

您的随机样本也有可能碰巧只取某些特定列的空值行。如果是这种情况,您可以像这样create a schema from scratch:

from pyspark.sql.types import *
# all DataFrame rows are StructType
# can create a new StructType with combinations of StructField
schema = StructType([
    StructField("column_1", StringType(), True),
    StructField("column_2", IntegerType(), True),
    # etc.
])
df = sqlContext.createDataFrame(rdd, schema=schema)

或者,您可以通过访问 schema 值从之前创建的 DataFrame 中获取架构:

df2 = sqlContext.createDataFrame(rdd, schema=df1.schema)

请注意,如果您的 RDD 的行不是 StructType(又名Row)对象而不是字典或列表,您将无法从它们创建数据框。如果您的 RDD 行是字典,您可以将它们转换为 Row 对象,如下所示:

rdd = rdd.map(lambda x: pyspark.sql.Row(**x))
# ** is to unpack the dictionary since the Row constructor
# only takes keyword arguments

【讨论】:

非常感谢这个工作。我访问了schema 值。我想要解决的一件事是为什么这么慢(无论是将数据帧转换为行列表,还是简单的事情,例如写入镶木地板文件或尝试附加配置单元表) - 但这可能与我的系统与 api 本身。 如果不查看您的数据/代码,我无法确定。您的输入文件是否分成多个分区?如果是单个分区,则 Spark 不会并行加载。 我只是编辑了上面的原始帖子以显示更多信息。我对 Spark 很陌生,所以我不是 100% 确定,但我从配置单元表加载了我的输入(我从配置单元服务器复制到我的 hdfs 目录)。如果您有任何建议或可以向我指出资源(我也在 Scala 中尝试过,所以 Scala 代码会很好) - 那太好了!非常感谢 不幸的是,我不是为什么保存需要这么长时间。我建议您提出一个新问题,给出这三条信息:1)您的代码,2)数据框的行/列/分区数,以及 3)保存的总大小(GB 等)数据文件。祝你好运! 谢谢 - 我刚刚发布了here:

以上是关于将行列表保存到 pyspark 中的 Hive 表的主要内容,如果未能解决你的问题,请参考以下文章

Hive:通过 Hue 插入表格产生的文件数量与 pyspark 不同

找不到pyspark数据框保存到配置单元表

带有 hive 的 pyspark - 无法正确创建分区并从数据框中保存表

将 pyspark 中的数据框保存为 csv 中的 hivetable

Pyspark Dataframe:无法保存为 Hive 表

PySpark - 分区中覆盖的数据