Read an HDFS File from a HIVE UDF - Execution Error, return code 101 FunctionTask. Could not initialize class

Posted: 2014-12-10 13:38:43

【Question】:

We have been trying to create a simple Hive UDF to mask certain fields in a Hive table. We are using an external file (placed on HDFS) to grab a piece of text to salt the masking process. It looks like we have everything working, but when we try to create the external function it throws the error:

org.apache.hive.service.cli.HiveSQLException: Error while processing statement: FAILED: Execution Error, return code -101 from org.apache.hadoop.hive.ql.exec.FunctionTask. Could not initialize class co.company.Mask

Here is our UDF code:

package co.company;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.commons.codec.digest.DigestUtils;

@Description( 
        name = "masker",
        value = "_FUNC_(str) - mask a string",      
        extended = "Example: \n" +
                " SELECT masker(column) FROM hive_table; "      
        )
public class Mask extends UDF {

    private static final String arch_clave = "/user/username/filename.dat";
    private static String clave = null;

    public static String getFirstLine( String arch ) {
        try {
            FileSystem fs = FileSystem.get(new Configuration());
            FSDataInputStream in = fs.open(new Path(arch));
            BufferedReader br = new BufferedReader(new InputStreamReader(in));

            String ret = br.readLine();
            br.close();
            return ret;

        } catch (Exception e) {
            System.out.println("out: Error Message: " + arch + " exc: " + e.getMessage());
            return null;
        }
    }

    public Text evaluate(Text s) {

        clave = getFirstLine( arch_clave );

        Text to_value = new Text( DigestUtils.shaHex( s + clave ) );
        return to_value;
    }
}


We are uploading the jar file and creating the UDF through HUE's interface (sadly, we don't have console access to the Hadoop cluster yet).

In Hue's Hive interface, our command is:

add jar hdfs:///user/my_username/myJar.jar

And then, to create the function, we execute:

CREATE TEMPORARY FUNCTION masker as 'co.company.Mask';

Sadly, the error thrown when we try to create the UDF is not very helpful. Here is the log from creating the UDF. Any help is greatly appreciated. Thank you very much.

14/12/10 08:32:15 INFO log.PerfLogger: <PERFLOG method=compile from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO log.PerfLogger: <PERFLOG method=parse from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO parse.ParseDriver: Parsing command: CREATE TEMPORARY FUNCTION enmascarar as 'co.bancolombia.analitica.Enmascarar'
14/12/10 08:32:15 INFO parse.ParseDriver: Parse Completed
14/12/10 08:32:15 INFO log.PerfLogger: </PERFLOG method=parse start=1418218335753 end=1418218335754 duration=1 from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO log.PerfLogger: <PERFLOG method=semanticAnalyze from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO parse.FunctionSemanticAnalyzer: analyze done
14/12/10 08:32:15 INFO ql.Driver: Semantic Analysis Completed
14/12/10 08:32:15 INFO log.PerfLogger: </PERFLOG method=semanticAnalyze start=1418218335754 end=1418218335757 duration=3 from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO ql.Driver: Returning Hive schema: Schema(fieldSchemas:null, properties:null)
14/12/10 08:32:15 INFO log.PerfLogger: </PERFLOG method=compile start=1418218335753 end=1418218335757 duration=4 from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO log.PerfLogger: <PERFLOG method=Driver.run from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO log.PerfLogger: <PERFLOG method=TimeToSubmit from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO log.PerfLogger: <PERFLOG method=acquireReadWriteLocks from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO lockmgr.DummyTxnManager: Creating lock manager of type org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager
14/12/10 08:32:15 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=server1.domain:2181,server2.domain.corp:2181,server3.domain:2181 sessionTimeout=600000 watcher=org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager$DummyWatcher@2ebe4e81
14/12/10 08:32:15 INFO log.PerfLogger: </PERFLOG method=acquireReadWriteLocks start=1418218335760 end=1418218335797 duration=37 from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO log.PerfLogger: <PERFLOG method=Driver.execute from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO ql.Driver: Starting command: CREATE TEMPORARY FUNCTION enmascarar as 'co.company.Mask'
14/12/10 08:32:15 INFO log.PerfLogger: </PERFLOG method=TimeToSubmit start=1418218335760 end=1418218335798 duration=38 from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO log.PerfLogger: <PERFLOG method=runTasks from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO log.PerfLogger: <PERFLOG method=task.FUNCTION.Stage-0 from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 ERROR ql.Driver: FAILED: Execution Error, return code -101 from org.apache.hadoop.hive.ql.exec.FunctionTask. Could not initialize class co.company.MasK
14/12/10 08:32:15 INFO log.PerfLogger: </PERFLOG method=Driver.execute start=1418218335797 end=1418218335800 duration=3 from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO log.PerfLogger: <PERFLOG method=releaseLocks from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 INFO ZooKeeperHiveLockManager:  about to release lock for default
14/12/10 08:32:15 INFO ZooKeeperHiveLockManager:  about to release lock for colaboradores
14/12/10 08:32:15 INFO log.PerfLogger: </PERFLOG method=releaseLocks start=1418218335800 end=1418218335822 duration=22 from=org.apache.hadoop.hive.ql.Driver>
14/12/10 08:32:15 ERROR operation.Operation: Error running hive query: 
org.apache.hive.service.cli.HiveSQLException: Error while processing statement: FAILED: Execution Error, return code -101 from org.apache.hadoop.hive.ql.exec.FunctionTask. Could not initialize class co.company.Mask
	at org.apache.hive.service.cli.operation.SQLOperation.runInternal(SQLOperation.java:147)
	at org.apache.hive.service.cli.operation.SQLOperation.access$000(SQLOperation.java:69)
	at org.apache.hive.service.cli.operation.SQLOperation$1$1.run(SQLOperation.java:200)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
	at org.apache.hadoop.hive.shims.HadoopShimsSecure.doAs(HadoopShimsSecure.java:502)
	at org.apache.hive.service.cli.operation.SQLOperation$1.run(SQLOperation.java:213)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:744)


【Answer 1】:

The issue was solved, but it had nothing to do with the code. The code above works fine for reading a file on HDFS from a Hive UDF (very inefficiently, since it reads the file every time the evaluate function is called, but it does manage to read the file).

It turned out that when you create a Hive UDF through HUE, you upload the jar and then create the function. However, if you change the function and re-upload the jar, it still keeps the previous function definition.

We defined the same UDF class in another package within the jar, dropped the original function in Hive, and created the function again through HUE (using the new class):

add jar hdfs:///user/my_username/myJar2.jar;
drop function if exists masker;
create temporary function masker as 'co.company.otherpackage.Mask';

It seems a bug report needs to be filed against Hive (or HUE? Thrift?); I still need a better understanding of which part of the system went wrong.

I hope this helps someone in the future.

【Discussion】:

Hi Rafael, did you ever find a more efficient way that doesn't have to load the file every time?

@JochenDB Actually, we think loading the variable statically would solve the problem. I would do it in a static block, something like: public class Mask extends UDF { static { clave = getFirstLine(arch_clave); } ... }

@JochenDB You can use GenericUDF's initialize() to initialize the variable once and reuse it across many evaluate() calls.
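Picking up that last comment, here is a minimal sketch of what the GenericUDF variant could look like, assuming the same salt file as above (the class name MaskGeneric and its internals are illustrative, not from the original post). initialize() runs once before any rows are processed, so the HDFS read no longer happens per row:

package co.company;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;

public class MaskGeneric extends GenericUDF {

    private static final String arch_clave = "/user/username/filename.dat";

    private StringObjectInspector inputOI;
    private String clave; // salt, read once in initialize()

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length != 1 || !(arguments[0] instanceof StringObjectInspector)) {
            throw new UDFArgumentException("masker expects a single string argument");
        }
        inputOI = (StringObjectInspector) arguments[0];

        // Read the salt from HDFS once, before any rows are processed.
        // Note: new Configuration() has the caveats discussed in the next answer.
        try {
            FileSystem fs = FileSystem.get(new Configuration());
            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(arch_clave))));
            clave = br.readLine();
            br.close();
        } catch (Exception e) {
            throw new UDFArgumentException("Could not read salt file " + arch_clave + ": " + e.getMessage());
        }

        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        Object arg = arguments[0].get();
        if (arg == null) {
            return null;
        }
        // Reuse the cached salt on every row
        return DigestUtils.shaHex(inputOI.getPrimitiveJavaObject(arg) + clave);
    }

    @Override
    public String getDisplayString(String[] children) {
        return "masker(" + children[0] + ")";
    }
}

It would be registered the same way as before, with CREATE TEMPORARY FUNCTION pointing at the new class.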

【Answer 2】:

This does not work because, by default, new Configuration() is initialized from core-default.xml and core-site.xml only; see the sources.

Meanwhile, you may (and should) also have hdfs-site.xml and so on.

Unfortunately, I have not found a reliable way to get the configuration in a Hive UDF, for a long list of reasons.

In general, IMHO, you have to try the following approaches one by one:

1. public void configure(MapredContext context) on your UDF; however, it may not be called because of vectorization flaws and/or use of a non-MR engine or local execution (a "... LIMIT 5" query will trigger the problem), etc.
2. SessionState.get().getConf(), if SessionState.get() is not null.
3. Initialize a Configuration yourself and add more resources than the defaults (see the list in the Configuration sources).

4. Use the RHive approach and load all the .xml files from the Hadoop configuration directory (FSUtils.java):

    public static Configuration getConf() throws IOException {

        if (conf != null) return conf;

        conf = new Configuration();

        String hadoopConfPath = System.getProperty("HADOOP_CONF_DIR");

        if (StringUtils.isNotEmpty(hadoopConfPath)) {

            File dir = new File(hadoopConfPath);
            if (!dir.exists() || !dir.isDirectory()) {
                return conf;
            }

            File[] files = dir.listFiles(
                    new FilenameFilter() {
                        public boolean accept(File dir, String name) {
                            return name.endsWith("xml");
                        }
                    }
            );

            for (File file : files) {
                try {
                    URL url = new URL("file://" + file.getCanonicalPath());
                    conf.addResource(url);
                } catch (Exception e) {
                }
            }
        }

        return conf;
    }

So, here is the full solution.

Add setters to your UDF:

public abstract class BasicUDF extends GenericUDF implements Configurable {

    /**
     * Invocation context
     */
    private MapredContext mapReduceContext = null;

    /**
     * Hadoop Configuration
     */
    private Configuration hadoopConfiguration = null;

    /**
     * Save MR context, if arrived
     */
    @Override
    public void configure(MapredContext context) {
        if (context != null) {
            this.mapReduceContext = context;
            this.propertyReader.addHiveConfigurationSource(context);
            this.resourceFinder.addHiveJobConfiguration(context.getJobConf());
            log.debug("Non-empty MapredContext arrived");
        } else {
            log.error("Empty MapredContext arrived");
        }
    }

    /**
     * Save Hadoop configuration, if arrived
     */
    @Override
    public void setConf(Configuration conf) {
        this.hadoopConfiguration = conf;
        this.propertyReader.addHadoopConfigurationSource(conf);
        this.resourceFinder.addHadoopConfigurationSource(conf);
    }

Then, wherever you need the configuration:

public Configuration findConfiguration() {
    if (hiveJobConfiguration != null) {
        log.debug("Starting with hiveJobConfiguration");
        return hiveJobConfiguration;
    }

    if (SessionState.get() != null && SessionState.get().getConf() != null) {
        log.debug("Starting with SessionState configuration");
        return SessionState.get().getConf();
    }

    if (hadoopConfiguration != null) {
        log.debug("Starting with hadoopConfiguration");
        return hadoopConfiguration;
    }

    log.debug("No existing configuration found, falling back to manually initialized");
    return createNewConfiguration();
}

private Configuration createNewConfiguration() {

    // load defaults, "core-default.xml" and "core-site.xml"
    Configuration configuration = new Configuration();

    // load expected configuration, mapred-site.xml, mapred-default.xml, hdfs-site.xml, hdfs-default.xml
    configuration.addResource("mapred-default.xml");
    configuration.addResource("mapred-site.xml");

    configuration.addResource("hdfs-default.xml");
    configuration.addResource("hdfs-site.xml");

    // load Hadoop configuration from FS if any and if requested
    if (fallbackReadHadoopFilesFromFS) {
        log.debug("Configured manual read of Hadoop configuration from FS");
        try {
            addFSHadoopConfiguration(configuration);
        } catch (RuntimeException re) {
            log.error("Reading of Hadoop configuration from FS failed", re);
        }
    }

    return configuration;
}

@edu.umd.cs.findbugs.annotations.SuppressFBWarnings(
        value = {"REC_CATCH_EXCEPTION", "SIC_INNER_SHOULD_BE_STATIC_ANON"},
        justification = "Findbugs bug, missed IOException from file.getCanonicalPath(); don't like idea with static anon"
)
private void addFSHadoopConfiguration(Configuration configuration) {

    log.debug("Started addFSHadoopConfiguration to load configuration from FS");

    String hadoopConfPath = System.getProperty("HADOOP_CONF_DIR");

    if (StringUtils.isEmpty(hadoopConfPath)) {
        log.error("HADOOP_CONF_DIR is not set, skipping FS load in addFSHadoopConfiguration");
        return;
    } else {
        log.debug("Found configuration dir, it points to " + hadoopConfPath);
    }

    File dir = new File(hadoopConfPath);

    if (!dir.exists() || !dir.isDirectory()) {
        log.error("HADOOP_CONF_DIR points to invalid place " + hadoopConfPath);
        return;
    }

    File[] files = dir.listFiles(
            new FilenameFilter() {
                public boolean accept(File dir, String name) {
                    return name.endsWith("xml");
                }
            }
    );

    if (files == null) {
        log.error("Configuration dir does not denote a directory, or an I/O error occurred. Dir used " + hadoopConfPath);
        return;
    }

    for (File file : files) {
        try {
            URL url = new URL("file://" + file.getCanonicalPath());
            configuration.addResource(url);
        } catch (Exception e) {
            log.error("Failed to open configuration file " + file.getPath(), e);
        }
    }
}

Works like a charm.
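As a side note, inside a BasicUDF subclass the original question's salt-file read could then go through findConfiguration() instead of a bare new Configuration(). A hypothetical helper sketch (readFirstLine is an illustrative name, not from the answer; assumed imports: java.io.*, org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path):

// Hypothetical helper on a BasicUDF subclass: open the HDFS file with the
// resolved configuration rather than a bare `new Configuration()`
protected String readFirstLine(String pathOnHdfs) throws IOException {
    Configuration conf = findConfiguration(); // job conf > session conf > setConf() > manual fallback
    FileSystem fs = FileSystem.get(conf);
    BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(pathOnHdfs))));
    try {
        return br.readLine();
    } finally {
        br.close();
    }
}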

