Pyspark - FileInputDStream:查找新文件时出错

Posted

技术标签:

【中文标题】Pyspark - FileInputDStream:查找新文件时出错【英文标题】:Pyspark - FileInputDStream: Error finding new files 【发布时间】:2016-12-27 02:41:28 【问题描述】:

您好,我是 Python Spark 的新手,我正在从 Spark github 尝试这个示例,以便计算在给定目录中创建的新文本文件中的单词数:

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
if len(sys.argv) != 2:
    print("Usage: hdfs_wordcount.py <directory>", file=sys.stderr)
    exit(-1)

sc = SparkContext(appName="PythonStreamingHDFSWordCount")
ssc = StreamingContext(sc, 1)

lines = ssc.textFileStream("hdfs:///home/my-logs/")
counts = lines.flatMap(lambda line: line.split(" "))\
              .map(lambda x: (x, 1))\
              .reduceByKey(lambda a, b: a+b)
counts.pprint()

ssc.start()
ssc.awaitTermination()

这就是我得到的: a warning saying : WARN FileInputDStream: Error finding new files

一条警告消息说:WARN FileInputDStream: Error finding new files

即使我在此目录中添加文件,结果也是空的:/

对此有什么建议的解决方案吗? 谢谢。

【问题讨论】:

【参考方案1】:

问题是火花流不会从目录中读取旧文件..因为所有日志文件都存在于您的流作业开始之前 因此,一旦您开始流式传输作业,然后手动或通过脚本将输入文件放入/复制到 hdfs 目录中,您需要做什么。

【讨论】:

【参考方案2】:

我认为您指的是this 示例。您是否可以在不修改的情况下运行它,因为我看到您在程序中将目录设置为“hdfs:///”?您可以运行如下示例。

例如,Spark 位于 /opt/spark-2.0.2-bin-hadoop2.7。您可以在示例目录中运行hdfs_wordcount.py,如下所示。我们使用/tmp 作为目录作为参数传递给程序。

user1@user1:/opt/spark-2.0.2-bin-hadoop2.7$ bin/spark-submit examples/src/main/python/streaming/hdfs_wordcount.py /tmp

现在当这个程序运行时,打开另一个终端并将一些文件复制到/tmp文件夹

user1@user1:~$ cp test.txt /tmp

你会在第一个终端看到字数。

【讨论】:

嗨@abaghel,这只是打字时的错误(我已经更新了我的问题),我使用的是完全相同的示例,但我一直得到空结果,你呢认为我应该 spark-2.x.x-bin-hadoop2.7 而不是从 github 或其他什么地方拉它? 您可以尝试先从github.com/apache/spark/blob/master/examples/src/main/python/… 运行示例,而不是在程序中设置自定义目录路径。流处理的结果将在其他终端快速滚动,因此您必须在 /tmp 目录中添加文件后立即观察它。 它现在对我有用,只改变了一些构建参数:D,其他问题,你认为我可以做一些事情来激发流,即使在文件更改(附加在相同的日志上)? 【参考方案3】:

解决了!

问题是构建,我使用 maven 来构建,这取决于他们来自 github 的自述文件:

build/mvn -DskipTests clean package

我根据他们的documentation 构建了这种方式:

build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package

有人知道这些参数是什么吗?

【讨论】:

以上是关于Pyspark - FileInputDStream:查找新文件时出错的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 安装错误:没有名为“pyspark”的模块

Pyspark:将 sql 查询转换为 pyspark?

Pyspark - ImportError:无法从“pyspark”导入名称“SparkContext”

Pyspark:基于所有列减去/差异 pyspark 数据帧

在 PySpark 的两个不同 pyspark.sql.dataframes 中的两列中创建一个 pyspark.sql.dataframe

pyspark:在日期和时间上重新采样 pyspark 数据帧