使用 Spark 读取文本文件并在 spark sql 上插入值

Posted

技术标签:

【中文标题】使用 Spark 读取文本文件并在 spark sql 上插入值【英文标题】:Reading a text file using Spark and inserting the value on spark sql 【发布时间】:2020-05-04 10:44:46 【问题描述】:
from pyspark import SparkContext
from pyspark import SparkConf

lines = sc.textFile("s3://test_bucket/txt/testing_consol.txt")

llist = lines.collect()

for lines in llist:
        final_query = spark.sql("""0
        """.format(lines))

这是txt文件里面的内容:

select * from test_table 
where id=1

我收到错误消息:

"\n 匹配的输入 'where' 期望 '(', 'SELECT', 'FROM', 'ADD', 'DESC'、'WITH'、'VALUES'、'CREATE'、'TABLE'、'INSERT'、'DELETE'、 'DESCRIBE'、'EXPLAIN'、'SHOW'、'USE'、'DROP'、'ALTER'、'MAP'、'SET'、 'RESET'、'START'、'COMMIT'、'ROLLBACK'、'REDUCE'、'REFRESH'、'CLEAR'、 '缓存','UNCACHE','DFS','截断','分析','列表','撤销', 'GRANT'、'LOCK'、'UNLOCK'、'MSCK'、'EXPORT'、'IMPORT'、'LOAD'(第 1 行, pos 0)\n\n== SQL ==\n其中 id=1\n^^^\n"

如果我将 txt 文件的内容更改为单行,则 spark sql 可以工作:

select * from test_table where id=1

好像spark sql只能识别第一行,不能识别后续行。

【问题讨论】:

【参考方案1】:

如果您只是合并查询行,它应该可以工作:

llist = ' '.join(lines.collect())
final_query = spark.sql(llist)

【讨论】:

我收到错误消息:未定义名称'sparkSession'。 我曾尝试使用 spark.sql 而不是 sparkSession.sql,现在它工作正常,就像将所有行加入一行一样。所以如果是这种情况,那么我们不能将 cmets 添加到脚本中,因为它会导致 sql 失败。我能够运行更复杂的查询,但我需要删除 cmets 才能这样做。这现在很好用,谢谢【参考方案2】:

只是为了阅读查询,不能选择创建 rdd。 您应该使用 --files 参数将包含查询的文本文件传递给 spark 驱动程序 然后使用 python open 命令读取文件并将查询传递给 spark sql。

完全不建议使用 spark 读取小文件来传递查询。

【讨论】:

以上是关于使用 Spark 读取文本文件并在 spark sql 上插入值的主要内容,如果未能解决你的问题,请参考以下文章

Spark:读取文本文件后的重新分区策略

Spark基础编程学习03

Spark学习笔记——数据读取和保存

在 spark java 中读取具有固定宽度和分隔符的文本文件

在 Spark 中读取具有多个标题的文本文件

Spark学习笔记4:数据读取与保存