如何在spark中使用transform python udf执行hql脚本?

Posted

技术标签:

【中文标题】如何在spark中使用transform python udf执行hql脚本?【英文标题】:How to execute hql script with transform python udf in spark? 【发布时间】:2017-09-14 07:27:57 【问题描述】:

我是通过 POC 激发和学习的新手。作为此 POC 的一部分,我正在尝试直接执行具有 transform 关键字的 hql 文件以使用 python udf。

我已经在 CLI "hive -f filename.hql" 中测试了 hql 脚本,它工作正常。 我在 spark-sql 中尝试过的相同脚本,但由于找不到 hdfs path not found 错误而失败。我尝试以不同的方式提供 hdfs 路径,如下所示,但都不起作用

"/test/scripts/test.hql"

"hdfs://test.net:8020/test/scripts/test.hql"

"hdfs:///test.net:8020/test/scripts/test.hql"

还尝试在 hive 转换代码中给出完整路径,如下所示

USING "scl enable python27 'python hdfs://test.net:8020/user/test/scripts/TestPython.py'" 

Hive 代码

add file hdfs://test.net:8020/user/test/scripts/TestPython.py;


select * from 
    (select transform (*)
    USING "scl enable python27 'python TestPython.py'" 
    as (Col_1     STRING,
    col_2        STRING,
    ...
    ..
    col_125 STRING
    )
    FROM
    test.transform_inner_temp1 a) b;

TestPython 代码:

#!/usr/bin/env python
'''
Created on June 2, 2017

@author: test
'''
import sys
from datetime import datetime
import decimal
import string
D = decimal.Decimal
for line in sys.stdin:
    line = sys.stdin.readline()   
    TempList = line.strip().split('\t')
    col_1 = TempList[0]
    ... 
    ....
    col_125 = TempList[34] + TempList[32]
    outList.extend((col_1,....col_125))
    outValue = "\t".join(map(str,outList))
    print "%s"%(outValue)

所以我尝试了另一种方法,直接在 spark-submit 中执行

spark-submit --master yarn-cluster  hdfs://test.net:8020/user/test/scripts/testspark.py

testspark.py

from pyspark.sql.types import StringType
from pyspark import SparkConf, SparkContext
from pyspark import SQLContext
conf = SparkConf().setAppName("gveeran pyspark test")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
with open("hdfs://test.net:8020/user/test/scripts/test.hql") as fr:
   query = fr.read()
results = sqlContext.sql(query)
results.show()

但同样的问题如下

Traceback (most recent call last):
  File "PySparkTest2.py", line 7, in <module>
    with open("hdfs://test.net:8020/user/test/scripts/test.hql") as fr:
IOError: [Errno 2] No such file or directory: 'hdfs://test.net:8020/user/test/scripts/test.hql'

【问题讨论】:

您是否检查了您要读取的文件的权限? 是的。我已经使用以下命令修改了权限“hdfs dfs -chmod -R 777 /user/test/scripts/” 您是否确保 HADOOP_CONF_DIR 指向包含 Hadoop 集群配置文件的目录(HDFS 位置)?尝试在您的 spark-submit 之前导出此环境变量:export HADOOP_CONF_DIR= 【参考方案1】:

您可以将文件作为查询读取,然后作为spark sql 作业执行

例子:-

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
sc =SparkContext.getOrCreate()
sqlCtx = SQLContext(sc)
with open("/home/hadoop/test/abc.hql") as fr:
    query = fr.read()
    print(query)
    results = sqlCtx.sql(query)

【讨论】:

以上是关于如何在spark中使用transform python udf执行hql脚本?的主要内容,如果未能解决你的问题,请参考以下文章

Spark ML Transformer - 使用 rangeBetween 在窗口上聚合

Spark常用的transformation算子

Spark篇---Spark中Transformations转换算子

Spark常用的Transformation算子的简单例子

如何在 MLlib 中编写自定义 Transformer?

spark记录spark算子之Transformation