使用 Spark 读取文本文件并在 spark sql 上插入值
Posted
技术标签:
【中文标题】使用 Spark 读取文本文件并在 spark sql 上插入值【英文标题】:Reading a text file using Spark and inserting the value on spark sql 【发布时间】:2020-05-04 10:44:46 【问题描述】:from pyspark import SparkContext
from pyspark import SparkConf
lines = sc.textFile("s3://test_bucket/txt/testing_consol.txt")
llist = lines.collect()
for lines in llist:
final_query = spark.sql("""0
""".format(lines))
这是txt文件里面的内容:
select * from test_table
where id=1
我收到错误消息:
"\n 匹配的输入 'where' 期望 '(', 'SELECT', 'FROM', 'ADD', 'DESC'、'WITH'、'VALUES'、'CREATE'、'TABLE'、'INSERT'、'DELETE'、 'DESCRIBE'、'EXPLAIN'、'SHOW'、'USE'、'DROP'、'ALTER'、'MAP'、'SET'、 'RESET'、'START'、'COMMIT'、'ROLLBACK'、'REDUCE'、'REFRESH'、'CLEAR'、 '缓存','UNCACHE','DFS','截断','分析','列表','撤销', 'GRANT'、'LOCK'、'UNLOCK'、'MSCK'、'EXPORT'、'IMPORT'、'LOAD'(第 1 行, pos 0)\n\n== SQL ==\n其中 id=1\n^^^\n"
如果我将 txt 文件的内容更改为单行,则 spark sql 可以工作:
select * from test_table where id=1
好像spark sql只能识别第一行,不能识别后续行。
【问题讨论】:
【参考方案1】:如果您只是合并查询行,它应该可以工作:
llist = ' '.join(lines.collect())
final_query = spark.sql(llist)
【讨论】:
我收到错误消息:未定义名称'sparkSession'。 我曾尝试使用 spark.sql 而不是 sparkSession.sql,现在它工作正常,就像将所有行加入一行一样。所以如果是这种情况,那么我们不能将 cmets 添加到脚本中,因为它会导致 sql 失败。我能够运行更复杂的查询,但我需要删除 cmets 才能这样做。这现在很好用,谢谢【参考方案2】:只是为了阅读查询,不能选择创建 rdd。 您应该使用 --files 参数将包含查询的文本文件传递给 spark 驱动程序 然后使用 python open 命令读取文件并将查询传递给 spark sql。
完全不建议使用 spark 读取小文件来传递查询。
【讨论】:
以上是关于使用 Spark 读取文本文件并在 spark sql 上插入值的主要内容,如果未能解决你的问题,请参考以下文章