从 Java 中的 spark 转换函数写入 HDFS 中的文件
Posted
技术标签:
【中文标题】从 Java 中的 spark 转换函数写入 HDFS 中的文件【英文标题】:Writing to files in HDFS from within spark transformation functions in Java 【发布时间】:2017-10-17 15:48:09 【问题描述】:我的问题类似于 scala 中用于读取文件的已回答问题。
Reading files dynamically from HDFS from within spark transformation functions
我知道累加器使用它们将结果返回给驱动程序并写入驱动程序中的 HDFS。在我们的用例中,每个执行器的输出都很大,所以我正在寻找一种在 Java 转换中写入 HDFS 的方法。
谢谢!
【问题讨论】:
【参考方案1】:终于找到了一种优雅的方式来实现这一点。为 hadoop 配置创建广播变量
Configuration configuration = JavaSparkContext.toSparkContext(context).hadoopConfiguration();
Broadcast<SerializableWritable<Configuration>> bc = context.broadcast(new SerializableWritable<Configuration>(configuration));
将此广播变量传递给您的转换或操作,并使用以下代码 sn-p 获取 Hadoop 文件系统:
FileSystem fileSystem = FileSystem.get(bc.getValue().value());
如果其他人在同一条船上,希望这会有所帮助。
干杯!
【讨论】:
非常感谢,它就像一个魅力! :D 你节省了我的时间,非常感谢,继续分享!继续成长!【参考方案2】:JavaPairInputDStream<String, byte[]> input = KafkaUtils.createJDQDirectStream(ssc, String.class, byte[].class,
StringDecoder.class, DefaultDecoder.class, kafkaParams, Collections.singleton(topicName));
JavaPairDStream<String, byte[]> output = input.transformToPair(new Function<JavaPairRDD<String, byte[]>, JavaPairRDD<String, byte[]>>()
public JavaPairRDD<String, byte[]> call(JavaPairRDD<String, byte[]> stringJavaPairRDD) throws Exception
JavaSparkContext sc = JavaSparkContext.fromSparkContext(stringJavaPairRDD.context());
stringJavaPairRDD.saveAsTextFile("hdfs://");
return stringJavaPairRDD;
);
【讨论】:
感谢张老师的回复,感谢您发布方法。但在我的情况下,中间数据不是 RDD,我们也不是流数据。 我终于从您的回答中挑选了一些想法,并能够得到解决方案。作为另一个答案发布...谢谢!以上是关于从 Java 中的 spark 转换函数写入 HDFS 中的文件的主要内容,如果未能解决你的问题,请参考以下文章
将 Spark 数据集转换为 JSON 并写入 Kafka Producer