即使在分区数据中,Spark 也会列出所有叶节点
Posted
技术标签:
【中文标题】即使在分区数据中,Spark 也会列出所有叶节点【英文标题】:Spark lists all leaf node even in partitioned data 【发布时间】:2017-01-23 14:54:43 【问题描述】:我有按date
和hour
划分的镶木地板数据,文件夹结构:
events_v3
-- event_date=2015-01-01
-- event_hour=2015-01-1
-- part10000.parquet.gz
-- event_date=2015-01-02
-- event_hour=5
-- part10000.parquet.gz
我已经通过 spark 创建了一个表 raw_events
,但是当我尝试查询时,它会扫描所有目录的页脚并减慢初始查询速度,即使我只查询一天的数据。
查询:
select * from raw_events where event_date='2016-01-01'
类似的问题:http://mail-archives.apache.org/mod_mbox/spark-user/201508.mbox/%3CCAAswR-7Qbd2tdLSsO76zyw9tvs-Njw2YVd36bRfCG3DKZrH0tw@mail.gmail.com%3E(但它是旧的)
日志:
App > 16/09/15 03:14:03 main INFO HadoopFsRelation: Listing leaf files and directories in parallel under: s3a://bucket/events_v3/
然后它产生 350 个任务,因为有 350 天的数据。
我禁用了schemaMerge
,并且还指定了要读取的架构,所以它可以直接转到我正在查看的分区,为什么要打印所有叶子文件?
列出2个executor的叶子文件需要10分钟,查询实际执行需要20秒
代码示例:
val sparkSession = org.apache.spark.sql.SparkSession.builder.getOrCreate()
val df = sparkSession.read.option("mergeSchema","false").format("parquet").load("s3a://bucket/events_v3")
df.createOrReplaceTempView("temp_events")
sparkSession.sql(
"""
|select verb,count(*) from temp_events where event_date = "2016-01-01" group by verb
""".stripMargin).show()
【问题讨论】:
相关:Does Spark support Partition Pruning with Parquet Files 我根本没有使用 hive。只需火花和火花 sql @lostinoverflow 我仍然没有找到我们为什么要递归阅读,但我能够将 10 分钟的初始扫描缩短到 1 分钟的扫描。有效地将查询时间缩短到 2 分钟以内 @LostInOverflow spark 在我们尝试查询它时创建一个路径目录,它在内部递归地列出所有文件夹。它首先调用以获取文件夹列表,然后对每个文件夹再次进行查询,并以递归方式进行。这个过程在 s3 中非常慢。我将 spark 递归调用移至 s3 文件系统。我可以要求 s3 给所有带有前缀“events_v3/”的文件,从而有效地递归获取所有文件。在我的案例中,它将 48,000 个 api 调用减少到 300 个 api 调用。 @GauravShah 您能否将其发布为答案。如果没有更好的解决方案,我想奖励赏金。 【参考方案1】:只要给 spark 一个目录来读取它,就会调用listLeafFiles
(org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala)。这又会调用fs.listStatus
,它会调用 api 来获取文件和目录的列表。现在对于每个目录,再次调用此方法。这会递归地发生,直到没有留下任何目录。这种设计在 HDFS 系统中运行良好。但在 s3 中效果不佳,因为列表文件是 RPC 调用。其他的 S3 支持通过前缀获取所有文件,这正是我们所需要的。
因此,例如,如果我们有上面的目录结构,每个目录有 1 年的数据价值,每个目录小时和 10 个子目录,我们将有 365 * 24 * 10 = 87k api 调用,这可以减少到 138 api 调用给定只有 137000 个文件。每个 s3 api 调用返回 1000 个文件。
代码:
org/apache/hadoop/fs/s3a/S3AFileSystem.java
public FileStatus[] listStatusRecursively(Path f) throws FileNotFoundException,
IOException
String key = pathToKey(f);
if (LOG.isDebugEnabled())
LOG.debug("List status for path: " + f);
final List<FileStatus> result = new ArrayList<FileStatus>();
final FileStatus fileStatus = getFileStatus(f);
if (fileStatus.isDirectory())
if (!key.isEmpty())
key = key + "/";
ListObjectsRequest request = new ListObjectsRequest();
request.setBucketName(bucket);
request.setPrefix(key);
request.setMaxKeys(maxKeys);
if (LOG.isDebugEnabled())
LOG.debug("listStatus: doing listObjects for directory " + key);
ObjectListing objects = s3.listObjects(request);
statistics.incrementReadOps(1);
while (true)
for (S3ObjectSummary summary : objects.getObjectSummaries())
Path keyPath = keyToPath(summary.getKey()).makeQualified(uri, workingDir);
// Skip over keys that are ourselves and old S3N _$folder$ files
if (keyPath.equals(f) || summary.getKey().endsWith(S3N_FOLDER_SUFFIX))
if (LOG.isDebugEnabled())
LOG.debug("Ignoring: " + keyPath);
continue;
if (objectRepresentsDirectory(summary.getKey(), summary.getSize()))
result.add(new S3AFileStatus(true, true, keyPath));
if (LOG.isDebugEnabled())
LOG.debug("Adding: fd: " + keyPath);
else
result.add(new S3AFileStatus(summary.getSize(),
dateToLong(summary.getLastModified()), keyPath,
getDefaultBlockSize(f.makeQualified(uri, workingDir))));
if (LOG.isDebugEnabled())
LOG.debug("Adding: fi: " + keyPath);
for (String prefix : objects.getCommonPrefixes())
Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir);
if (keyPath.equals(f))
continue;
result.add(new S3AFileStatus(true, false, keyPath));
if (LOG.isDebugEnabled())
LOG.debug("Adding: rd: " + keyPath);
if (objects.isTruncated())
if (LOG.isDebugEnabled())
LOG.debug("listStatus: list truncated - getting next batch");
objects = s3.listNextBatchOfObjects(objects);
statistics.incrementReadOps(1);
else
break;
else
if (LOG.isDebugEnabled())
LOG.debug("Adding: rd (not a dir): " + f);
result.add(fileStatus);
return result.toArray(new FileStatus[result.size()]);
/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
def listLeafFiles(fs: FileSystem, status: FileStatus, filter: PathFilter): Array[FileStatus] =
logTrace(s"Listing $status.getPath")
val name = status.getPath.getName.toLowerCase
if (shouldFilterOut(name))
Array.empty[FileStatus]
else
val statuses =
val stats = if(fs.isInstanceOf[S3AFileSystem])
logWarning("Using Monkey patched version of list status")
println("Using Monkey patched version of list status")
val a = fs.asInstanceOf[S3AFileSystem].listStatusRecursively(status.getPath)
a
// Array.empty[FileStatus]
else
val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
files ++ dirs.flatMap(dir => listLeafFiles(fs, dir, filter))
if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
// statuses do not have any dirs.
statuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map
case f: LocatedFileStatus => f
// NOTE:
//
// - Although S3/S3A/S3N file system can be quite slow for remote file metadata
// operations, calling `getFileBlockLocations` does no harm here since these file system
// implementations don't actually issue RPC for this method.
//
// - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
// be a big deal since we always use to `listLeafFilesInParallel` when the number of
// paths exceeds threshold.
case f => createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
【讨论】:
【参考方案2】:为了澄清 Gaurav 的回答,被剪断的代码来自 Hadoop 分支 2,可能要到 Hadoop 2.9 才会出现(参见 HADOOP-13208);并且有人需要更新 Spark 以使用该功能(这不会损害使用 HDFS 的代码,只是不会显示任何加速)。
需要考虑的一点是:什么是对象存储的良好文件布局。
不要有深度目录树,每个目录只有几个文件 有很多文件的浅树 考虑使用文件的前几个字符作为变化最大的值(例如天/小时),而不是最后一个。为什么?一些对象存储似乎使用前导字符进行散列,而不是尾随字符......如果你给你的名字更多的唯一性,那么它们就会分布在更多的服务器上,具有更好的带宽/更少的风险节流。 如果您使用的是 Hadoop 2.7 库,请切换到 s3a:// 而不是 s3n://。它已经更快了,而且每周都在进步,至少在 ASF 源代码树中是这样。最后,Apache Hadoop、Apache Spark 和相关项目都是开源的。欢迎投稿。这不仅仅是代码,它是文档、测试,并且,对于这个性能的东西,测试你的实际数据集。甚至向我们提供有关导致问题的原因(以及您的数据集布局)的详细信息也很有趣。
【讨论】:
他们已经将此修复移植到 2.8.0 中,这应该会在几周内发布 :) 不知道时间表;还没有人开始这个发布过程。我确实相信它是在 HDP-2.5 中发布的,如果它不工作,我会接到支持电话,我会打电话给支持电话。当 2.8 RC 流程开始时,测试将有所帮助。无论如何,Spark 并没有获得任何加速,因为它也需要进行调整,而且还有其他需要注意的地方。将数据放在更少的目录中,例如按月而不是按天,让您的生活更轻松以上是关于即使在分区数据中,Spark 也会列出所有叶节点的主要内容,如果未能解决你的问题,请参考以下文章