Read an HDFS File from a HIVE UDF - Execution Error, return code 101 FunctionTask. Could not initialize class
【Posted】: 2014-12-10 13:38:43
【Question】: We have been trying to create a simple Hive UDF that masks certain fields in a Hive table. We use an external file (placed on HDFS) to grab a piece of text that salts the masking process. Everything seemed to be in order, but when we try to create the function it throws this error:
org.apache.hive.service.cli.HiveSQLException: Error while processing statement: FAILED: Execution Error, return code -101 from org.apache.hadoop.hive.ql.exec.FunctionTask. Could not initialize class co.company.Mask
This is our UDF code:
package co.company;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.commons.codec.digest.DigestUtils;
@Description(
name = "masker",
value = "_FUNC_(str) - mask a string",
extended = "Example: \n" +
" SELECT masker(column) FROM hive_table; "
)
public class Mask extends UDF {

    private static final String arch_clave = "/user/username/filename.dat";
    private static String clave = null;

    public static String getFirstLine(String arch) {
        try {
            FileSystem fs = FileSystem.get(new Configuration());
            FSDataInputStream in = fs.open(new Path(arch));
            BufferedReader br = new BufferedReader(new InputStreamReader(in));
            String ret = br.readLine();
            br.close();
            return ret;
        } catch (Exception e) {
            System.out.println("out: Error Message: " + arch + " exc: " + e.getMessage());
            return null;
        }
    }

    public Text evaluate(Text s) {
        clave = getFirstLine(arch_clave);
        Text to_value = new Text(DigestUtils.shaHex(s + clave));
        return to_value;
    }
}
We are uploading the jar file and creating the UDF through HUE's interface (unfortunately, we don't have console access to the Hadoop cluster yet).
In Hue's Hive interface, our command is:
add jar hdfs:///user/my_username/myJar.jar
And then, to create the function, we execute:
CREATE TEMPORARY FUNCTION masker as 'co.company.Mask';
Unfortunately, the error thrown when we try to create the UDF is not very helpful. This is the log of the UDF creation. Any help is greatly appreciated. Thank you very much.
14/12/10 08:32:15 INFO log.PerfLogger: <PERFLOG method=compile from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO log.PerfLogger: <PERFLOG method=parse from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO parse.ParseDriver: Parsing command: CREATE TEMPORARY FUNCTION enmascarar as 'co.bancolombia.analitica.Enmascarar'
14/12/10 08:32:15 INFO parse.ParseDriver: Parse Completed
14/12/10 08:32:15 INFO log.PerfLogger: </PERFLOG method=parse start=1418218335753 end=1418218335754 duration=1 from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO log.PerfLogger: <PERFLOG method=semanticAnalyze from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO parse.FunctionSemanticAnalyzer: analyze done
14/12/10 08:32:15 INFO ql.Driver: Semantic Analysis Completed
14/12/10 08:32:15 INFO log.PerfLogger: </PERFLOG method=semanticAnalyze start=1418218335754 end=1418218335757 duration=3 from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO ql.Driver: Returning Hive schema: Schema(fieldSchemas:null, properties:null)
14/12/10 08:32:15 INFO log.PerfLogger: </PERFLOG method=compile start=1418218335753 end=1418218335757 duration=4 from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO log.PerfLogger: <PERFLOG method=Driver.run from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO log.PerfLogger: <PERFLOG method=TimeToSubmit from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO log.PerfLogger: <PERFLOG method=acquireReadWriteLocks from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO lockmgr.DummyTxnManager: Creating lock manager of type org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager
14/12/10 08:32:15 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=server1.domain:2181,server2.domain.corp:2181,server3.domain:2181 sessionTimeout=600000 watcher=org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager$DummyWatcher@2ebe4e81
14/12/10 08:32:15 INFO log.PerfLogger: </PERFLOG method=acquireReadWriteLocks start=1418218335760 end=1418218335797 duration=37 from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO log.PerfLogger: <PERFLOG method=Driver.execute from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO ql.Driver: Starting command: CREATE TEMPORARY FUNCTION enmascarar as 'co.company.Mask'
14/12/10 08:32:15 INFO log.PerfLogger: </PERFLOG method=TimeToSubmit start=1418218335760 end=1418218335798 duration=38 from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO log.PerfLogger: <PERFLOG method=runTasks from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO log.PerfLogger: <PERFLOG method=task.FUNCTION.Stage-0 from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 ERROR ql.Driver: FAILED: Execution Error, return code -101 from org.apache.hadoop.hive.ql.exec.FunctionTask. Could not initialize class co.company.Mask
14/12/10 08:32:15 INFO log.PerfLogger: </PERFLOG method=Driver.execute start=1418218335797 end=1418218335800 duration=3 from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO log.PerfLogger: <PERFLOG method=releaseLocks from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO ZooKeeperHiveLockManager: about to release lock for default
14/12/10 08:32:15 INFO ZooKeeperHiveLockManager: about to release lock for colaboradores
14/12/10 08:32:15 INFO log.PerfLogger: </PERFLOG method=releaseLocks start=1418218335800 end=1418218335822 duration=22 from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 ERROR operation.Operation: Error running hive query:
org.apache.hive.service.cli.HiveSQLException: Error while processing statement: FAILED: Execution Error, return code -101 from org.apache.hadoop.hive.ql.exec.FunctionTask. Could not initialize class co.company.Mask
at org.apache.hive.service.cli.operation.SQLOperation.runInternal(SQLOperation.java:147)
at org.apache.hive.service.cli.operation.SQLOperation.access$000(SQLOperation.java:69)
at org.apache.hive.service.cli.operation.SQLOperation$1$1.run(SQLOperation.java:200)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
at org.apache.hadoop.hive.shims.HadoopShimsSecure.doAs(HadoopShimsSecure.java:502)
at org.apache.hive.service.cli.operation.SQLOperation$1.run(SQLOperation.java:213)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
【Answer 1】: This was solved, but it had nothing to do with the code. The code above works fine for reading a file on HDFS from a Hive UDF (very inefficiently, since it reads the file on every call to the evaluate function, but it does manage to read it).
It turns out that when you create a Hive UDF through HUE, you upload the jar and then create the function. However, if you change the function and re-upload the jar, Hive keeps the previous function definition.
We defined the same UDF class in a different package inside the jar, dropped the original function in Hive, and created the function again through HUE (using the new class):
add jar hdfs:///user/my_username/myJar2.jar;
drop function if exists masker;
create temporary function masker as 'co.company.otherpackage.Mask';
It seems a bug report should be filed against Hive (or HUE? Thrift?); I still need to understand better which part of the system got it wrong.
I hope this helps someone in the future.
【Discussion】:
Hi Rafael, did you ever find a more efficient way that doesn't have to load the file every time?
@JochenDB Actually, we think loading the variable statically solves it. I would do it in a static block, something like: public class Mask extends UDF { static { clave = getFirstLine(arch_clave); } }
@JochenDB You can use GenericUDF's initialize() to initialize the variable once and reuse it across many evaluate() calls.
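To make that last suggestion concrete, here is a minimal sketch of the GenericUDF variant, assuming the same salt file and hashing as in the question; the class name is illustrative and argument-type checking is omitted for brevity:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;

public class MaskGeneric extends GenericUDF {

    private static final String SALT_PATH = "/user/username/filename.dat";
    private String clave; // salt, loaded once per task in initialize()

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // initialize() runs once per task, so HDFS is hit once instead of once per row
        try {
            FileSystem fs = FileSystem.get(new Configuration());
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(fs.open(new Path(SALT_PATH))))) {
                clave = br.readLine();
            }
        } catch (IOException e) {
            throw new UDFArgumentException("Cannot read salt file " + SALT_PATH + ": " + e.getMessage());
        }
        return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        Object value = arguments[0].get();
        return value == null ? null : new Text(DigestUtils.shaHex(value.toString() + clave));
    }

    @Override
    public String getDisplayString(String[] children) {
        return "masker(" + children[0] + ")";
    }
}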
【Answer 2】: This does not work because, by default, new Configuration() is initialized only from core-default.xml and core-site.xml; see its source.
Meanwhile, you may (and should) also have hdfs-site.xml and friends.
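One concrete consequence, sketched below with a placeholder namenode address: if fs.defaultFS never makes it into the Configuration, FileSystem.get(conf) can silently hand back the local filesystem instead of HDFS, so passing the HDFS URI explicitly sidesteps that particular failure:

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class ConfDemo {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration(); // loads only core-default.xml and core-site.xml
        // May resolve to LocalFileSystem if fs.defaultFS was never picked up:
        FileSystem implicit = FileSystem.get(conf);
        // Forces an HDFS client; "hdfs://namenode:8020" is a placeholder, not a real address:
        FileSystem explicit = FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
        System.out.println(implicit.getUri() + " vs " + explicit.getUri());
    }
}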
Unfortunately, I have not found a reliable way to obtain the configuration in a Hive UDF; the reasons are a long story.
In general, IMHO, you have to try the following approaches one by one:

1. public void configure(MapredContext context) on your UDF — but it may not be called, due to flaws with vectorization and/or non-MR engines or local execution (a "... LIMIT 5" will trigger the problem), etc.
2. SessionState.get().getConf(), if SessionState.get() is not null.
3. Initialize the Configuration yourself and add more resources than the defaults (see the list in Configuration's source).
4. Use the RHive approach and load all the .xml files from the Hadoop configuration directory (FSUtils.java):
public static Configuration getConf() throws IOException {
    if (conf != null) return conf;

    conf = new Configuration();
    String hadoopConfPath = System.getProperty("HADOOP_CONF_DIR");
    if (StringUtils.isNotEmpty(hadoopConfPath)) {
        File dir = new File(hadoopConfPath);
        if (!dir.exists() || !dir.isDirectory()) {
            return conf;
        }
        File[] files = dir.listFiles(
            new FilenameFilter() {
                public boolean accept(File dir, String name) {
                    return name.endsWith("xml");
                }
            });
        for (File file : files) {
            try {
                URL url = new URL("file://" + file.getCanonicalPath());
                conf.addResource(url);
            } catch (Exception e) {
                // ignore configuration files that cannot be turned into a URL
            }
        }
    }
    return conf;
}
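Plugged into the question's helper, the change is a single line (a sketch; getConf() is the method above, imports as in the question's UDF):

public static String getFirstLine(String arch) {
    try {
        // getConf() returns the cached, fully-populated Configuration built above,
        // instead of a bare new Configuration() that only sees core-site.xml
        FileSystem fs = FileSystem.get(getConf());
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(fs.open(new Path(arch))))) {
            return br.readLine();
        }
    } catch (IOException e) {
        return null; // same fallback behaviour as the original
    }
}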
So then, here is the full solution.
Add setters to your UDF:
public abstract class BasicUDF extends GenericUDF implements Configurable {

    /**
     * Invocation context
     */
    private MapredContext mapReduceContext = null;

    /**
     * Hadoop Configuration
     */
    private Configuration hadoopConfiguration = null;

    /**
     * Save MR context, if arrived
     */
    @Override
    public void configure(MapredContext context) {
        if (context != null) {
            this.mapReduceContext = context;
            this.propertyReader.addHiveConfigurationSource(context);
            this.resourceFinder.addHiveJobConfiguration(context.getJobConf());
            log.debug("Non-empty MapredContext arrived");
        } else {
            log.error("Empty MapredContext arrived");
        }
    }

    /**
     * Save Hadoop configuration, if arrived
     */
    @Override
    public void setConf(Configuration conf) {
        this.hadoopConfiguration = conf;
        this.propertyReader.addHadoopConfigurationSource(conf);
        this.resourceFinder.addHadoopConfigurationSource(conf);
    }
}
Then, wherever you need the configuration:
public Configuration findConfiguration() {
    if (hiveJobConfiguration != null) {
        log.debug("Starting with hiveJobConfiguration");
        return hiveJobConfiguration;
    }
    if (SessionState.get() != null && SessionState.get().getConf() != null) {
        log.debug("Starting with SessionState configuration");
        return SessionState.get().getConf();
    }
    if (hadoopConfiguration != null) {
        log.debug("Starting with hadoopConfiguration");
        return hadoopConfiguration;
    }
    log.debug("No existing configuration found, falling back to manually initialized");
    return createNewConfiguration();
}

private Configuration createNewConfiguration() {
    // load defaults, "core-default.xml" and "core-site.xml"
    Configuration configuration = new Configuration();
    // load expected configuration: mapred-site.xml, mapred-default.xml, hdfs-site.xml, hdfs-default.xml
    configuration.addResource("mapred-default.xml");
    configuration.addResource("mapred-site.xml");
    configuration.addResource("hdfs-default.xml");
    configuration.addResource("hdfs-site.xml");
    // load Hadoop configuration from FS if any and if requested
    if (fallbackReadHadoopFilesFromFS) {
        log.debug("Configured manual read of Hadoop configuration from FS");
        try {
            addFSHadoopConfiguration(configuration);
        } catch (RuntimeException re) {
            log.error("Reading of Hadoop configuration from FS failed", re);
        }
    }
    return configuration;
}
@edu.umd.cs.findbugs.annotations.SuppressFBWarnings(
    value = {"REC_CATCH_EXCEPTION", "SIC_INNER_SHOULD_BE_STATIC_ANON"},
    justification = "Findbugs bug, missed IOException from file.getCanonicalPath(); Don't like idea with static anon"
)
private void addFSHadoopConfiguration(Configuration configuration) {
    log.debug("Started addFSHadoopConfiguration to load configuration from FS");
    String hadoopConfPath = System.getProperty("HADOOP_CONF_DIR");
    if (StringUtils.isEmpty(hadoopConfPath)) {
        log.error("HADOOP_CONF_DIR is not set, skipping FS load in addFSHadoopConfiguration");
        return;
    } else {
        log.debug("Found configuration dir, it points to " + hadoopConfPath);
    }
    File dir = new File(hadoopConfPath);
    if (!dir.exists() || !dir.isDirectory()) {
        log.error("HADOOP_CONF_DIR points to invalid place " + hadoopConfPath);
        return;
    }
    File[] files = dir.listFiles(
        new FilenameFilter() {
            public boolean accept(File dir, String name) {
                return name.endsWith("xml");
            }
        });
    if (files == null) {
        log.error("Configuration dir does not denote a directory, or an I/O error occurred. Dir used " + hadoopConfPath);
        return;
    }
    for (File file : files) {
        try {
            URL url = new URL("file://" + file.getCanonicalPath());
            configuration.addResource(url);
        } catch (Exception e) {
            log.error("Failed to open configuration file " + file.getPath(), e);
        }
    }
}
Works like a charm.
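Tying this back to the original question, here is a hedged sketch of a masking UDF built on BasicUDF; the salt path is illustrative, and the only real change from the earlier GenericUDF sketch is that the salt is loaded through findConfiguration() instead of a bare new Configuration():

public class Mask extends BasicUDF {

    private static final String SALT_PATH = "/user/username/filename.dat"; // illustrative
    private String clave; // cached salt, read from HDFS once per task

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        try {
            // findConfiguration() tries the MapredContext, SessionState, and the injected
            // conf before falling back to a manually initialized Configuration
            FileSystem fs = FileSystem.get(findConfiguration());
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(fs.open(new Path(SALT_PATH))))) {
                clave = br.readLine();
            }
        } catch (IOException e) {
            throw new UDFArgumentException("Cannot read salt file " + SALT_PATH + ": " + e.getMessage());
        }
        return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        Object value = arguments[0].get();
        return value == null ? null : new Text(DigestUtils.shaHex(value.toString() + clave));
    }

    @Override
    public String getDisplayString(String[] children) {
        return "masker(" + children[0] + ")";
    }
}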