如何在pyspark中更改DataFrame的hdfs块大小
Posted
技术标签:
【中文标题】如何在pyspark中更改DataFrame的hdfs块大小【英文标题】:How to change hdfs block size of DataFrame in pysark 【发布时间】:2018-03-14 13:55:46 【问题描述】:这似乎与
有关How to change hdfs block size in pyspark?
我可以用rdd.saveAsTextFile成功改变hdfs块大小, 但不是对应的DataFrame.write.parquet,无法以parquet格式保存。
不确定是pyspark DataFrame中的错误还是我没有正确设置配置。
以下是我的测试代码:
##########
# init
##########
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import hdfs
from hdfs import InsecureClient
import os
import numpy as np
import pandas as pd
import logging
os.environ['SPARK_HOME'] = '/opt/spark-2.2.1-bin-hadoop2.7'
block_size = 512 * 1024
conf = SparkConf().setAppName("myapp").setMaster("spark://spark1:7077").set('spark.cores.max', 20).set("spark.executor.cores", 10).set("spark.executor.memory", "10g").set("spark.hadoop.dfs.blocksize", str(block_size)).set("spark.hadoop.dfs.block.size", str(block_size))
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext._jsc.hadoopConfiguration().setInt("dfs.blocksize", block_size)
spark.sparkContext._jsc.hadoopConfiguration().setInt("dfs.block.size", block_size)
##########
# main
##########
# create DataFrame
df_txt = spark.createDataFrame([\'temp': "hello", \'temp': "world", \'temp': "!"])
# save using DataFrameWriter, resulting 128MB-block-size
df_txt.write.mode('overwrite').format('parquet').save('hdfs://spark1/tmp/temp_with_df')
# save using rdd, resulting 512k-block-size
client = InsecureClient('http://spark1:50070')
client.delete('/tmp/temp_with_rrd', recursive=True)
df_txt.rdd.saveAsTextFile('hdfs://spark1/tmp/temp_with_rrd')
【问题讨论】:
AFAIK Spark SQL 在 2.0 中停止使用 Hadoop 配置 @user69 它如何从 HDFS 读取或使用 YARN? hadoop 配置在会话的上下文中 看起来这是特定于镶木地板的问题。我可以使用 df.write.csv() 和 df.write.text() apache-spark-developers-list.1001551.n3.nabble.com/… 成功保存 512k 块大小 【参考方案1】:从以下链接找到答案:
https://forums.databricks.com/questions/918/how-to-set-size-of-parquet-output-files.html
我可以使用 spark.hadoop.parquet.block.size 成功设置拼花块大小
以下是示例代码:
# init
block_size = 512 * 1024
conf = SparkConf().setAppName("myapp").setMaster("spark://spark1:7077").set('spark.cores.max', 20).set("spark.executor.cores", 10).set("spark.executor.memory", "10g").set('spark.hadoop.parquet.block.size', str(block_size)).set("spark.hadoop.dfs.blocksize", str(block_size)).set("spark.hadoop.dfs.block.size", str(block_size)).set("spark.hadoop.dfs.namenode.fs-limits.min-block-size", str(131072))
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
# create DataFrame
df_txt = spark.createDataFrame(['temp': "hello", 'temp': "world", 'temp': "!"])
# save using DataFrameWriter, resulting 512k-block-size
df_txt.write.mode('overwrite').format('parquet').save('hdfs://spark1/tmp/temp_with_df')
# save using DataFrameWriter.csv, resulting 512k-block-size
df_txt.write.mode('overwrite').csv('hdfs://spark1/tmp/temp_with_df_csv')
# save using DataFrameWriter.text, resulting 512k-block-size
df_txt.write.mode('overwrite').text('hdfs://spark1/tmp/temp_with_df_text')
# save using rdd, resulting 512k-block-size
client = InsecureClient('http://spark1:50070')
client.delete('/tmp/temp_with_rrd', recursive=True)
df_txt.rdd.saveAsTextFile('hdfs://spark1/tmp/temp_with_rrd')
【讨论】:
【参考方案2】:Hadoop 和 Spark 是两个独立的工具,它们有自己的工作策略。 Spark 和 Parquet 使用数据分区,块大小对它们没有意义。按照 Spark 所说的去做,然后在 HDFS 中做你想做的事情。
您可以通过以下方式更改 Parquet 分区号
df_txt.repartition(6).format("parquet").save("hdfs://...")
【讨论】:
我想你的意思是 HDFS 在集群范围内有一个独立的配置,不是每个 Spark 应用程序都配置的以上是关于如何在pyspark中更改DataFrame的hdfs块大小的主要内容,如果未能解决你的问题,请参考以下文章
在 PySpark 中,如何根据另一个 DataFrame 中的查找来填充新列?
如何在 Databricks 的 PySpark 中使用在 Scala 中创建的 DataFrame