激发整个纺织品 - 许多小文件

Posted

技术标签:

【中文标题】激发整个纺织品 - 许多小文件【英文标题】:spark whole textiles - many small files 【发布时间】:2017-05-21 18:29:43 【问题描述】:

我想通过 spark 摄取许多小文本文件到 parquet。目前,我使用wholeTextFiles 并额外执行一些解析。

更准确地说,这些小文本文件是 ESRi ASCII Grid 文件,每个文件的最大大小约为 400kb。 GeoTools 用于解析它们,如下所述。

您是否看到任何优化的可能性?也许可以避免创建不必要的对象?或者更好地处理小文件的东西。我想知道是否最好只获取文件的路径并手动读取它们而不是使用String -> ByteArrayInputStream

case class RawRecords(path: String, content: String)
case class GeometryId(idPath: String, value: Double, geo: String)
@transient lazy val extractor = new PolygonExtractionProcess()
@transient lazy val writer = new WKTWriter()

def readRawFiles(path: String, parallelism: Int, spark: SparkSession) = 
    import spark.implicits._
    spark.sparkContext
      .wholeTextFiles(path, parallelism)
      .toDF("path", "content")
      .as[RawRecords]
      .mapPartitions(mapToSimpleTypes)
  

def mapToSimpleTypes(iterator: Iterator[RawRecords]): Iterator[GeometryId] = iterator.flatMap(r => 
    val extractor = new PolygonExtractionProcess()

    // http://docs.geotools.org/latest/userguide/library/coverage/arcgrid.html
    val readRaster = new ArcGridReader(new ByteArrayInputStream(r.content.getBytes(StandardCharsets.UTF_8))).read(null)

    // TODO maybe consider optimization of known size instead of using growable data structure
    val vectorizedFeatures = extractor.execute(readRaster, 0, true, null, null, null, null).features
    val result: collection.Seq[GeometryId] with Growable[GeometryId] = mutable.Buffer[GeometryId]()

    while (vectorizedFeatures.hasNext) 
      val vectorizedFeature = vectorizedFeatures.next()
      val geomWKTLineString = vectorizedFeature.getDefaultGeometry match 
        case g: Geometry => writer.write(g)
      
      val geomUserdata = vectorizedFeature.getAttribute(1).asInstanceOf[Double]
      result += GeometryId(r.path, geomUserdata, geomWKTLineString)
    
    result
  )

【问题讨论】:

如果有帮助,请在此处查看我的答案,但同样是数据框:***.com/a/45227410/297113 【参考方案1】:

我有建议:

    使用wholeTextFile -> mapPartitions -> 转换为数据集。为什么?如果您在 Dataset 上创建 mapPartitions,则所有行都会从内部格式转换为对象 - 这会导致额外的序列化。 运行 Java Mission Control 并对您的应用程序进行采样。它将显示方法的所有编译和执行时间 也许你可以使用binaryFiles,它会给你Stream,所以你可以在mapPartitions中解析它而无需额外阅读

【讨论】:

不应该用钨压缩来弥补所有的物体问题吗?有没有像wholeBinaryFiles 这样的东西会给我路径和整个文件? @GeorgHeiler 你可以使用SparkContext.binaryFiles,它返回对FilePath,DataStream PortableDataStream 是什么类型的流?当使用`new ArcGridReader(r._2).read(null)`时,我得到一个ERROR arcgrid: Unsupported input type 。 github.com/geotools/geotools/blob/master/modules/plugin/arcgrid/… 应该支持任何常规输入流 编辑:它工作正常,我错过了在PortableDataStream 上对open 的呼叫。我现在要做一些分析。 我最终想要一个数据框转储到镶木地板。 ***.com/questions/29686573/… 正在使用 spark.read.text。您认为使用它会更好吗?但这又会导致许多 String 对象。

以上是关于激发整个纺织品 - 许多小文件的主要内容,如果未能解决你的问题,请参考以下文章

在 NTFS 上打开许多小文件太慢了

AWS cloudformation:一个大模板文件还是许多小模板文件?

从一个目录读取许多小文件有多大问题?

存储许多小文件(在 S3 上)?

为啥将许多小字节数组写入文件比写入一个大数组要快?

如何使用许多小文件加速 Spark 的 parquet 阅读器