将 Spark Streaming 输出写入 HDFS 时跳过数据

Posted

技术标签:

【中文标题】将 Spark Streaming 输出写入 HDFS 时跳过数据【英文标题】:Data skipped while writing Spark Streaming output to HDFS 【发布时间】:2015-10-12 08:29:08 【问题描述】:

我每 10 秒运行一次 Spark Streaming 应用程序,它的工作是使用来自 kafka 的数据,将其转换并根据密钥将其存储到 HDFS 中。即每个唯一键的文件。我正在使用 Hadoop 的 saveAsHadoopFile() API 来存储输出,我看到为每个唯一键生成一个文件,但问题是尽管 DStream 有更多行,但每个唯一键只存储一行相同的键。

例如,考虑以下具有一个唯一键的 DStream,

  key                  value
 =====   =====================================
 Key_1   183.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0 
 Key_1   184.33 70.0 1.12 1.0 1.0 1.0 11.0 4.0 
 Key_1   181.33 70.0 2.12 1.0 1.0 1.0 11.0 4.0 
 Key_1   185.33 70.0 1.12 1.0 1.0 1.0 11.0 4.0 
 Key_1   185.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0 

我看到只有一行(而不是 5 行)存储在 HDFS 文件中,

185.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0

以下代码用于将输出存储到HDFS,

dStream.foreachRDD(new Function<JavaPairRDD<String, String>, Void> () 
    @Override
    public Void call(JavaPairRDD<String, String> pairRDD) throws Exception 
        long timestamp = System.currentTimeMillis();
        int randomInt = random.nextInt();
        pairRDD.saveAsHadoopFile("hdfs://localhost:9000/application-" + timestamp +"-"+ randomInt, String.class, String.class, RDDMultipleTextOutputFormat.class);
    
);

其中RDDMultipleTextOutputFormat的实现如下,

public class RDDMultipleTextOutputFormat<K,V> extends    MultipleTextOutputFormat<K,V> 

    public K generateActualKey(K key, V value)  
        return null;
    

    public String generateFileNameForKeyValue(K key, V value, String name)  
        return key.toString();
    

如果我遗漏了什么,请告诉我?感谢您的帮助。

【问题讨论】:

【参考方案1】:

因为键是相同的,所以每次都会替换该值,因此您将获得提供给 hadoop 的最后一个值。

【讨论】:

我怎样才能让它不被覆盖? ...实际上我正在存储pairRDD,它必须存储整个RDD并且不应该覆盖。 您可以使用 rdd.zipWithUniqueId() 获取不同的密钥,然后将其转储到我们的 hdfs,而不是使用相同的密钥。试试看吧。 我的要求是将密钥的所有行存储在一个文件中。 rdd.zipWithUniqueId() 会做同样的事情吗? rdd.zipWithUniqueId() 用于生成唯一密钥,但您是否已经拥有密钥? 是的,我已经有了我的密钥,并且会有多个唯一密钥,并且每个密钥在每个批处理间隔中可能有 5 到 10 行。所以我需要将它们存储在相应的密钥文件中。

以上是关于将 Spark Streaming 输出写入 HDFS 时跳过数据的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming - 从Kafka读取json并将json写入其他Kafka主题

Spark Streaming“回声”读取和写入套接字

删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?

无法使用Spark Structured Streaming在Parquet文件中写入数据

混合 Spark Structured Streaming API 和 DStream 写入 Kafka

Spark Streaming:读取和写入状态信息到外部数据库,如 cassandra