使用 SparkSQL 读取 Impala 表
Posted
技术标签:
【中文标题】使用 SparkSQL 读取 Impala 表【英文标题】:Read Impala table with SparkSQL 【发布时间】:2017-08-28 19:47:05 【问题描述】:我试图执行一个查询,该查询具有lead .. over .. partition 和 Union 之类的功能。当我尝试在 impala 上运行它但在 Hive 上失败时,此查询运行良好。
我需要编写一个执行此查询的 Spark 作业。它在 SparkSQL 中也失败了,我的假设是因为 Spark 1.6 内部使用 HiveQL 来完成上述任务。
是否有一些不同的方法可以从 SparkSQL 读取 impala 表?因为在 Hive 中工作的基本查询和在 SprkSQL 中都能正常工作。
FYR 我正在尝试运行的查询:
SELECT issue_id,
CASE WHEN COALESCE(lead(created, 1) OVER (PARTITION BY issue_id ORDER BY created ASC,
field_sequence ASC), '') = '' THEN 'to' ELSE LEAD('from', 1) OVER (PARTITION BY issue_id ORDER BY created ASC, field_sequence ASC) END Status,
created StartDate,
LEAD(created, 1) OVER (PARTITION BY issue_id ORDER BY created ASC, field_sequence ASC) EndDate
FROM (
SELECT issue_id, created, field, 'from', 'to', field_sequence FROM tab1 WHERE COALESCE(LOWER(field), '') = 'status'
UNION
SELECT issue_id, updated_date created, '' field, '' 'from', '' 'to', 0 field_sequence FROM tab2
) hc WHERE hc.issue_id = '123'
和错误信息:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/cloudera/parcels/<CDHVersion>/lib/spark/python/pyspark/sql/context.py", line 580, in sql
return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
File "/opt/cloudera/parcels/<CDHVersion>/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/opt/cloudera/parcels/<CDHVersion>/lib/spark/python/pyspark/sql/utils.py", line 45, in deco
return f(*a, **kw)
File "/opt/cloudera/parcels/<CDHVersion>/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o83.sql.
: java.lang.RuntimeException: [1.55] failure: ``)'' expected but identifier OVER found
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36)
at org.apache.spark.sql.catalyst.DefaultParserDialect.parse(ParserDialect.scala:67)
at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
at org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:114)
at org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:113)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:208)
at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:208)
at org.apache.spark.sql.execution.datasources.DDLParser.parse(DDLParser.scala:43)
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:231)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
【问题讨论】:
你得到什么错误? 在 Pyspark 中:失败:“)”预期但发现标识符 OVER 当我在 Hive 中执行它时,它说我需要使用关键字 ALL。所以当我这样做时,它给了我与以下相关的错误:编译语句时出错:FAILED:RuntimeException java.lang.ClassNotFoundException:org.apache.kudu.mapreduce.KuduTableInputFormat 你能给我一个可重现的例子吗(只需打印出 tab1 的第一行) 【参考方案1】:定义Status
时缺少AS
,最后一个select 语句中缺少几个逗号。此外,coalesce
没有用,您可以使用IF ELSE
,因为只有一种情况。
你应该分解你的计算,这样你就没有嵌套的select
子句,它们效率低下。
【讨论】:
我没有得到关于 AS 的第一条评论。您可以在响应中添加您建议的查询吗?这会有所帮助.. 你能给我一个可重现的例子吗(只打印出tab1
的第一行)以上是关于使用 SparkSQL 读取 Impala 表的主要内容,如果未能解决你的问题,请参考以下文章
使用 SparkSQL 和 HiveContext 读取 Parquet 文件时出错
Impala vs SparkSQL:内置函数翻译:fnv_hash
SparkSQL使用Python从MySQL数据库表中读取[重复]