Spark流DStream RDD以获取文件名

Posted

技术标签:

【中文标题】Spark流DStream RDD以获取文件名【英文标题】:Spark streaming DStream RDD to get file name 【发布时间】:2015-03-13 11:38:46 【问题描述】:

Spark streaming textFileStreamfileStream 可以监控目录并处理 Dstream RDD 中的新文件。

如何获取 DStream RDD 在特定时间间隔处理的文件名?

【问题讨论】:

与***.com/questions/29935732/…重复 @Irene 这是 2015 年 3 月发布的,该问题是 2015 年 4 月发布的。怎么可能重复?顺便说一句,其他问题也仍未得到解答。 呵呵,我看错了日期。另一个问题在评论中得到解答。 【参考方案1】:

fileStream 产生 UnionRDDNewHadoopRDDs。 sc.newAPIHadoopFile 创建的 NewHadoopRDDs 的好处在于,他们的 names 已设置为他们的路径。

以下是您可以利用这些知识做什么的示例:

def namedTextFileStream(ssc: StreamingContext, directory: String): DStream[String] =
  ssc.fileStream[LongWritable, Text, TextInputFormat](directory)
    .transform( rdd =>
      new UnionRDD(rdd.context,
        rdd.dependencies.map( dep =>
          dep.rdd.asInstanceOf[RDD[(LongWritable, Text)]].map(_._2.toString).setName(dep.rdd.name)
        )
      )
    )

def transformByFile[U: ClassTag](unionrdd: RDD[String],
                                 transformFunc: String => RDD[String] => RDD[U]): RDD[U] = 
  new UnionRDD(unionrdd.context,
    unionrdd.dependencies.map dep =>
      if (dep.rdd.isEmpty) None
      else 
        val filename = dep.rdd.name
        Some(
          transformFunc(filename)(dep.rdd.asInstanceOf[RDD[String]])
            .setName(filename)
        )
      
    .flatten
  )


def main(args: Array[String]) = 
  val conf = new SparkConf()
    .setAppName("Process by file")
    .setMaster("local[2]")

  val ssc = new StreamingContext(conf, Seconds(30))

  val dstream = namesTextFileStream(ssc, "/some/directory")

  def byFileTransformer(filename: String)(rdd: RDD[String]): RDD[(String, String)] =
    rdd.map(line => (filename, line))

  val transformed = dstream.
    transform(rdd => transformByFile(rdd, byFileTransformer))

  // Do some stuff with transformed

  ssc.start()
  ssc.awaitTermination()

【讨论】:

谢谢!不过,val filename 令人困惑,因为它是文件的整个路径。 new File(dep.rdd.name).getName 似乎有效。【参考方案2】:

对于那些想要一些 Java 代码而不是 Scala 的人:

JavaPairInputDStream<LongWritable, Text> textFileStream = 
        jsc.fileStream(
            inputPath, 
            LongWritable.class, 
            Text.class,
            TextInputFormat.class, 
            FileInputDStream::defaultFilter,
            false
        );
JavaDStream<Tuple2<String, String>> namedTextFileStream = textFileStream.transform((pairRdd, time) -> 
        UnionRDD<Tuple2<LongWritable, Text>> rdd = (UnionRDD<Tuple2<LongWritable, Text>>) pairRdd.rdd();
        List<RDD<Tuple2<LongWritable, Text>>> deps = JavaConverters.seqAsJavaListConverter(rdd.rdds()).asJava();
        List<RDD<Tuple2<String, String>>> collectedRdds = deps.stream().map( depRdd -> 
            if (depRdd.isEmpty()) 
                return null;
            
            JavaRDD<Tuple2<LongWritable, Text>> depJavaRdd = depRdd.toJavaRDD();
            String filename = depRdd.name();
            JavaPairRDD<String, String> newDep = JavaPairRDD.fromJavaRDD(depJavaRdd).mapToPair(t -> new Tuple2<String, String>(filename, t._2().toString())).setName(filename);
            return newDep.rdd();
        ).filter(t -> t != null).collect(Collectors.toList());
        Seq<RDD<Tuple2<String, String>>> rddSeq = JavaConverters.asScalaBufferConverter(collectedRdds).asScala().toIndexedSeq();
        ClassTag<Tuple2<String, String>> classTag = scala.reflect.ClassTag$.MODULE$.apply(Tuple2.class);
        return new UnionRDD<Tuple2<String, String>>(rdd.sparkContext(), rddSeq, classTag).toJavaRDD();
);

【讨论】:

【参考方案3】:

或者,通过修改 FileInputDStream 使其不再将文件的内容加载到 RDD 中,而是简单地从文件名创建一个 RDD。

如果您实际上不想将数据本身读入 RDD,或者想将文件名作为您的步骤之一传递给外部命令,这会提高性能。

只需更改 filesToRDD(..) 使其生成文件名的 RDD,而不是将数据加载到 RDD 中。

见:https://github.com/HASTE-project/bin-packing-paper/blob/master/spark/spark-scala-cellprofiler/src/main/scala/FileInputDStream2.scala#L278

【讨论】:

这里有一些关于应用程序的信息:arxiv.org/pdf/2001.10865

以上是关于Spark流DStream RDD以获取文件名的主要内容,如果未能解决你的问题,请参考以下文章

spark中要想保留流的状态怎么处理用哪种方式缓存

[Sparkhadoop]spark Streaming的核心DStream

深入理解Spark Streaming

如何保存 Spark Java Dstream RDD

如何从持续的 RDD 构造 DStream?

Spark DStream 转换