从 Java 中的 spark 转换函数写入 HDFS 中的文件

Posted

技术标签:

【中文标题】从 Java 中的 spark 转换函数写入 HDFS 中的文件【英文标题】:Writing to files in HDFS from within spark transformation functions in Java 【发布时间】:2017-10-17 15:48:09 【问题描述】:

我的问题类似于 scala 中用于读取文件的已回答问题。

Reading files dynamically from HDFS from within spark transformation functions

我知道累加器使用它们将结果返回给驱动程序并写入驱动程序中的 HDFS。在我们的用例中,每个执行器的输出都很大,所以我正在寻找一种在 Java 转换中写入 HDFS 的方法。

谢谢!

【问题讨论】:

【参考方案1】:

终于找到了一种优雅的方式来实现这一点。为 hadoop 配置创建广播变量

Configuration configuration = JavaSparkContext.toSparkContext(context).hadoopConfiguration();
Broadcast<SerializableWritable<Configuration>> bc = context.broadcast(new SerializableWritable<Configuration>(configuration));

将此广播变量传递给您的转换或操作,并使用以下代码 sn-p 获取 Hadoop 文件系统:

FileSystem fileSystem = FileSystem.get(bc.getValue().value());

如果其他人在同一条船上,希望这会有所帮助。

干杯!

【讨论】:

非常感谢,它就像一个魅力! :D 你节省了我的时间,非常感谢,继续分享!继续成长!【参考方案2】:
JavaPairInputDStream<String, byte[]> input = KafkaUtils.createJDQDirectStream(ssc, String.class, byte[].class,
        StringDecoder.class, DefaultDecoder.class, kafkaParams, Collections.singleton(topicName));

JavaPairDStream<String, byte[]> output = input.transformToPair(new Function<JavaPairRDD<String, byte[]>, JavaPairRDD<String, byte[]>>() 
    public JavaPairRDD<String, byte[]> call(JavaPairRDD<String, byte[]> stringJavaPairRDD) throws Exception 
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(stringJavaPairRDD.context());
        stringJavaPairRDD.saveAsTextFile("hdfs://");
        return stringJavaPairRDD;
    
);

【讨论】:

感谢张老师的回复,感谢您发布方法。但在我的情况下,中间数据不是 RDD,我们也不是流数据。 我终于从您的回答中挑选了一些想法,并能够得到解决方案。作为另一个答案发布...谢谢!

以上是关于从 Java 中的 spark 转换函数写入 HDFS 中的文件的主要内容,如果未能解决你的问题,请参考以下文章

将 Spark 数据集转换为 JSON 并写入 Kafka Producer

从spark写入elasticsearch非常慢

Spark中转换的失败处理

Spark 将数据帧直接从 Hive 写入本地文件系统

Spark 12 GB 数据加载与 Window 函数性能问题

无法从使用 mongo spark 连接器读取的 spark DF 中显示/写入。