通过 hiveContext 在 Spark Job 中使用 Hive 函数
Posted
技术标签:
【中文标题】通过 hiveContext 在 Spark Job 中使用 Hive 函数【英文标题】:Using Hive functions in Spark Job via hiveContext 【发布时间】:2016-04-06 03:47:11 【问题描述】:我正在使用 Hive 1.2 和 Spark 1.4.1。以下查询通过 Hive CLI 运行良好:
hive> select row_number() over (partition by one.id order by two.id) as sk,
two.id, two.name, one.name, current_date()
from avant_source.one one
inner join avant_source.two two
on one.id = two.one_id;
但是当我尝试在 pyspark 作业中通过 HiveContext 使用它时,它给了我一个错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o26.sql.
: java.lang.RuntimeException: Couldn't find function current_date
代码sn-p:
from pyspark import HiveContext
conf = SparkConf().setAppName('DFtest')
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
df = sqlContext.sql("select row_number() over (partition by one.id order by two.id) as sk, two.id, two.name, one.name, current_date() from avant_source.one one inner join avant_source.two two on one.id = two.one_id")
df.show()
sc.stop()
有没有办法在 pyspark 中获取当前日期或时间戳?我尝试导入日期、日期时间,但总是抛出一个错误,提示找不到函数。
我尝试在 pyspark 1.5 沙盒的数据框中使用 current_date,但随后我也得到了不同的错误。
df = sqlContext.createDataFrame([(current_date,)],[‘d’])
df.select(date_sub(df.d,1).alias('d')).collect()
错误:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/mapr/spark/spark-1.5.2/python/pyspark/sql/dataframe.py", line 769, in select
jdf = self._jdf.select(self._jcols(*cols))
File "/opt/mapr/spark/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/opt/mapr/spark/spark-1.5.2/python/pyspark/sql/utils.py", line 40, in deco
raise AnalysisException(s.split(': ', 1)[1])
pyspark.sql.utils.AnalysisException: cannot resolve 'datesub(d,1)' due to data type mismatch: argument 1 requires date type, however, 'd' is of struct<> type.;
请指教。
【问题讨论】:
你为什么要使用 F.current_date() ? 我尝试使用 from pyspark.sql 导入函数作为 F,因为简单的 current_date() 不起作用。然后它也报错了,但忘了把F.拿出来。 【参考方案1】:对于我的场景,我使用了以下
import datetime
now = datetime.datetime.now()
df = df.withColumn('eff_start', lit(now.strftime("%Y-%m-%d")))
对于 Hive 函数无法正确使用 HiveContext for HiveQL 的错误,这是一个集群问题,其中一个运行 HiveServer2 的节点由于内存问题而出现过多警报。那是造成问题的原因。它已在运行 Spark 1.5 和 Hive 1.2 的 MapR Sandbox 上成功测试
【讨论】:
以上是关于通过 hiveContext 在 Spark Job 中使用 Hive 函数的主要内容,如果未能解决你的问题,请参考以下文章
使用 Spark/Java Maven 项目获取 HiveContext
理解Spark SQL(二)—— SQLContext和HiveContext
在 spark 1.6 中计数(不同)不能与 hivecontext 查询一起使用