如何在pyspark中更改DataFrame的hdfs块大小

Posted

技术标签:

【中文标题】如何在pyspark中更改DataFrame的hdfs块大小【英文标题】:How to change hdfs block size of DataFrame in pysark 【发布时间】:2018-03-14 13:55:46 【问题描述】:

这似乎与

有关

How to change hdfs block size in pyspark?

我可以用rdd.saveAsTextFile成功改变hdfs块大小, 但不是对应的DataFrame.write.parquet,无法以parquet格式保存。

不确定是pyspark DataFrame中的错误还是我没有正确设置配置。

以下是我的测试代码:

##########
# init
##########
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

import hdfs
from hdfs import InsecureClient
import os

import numpy as np
import pandas as pd
import logging

os.environ['SPARK_HOME'] = '/opt/spark-2.2.1-bin-hadoop2.7'

block_size = 512 * 1024

conf = SparkConf().setAppName("myapp").setMaster("spark://spark1:7077").set('spark.cores.max', 20).set("spark.executor.cores", 10).set("spark.executor.memory", "10g").set("spark.hadoop.dfs.blocksize", str(block_size)).set("spark.hadoop.dfs.block.size", str(block_size))

spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext._jsc.hadoopConfiguration().setInt("dfs.blocksize", block_size)
spark.sparkContext._jsc.hadoopConfiguration().setInt("dfs.block.size", block_size)

##########
# main
##########

# create DataFrame
df_txt = spark.createDataFrame([\'temp': "hello", \'temp': "world", \'temp': "!"])

# save using DataFrameWriter, resulting 128MB-block-size

df_txt.write.mode('overwrite').format('parquet').save('hdfs://spark1/tmp/temp_with_df')

# save using rdd, resulting 512k-block-size
client = InsecureClient('http://spark1:50070')
client.delete('/tmp/temp_with_rrd', recursive=True)
df_txt.rdd.saveAsTextFile('hdfs://spark1/tmp/temp_with_rrd')

【问题讨论】:

AFAIK Spark SQL 在 2.0 中停止使用 Hadoop 配置 @user69 它如何从 HDFS 读取或使用 YARN? hadoop 配置在会话的上下文中 看起来这是特定于镶木地板的问题。我可以使用 df.write.csv() 和 df.write.text() apache-spark-developers-list.1001551.n3.nabble.com/… 成功保存 512k 块大小 【参考方案1】:

从以下链接找到答案:

https://forums.databricks.com/questions/918/how-to-set-size-of-parquet-output-files.html

我可以使用 spark.hadoop.parquet.block.size 成功设置拼花块大小

以下是示例代码:

# init
block_size = 512 * 1024 

conf = SparkConf().setAppName("myapp").setMaster("spark://spark1:7077").set('spark.cores.max', 20).set("spark.executor.cores", 10).set("spark.executor.memory", "10g").set('spark.hadoop.parquet.block.size', str(block_size)).set("spark.hadoop.dfs.blocksize", str(block_size)).set("spark.hadoop.dfs.block.size", str(block_size)).set("spark.hadoop.dfs.namenode.fs-limits.min-block-size", str(131072))

sc = SparkContext(conf=conf) 
spark = SparkSession(sc) 

# create DataFrame 
df_txt = spark.createDataFrame(['temp': "hello", 'temp': "world", 'temp': "!"]) 

# save using DataFrameWriter, resulting 512k-block-size 

df_txt.write.mode('overwrite').format('parquet').save('hdfs://spark1/tmp/temp_with_df')

# save using DataFrameWriter.csv, resulting 512k-block-size 
df_txt.write.mode('overwrite').csv('hdfs://spark1/tmp/temp_with_df_csv') 

# save using DataFrameWriter.text, resulting 512k-block-size

df_txt.write.mode('overwrite').text('hdfs://spark1/tmp/temp_with_df_text')

# save using rdd, resulting 512k-block-size 
client = InsecureClient('http://spark1:50070') 
client.delete('/tmp/temp_with_rrd', recursive=True) 
df_txt.rdd.saveAsTextFile('hdfs://spark1/tmp/temp_with_rrd')

【讨论】:

【参考方案2】:

Hadoop 和 Spark 是两个独立的工具,它们有自己的工作策略。 Spark 和 Parquet 使用数据分区,块大小对它们没有意义。按照 Spark 所说的去做,然后在 HDFS 中做你想做的事情。

您可以通过以下方式更改 Parquet 分区号

df_txt.repartition(6).format("parquet").save("hdfs://...")

【讨论】:

我想你的意思是 HDFS 在集群范围内有一个独立的配置,不是每个 Spark 应用程序都配置的

以上是关于如何在pyspark中更改DataFrame的hdfs块大小的主要内容,如果未能解决你的问题,请参考以下文章

PySpark DataFrame:标记某些列值更改的行

在 PySpark 中,如何根据另一个 DataFrame 中的查找来填充新列?

如何在 Databricks 的 PySpark 中使用在 Scala 中创建的 DataFrame

在 Pyspark 中合并 DataFrame

如何在 jupyter 中像 pandas Dataframe 一样打印 Pyspark Dataframe

如何在 pyspark 中验证 Dataframe 的架构(列的编号和名称)?