Pyspark(Dataframes)逐行读取文件(将行转换为字符串)
Posted
技术标签:
【中文标题】Pyspark(Dataframes)逐行读取文件(将行转换为字符串)【英文标题】:Pyspark (Dataframes) read file line wise (Convert row to string) 【发布时间】:2018-08-27 23:01:25 【问题描述】:我需要逐行读取文件并将每一行拆分为单词并对单词执行操作。
我该怎么做?
我写了以下代码:
logFile = "/home/hadoop/spark-2.3.1-bin-hadoop2.7/README.md" # Should be
some file on your system
spark = SparkSession.builder.appName("SimpleApp1").getOrCreate()
logData = spark.read.text(logFile).cache()
logData.printSchema()
logDataLines = logData.collect()
#The line variable below seems to be of type row. How I perform similar operations
on row or how do I convert row to a string.
for line in logDataLines:
words = line.select(explode(split(line,"\s+")))
for word in words:
print(word)
print("----------------------------------")
【问题讨论】:
通过使用collect()
,您将收集驱动程序节点上的所有数据,即如果您这样做,则无需使用Spark。这个问题展示了如何拆分数据框中的列并将其分解:***.com/questions/38210507/explode-in-pyspark
【参考方案1】:
我认为您应该将map
函数应用于您的行。
您可以在自创函数中应用任何内容:
data = spark.read.text("/home/spark/test_it.txt").cache()
def someFunction(row):
wordlist = row[0].split(" ")
result = list()
for word in wordlist:
result.append(word.upper())
return result
data.rdd.map(someFunction).collect()
输出:
[[u'THIS', u'IS', u'JUST', u'A', u'TEST'], [u'TO', u'UNDERSTAND'], [u'THE', u'PROCESSING']]
【讨论】:
以上是关于Pyspark(Dataframes)逐行读取文件(将行转换为字符串)的主要内容,如果未能解决你的问题,请参考以下文章
通过 DataFrames 从配置单元视图与配置单元表读取时的性能考虑
在 PySpark 的两个不同 pyspark.sql.dataframes 中的两列中创建一个 pyspark.sql.dataframe
如何使用 Pyspark 和 Dataframes 查询 Elasticsearch 索引
Pyspark DataFrames 中的嵌套 SELECT 查询