将 Spark Streaming 输出写入 HDFS 时跳过数据
Posted
技术标签:
【中文标题】将 Spark Streaming 输出写入 HDFS 时跳过数据【英文标题】:Data skipped while writing Spark Streaming output to HDFS 【发布时间】:2015-10-12 08:29:08 【问题描述】:我每 10 秒运行一次 Spark Streaming 应用程序,它的工作是使用来自 kafka 的数据,将其转换并根据密钥将其存储到 HDFS 中。即每个唯一键的文件。我正在使用 Hadoop 的 saveAsHadoopFile() API 来存储输出,我看到为每个唯一键生成一个文件,但问题是尽管 DStream 有更多行,但每个唯一键只存储一行相同的键。
例如,考虑以下具有一个唯一键的 DStream,
key value
===== =====================================
Key_1 183.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0
Key_1 184.33 70.0 1.12 1.0 1.0 1.0 11.0 4.0
Key_1 181.33 70.0 2.12 1.0 1.0 1.0 11.0 4.0
Key_1 185.33 70.0 1.12 1.0 1.0 1.0 11.0 4.0
Key_1 185.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0
我看到只有一行(而不是 5 行)存储在 HDFS 文件中,
185.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0
以下代码用于将输出存储到HDFS,
dStream.foreachRDD(new Function<JavaPairRDD<String, String>, Void> ()
@Override
public Void call(JavaPairRDD<String, String> pairRDD) throws Exception
long timestamp = System.currentTimeMillis();
int randomInt = random.nextInt();
pairRDD.saveAsHadoopFile("hdfs://localhost:9000/application-" + timestamp +"-"+ randomInt, String.class, String.class, RDDMultipleTextOutputFormat.class);
);
其中RDDMultipleTextOutputFormat的实现如下,
public class RDDMultipleTextOutputFormat<K,V> extends MultipleTextOutputFormat<K,V>
public K generateActualKey(K key, V value)
return null;
public String generateFileNameForKeyValue(K key, V value, String name)
return key.toString();
如果我遗漏了什么,请告诉我?感谢您的帮助。
【问题讨论】:
【参考方案1】:因为键是相同的,所以每次都会替换该值,因此您将获得提供给 hadoop 的最后一个值。
【讨论】:
我怎样才能让它不被覆盖? ...实际上我正在存储pairRDD,它必须存储整个RDD并且不应该覆盖。 您可以使用 rdd.zipWithUniqueId() 获取不同的密钥,然后将其转储到我们的 hdfs,而不是使用相同的密钥。试试看吧。 我的要求是将密钥的所有行存储在一个文件中。 rdd.zipWithUniqueId() 会做同样的事情吗? rdd.zipWithUniqueId() 用于生成唯一密钥,但您是否已经拥有密钥? 是的,我已经有了我的密钥,并且会有多个唯一密钥,并且每个密钥在每个批处理间隔中可能有 5 到 10 行。所以我需要将它们存储在相应的密钥文件中。以上是关于将 Spark Streaming 输出写入 HDFS 时跳过数据的主要内容,如果未能解决你的问题,请参考以下文章
Spark Streaming - 从Kafka读取json并将json写入其他Kafka主题
删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?
无法使用Spark Structured Streaming在Parquet文件中写入数据