Hive 查询 executeQuery() 在 java JDBC 代码中挂起

Posted

技术标签:

【中文标题】Hive 查询 executeQuery() 在 java JDBC 代码中挂起【英文标题】:Hive query executeQuery() hangs in java JDBC code 【发布时间】:2019-07-12 07:44:03 【问题描述】:

我创建了一个 UDTF,并在其中运行 java hive JDBC 代码以执行 hive 查询并获取结果。 我能够成功连接到 hive2 服务器,但代码在statement.executeQuery() 处无限期挂起,没有任何异常。可能是什么原因?相同的代码在独立的 eclipse 类中运行,但在作为 udtf 部署在 hadoop 集群中时手。


public class DynamicWhereUDTF extends GenericUDTF 
    private PrimitiveObjectInspector stringOI = null;
    ArrayList<Object[]> results = new ArrayList<Object[]>();

    @Override
    public StructObjectInspector initialize(ObjectInspector[] args)
            throws UDFArgumentException 

        stringOI = (PrimitiveObjectInspector) args[0];
        if (stringOI != null) 
            String name = stringOI.toString();
            System.out.println("param <-------> " + name);
        

        List<String> fieldNames = new ArrayList<String>();
        try 
            fieldNames = getColumnNames("d_drug");
         catch (SQLException e) 
            e.printStackTrace();
        
        System.out.println("fieldNames size ---> " + fieldNames.size());
        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

        for (int i = 0; i < fieldNames.size(); i++) 
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        

        System.out.println("----------ObjectInspectorFactory created------------ ");
        Connection conn = null;
        ResultSet rs = null;
        PreparedStatement statement = null;
        try 
            System.out.println("Processing records 1");
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            System.out.println("Processing records 2");
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "Kerberos");
            conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
            conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab("abc@CS.MSD", "/tmp/abc.keytab");
            System.out.println("Processing records 3");
            String hiveJdbcUrl = "jdbc:hive2://<host>:10000/demo_db;principal=hive/<host>@CS.MSD";
            conn = DriverManager.getConnection(hiveJdbcUrl, "abc", "");
            System.out.println("conn1 <-------> " + conn);
            statement = conn.prepareStatement("select * from xyz limit 5");
            System.out.println(" statement ----------> " + statement);
            rs = statement.executeQuery();
            System.out.println(" resultset ----------> " + rs);
            ResultSetMetaData rsMetaData = rs.getMetaData();
            int columnCount = rsMetaData.getColumnCount();
            System.out.println("columnCount ---> " + columnCount);
            // ArrayList<Object[]> results = new ArrayList<Object[]>();
            StringBuilder values = new StringBuilder();

            while (rs.next()) 
                values = new StringBuilder();
                for (int i = 0; i < columnCount; i++) 
                    values = values.append(rs.getString(i + 1)).append(",");
                
                String output = values.toString().substring(0,
                        values.lastIndexOf(","));
                System.out.println("output  -----> " + output);
                results.add(new Object[] "122556", "52905");
            
            System.out.println("------- results forwarded -------");

         catch (Exception ex) 
            ex.printStackTrace();
         finally 
            if (conn != null)
                try 
                    conn.close();
                 catch (SQLException e) 
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                
        
        return ObjectInspectorFactory.getStandardStructObjectInspector(
                fieldNames, fieldOIs);
    

    @Override
    public void close() throws HiveException 
        // TODO Auto-generated method stub

    

    @Override
    public void process(Object[] record) throws HiveException 

        try 
            Iterator<Object[]> it = results.iterator();
            while (it.hasNext()) 
                Object[] r = it.next();
                forward(r);
            
            System.out.println("------- results forwarded -------");
         catch (Exception ex) 
            ex.printStackTrace();
        
    

    public List<String> getColumnNames(String tableName) throws SQLException 
        List<String> fieldNames = new ArrayList<String>();
        fieldNames.add("drug_id");
        fieldNames.add("drug_cd");
        return fieldNames;
    


【问题讨论】:

【参考方案1】:

问题可能是在initialize 方法中创建连接。尝试用configure方法创建连接,可以以Hbase connector为例。

【讨论】:

我能够成功创建连接,但它的 executeQuery() 会无限期卡住。 UDTF 函数在从 hive CLI 运行时执行。但不是来自 hiverserver2,通过色调。看起来像一些哨兵级别的权限问题。一旦我们找到解决方案,我将在此处发布解决方案。

以上是关于Hive 查询 executeQuery() 在 java JDBC 代码中挂起的主要内容,如果未能解决你的问题,请参考以下文章

Hive,Hive on Spark和SparkSQL区别

如何使用 Hive 仓库连接器在 pyspark 中执行 HQL 文件

如何在 Swift FMDB 中使用 executeQuery() 执行基于条件的查询

Statement.executeQuery 为每个 SQL 查询花费相同的时间

GORM 的这个实现目前不支持基于字符串的查询,如 [executeQuery]

00312_预处理对象executeQuery方法(实现数据库的查询)