如何使用 pyspark 读取 hdfs kafka 数据?

Posted

技术标签:

【中文标题】如何使用 pyspark 读取 hdfs kafka 数据?【英文标题】:How to read hdfs kafka data using pyspark? 【发布时间】:2018-01-22 17:35:08 【问题描述】:

我正在尝试读取通过 Kafka 和 SparkStreaming 获取的存储到 HDFS 的数据。

我正在使用一个 Java 应用程序,它使用 JavaRDD.saveAsTextFile 方法将一些任意数据保存到 Hadoop HDFS。基本上是这样的:

kafkaStreams.get(i).foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() 
            @Override
            public void call(JavaRDD<ConsumerRecord<String, String>> consumerRecordJavaRDD) throws Exception 
                        consumerRecordJavaRDD.saveAsTextFile("/tmp/abcd_" + System.currentTimeMillis());
            );

通过 Kafka 推送文本文件行。数据已保存,我可以在 localhost:50070 的默认 hadoop 浏览器中看到它。

然后,在 pyspark 应用程序中,我尝试使用 sparkContext.textFile 读取数据。

问题是我读取的数据(使用 python 或在 localhost:50070 上“手动”)也包含元数据。所以每一行如下(一长串):

"ConsumerRecord(topic = abcdef, partition = 0, offset = 3, CreateTime = 123456789, checksum = 987654321, 序列化键大小 = -1, 序列化值大小 = 28, key = null, value = aaaa, bbbb, cccc, dddd, eeee)"

我想按原样读取数据然后拆分和解析长字符串以获取“值”内容是没有意义的。

那我应该如何解决这个问题呢?是否可以只读取“值”字段?还是储蓄本身的问题?

【问题讨论】:

像这样保存数据使得任何下游进程几乎不可能在没有一些预处理的情况下使用数据。我强烈建议您在将数据写入 HDFS 之前对其进行解析。如果您只想读取数据的单个“列”,那么我建议您以列格式将数据保存到 HDFS,例如镶木地板。如果您将 Kafka 流保存为 textFile,那么您几乎可以保证必须将数据作为字符串读取并解析为您要查找的内容。 感谢您的评论。不,最后我想保存一个包含 20 多列的非常大的 .csv 文件。 最好使用 Avro 或 Parquet,或者至少使用 JSON。纯文本很难查询...如果您使用 Confluent 的 Kafka HDFS 连接器,您将获得开箱即用的 Hive 集成。 docs.confluent.io/current/connect/connect-hdfs/docs/index.html 感谢 cricket_007,我们确实会使用 Parquet。 【参考方案1】:

IMO 您这样做的顺序错误。我强烈建议您直接在 pyspark 应用程序中使用来自 Kafka 的数据。 如果您愿意,您也可以将 Kafka 主题写入 HDFS (请记住,Kafka 会保留数据,因此当您在 pyspark 中读取它时,不会更改从同一主题写入 HDFS 的内容)。

当数据已经在 Kafka 中时,将 PySpark 耦合到 HDFS 没有意义。

这是一个 simple example 直接在 pyspark 中消费来自 Kafka 的数据。

【讨论】:

感谢您的建议。我肯定会考虑改变我们的应用程序架构。但是我的问题有点不同。【参考方案2】:

我已经解决了这个问题。

如原帖下的 cmets 所述,我将数据保存为面向列且易于使用的 parquet 文件格式。

【讨论】:

以上是关于如何使用 pyspark 读取 hdfs kafka 数据?的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:使用 configParser 读取 HDFS 上的属性文件

从 hdfs 读取文件 - pyspark

使用 pyspark 从 hdfs 读取文件时连接被拒绝

PySpark 无法从 hdfs 读取 csv:HiveExternalCatalog 错误

如何创建 Pyspark 应用程序

pyspark 中 读取 hive 表,提示 hdfs 中的 nameservice 不识别