Pyspark - FileInputDStream:查找新文件时出错
Posted
技术标签:
【中文标题】Pyspark - FileInputDStream:查找新文件时出错【英文标题】:Pyspark - FileInputDStream: Error finding new files 【发布时间】:2016-12-27 02:41:28 【问题描述】:您好,我是 Python Spark 的新手,我正在从 Spark github 尝试这个示例,以便计算在给定目录中创建的新文本文件中的单词数:
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: hdfs_wordcount.py <directory>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingHDFSWordCount")
ssc = StreamingContext(sc, 1)
lines = ssc.textFileStream("hdfs:///home/my-logs/")
counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda x: (x, 1))\
.reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
这就是我得到的: a warning saying : WARN FileInputDStream: Error finding new files
一条警告消息说:WARN FileInputDStream: Error finding new files
。
即使我在此目录中添加文件,结果也是空的:/
对此有什么建议的解决方案吗? 谢谢。
【问题讨论】:
【参考方案1】:问题是火花流不会从目录中读取旧文件..因为所有日志文件都存在于您的流作业开始之前 因此,一旦您开始流式传输作业,然后手动或通过脚本将输入文件放入/复制到 hdfs 目录中,您需要做什么。
【讨论】:
【参考方案2】:我认为您指的是this 示例。您是否可以在不修改的情况下运行它,因为我看到您在程序中将目录设置为“hdfs:///”?您可以运行如下示例。
例如,Spark 位于 /opt/spark-2.0.2-bin-hadoop2.7
。您可以在示例目录中运行hdfs_wordcount.py
,如下所示。我们使用/tmp
作为目录作为参数传递给程序。
user1@user1:/opt/spark-2.0.2-bin-hadoop2.7$ bin/spark-submit examples/src/main/python/streaming/hdfs_wordcount.py /tmp
现在当这个程序运行时,打开另一个终端并将一些文件复制到/tmp
文件夹
user1@user1:~$ cp test.txt /tmp
你会在第一个终端看到字数。
【讨论】:
嗨@abaghel,这只是打字时的错误(我已经更新了我的问题),我使用的是完全相同的示例,但我一直得到空结果,你呢认为我应该 spark-2.x.x-bin-hadoop2.7 而不是从 github 或其他什么地方拉它? 您可以尝试先从github.com/apache/spark/blob/master/examples/src/main/python/… 运行示例,而不是在程序中设置自定义目录路径。流处理的结果将在其他终端快速滚动,因此您必须在 /tmp 目录中添加文件后立即观察它。 它现在对我有用,只改变了一些构建参数:D,其他问题,你认为我可以做一些事情来激发流,即使在文件更改(附加在相同的日志上)? 【参考方案3】:解决了!
问题是构建,我使用 maven 来构建,这取决于他们来自 github 的自述文件:
build/mvn -DskipTests clean package
我根据他们的documentation 构建了这种方式:
build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package
有人知道这些参数是什么吗?
【讨论】:
以上是关于Pyspark - FileInputDStream:查找新文件时出错的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark - ImportError:无法从“pyspark”导入名称“SparkContext”
Pyspark:基于所有列减去/差异 pyspark 数据帧
在 PySpark 的两个不同 pyspark.sql.dataframes 中的两列中创建一个 pyspark.sql.dataframe