在 Spark 应用程序中使用 JDBC

Posted

技术标签:

【中文标题】在 Spark 应用程序中使用 JDBC【英文标题】:Using JDBC inside Spark application 【发布时间】:2019-02-19 13:53:53 【问题描述】:

我编写了一个用于批量加载 Phoenix 表的 Spark 应用程序。现在一切都工作了几个星期,但是几天我遇到了一些重复行的问题。这是由错误的表统计信息引起的。但是,可能的解决方法是删除并重新生成此表的统计信息。

因此我需要打开一个到我的 Phoenix 数据库的 JDBC 连接并调用语句来删除和创建统计信息。

由于我需要在通过 Spark 插入新数据后执行此操作,因此我还想在完成表批量加载后在我的 Spark 作业中创建和使用此 JDBC 连接。

为此,我添加了以下方法,并在我的 Java 代码中的 dataframe.save() 和 sparkContext.close() 方法之间调用它:

private static void updatePhoenixTableStatistics(String phoenixTableName) 
        try 
            Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
            System.out.println("Connecting to database..");
            Connection conn = DriverManager.getConnection("jdbc:phoenix:my-server.net:2181:/hbase-unsecure");
            System.out.println("Creating statement...");
            Statement st = conn.createStatement();

            st.executeUpdate("DELETE FROM SYSTEM.STATS WHERE physical_name='" + phoenixTableName + "'");
            System.out.println("Successfully deleted statistics data... Now refreshing it.");

            st.executeUpdate("UPDATE STATISTICS " + phoenixTableName + " ALL");
            System.out.println("Successfully refreshed statistics data.");

            st.close();
            conn.close();

            System.out.println("Connection closed.");
         catch (Exception e) 
            System.out.println("Unable to update table statistics - Skipping this step!");
            e.printStackTrace();
        
    

问题是,自从我添加了此方法后,我的 Spark 作业结束时总是出现以下异常:

Bulk-Load: DataFrame.save() completed - Import finished successfully!
Updating Table Statistics:
Connecting to database..
Creating statement...
Successfully deleted statistics data... Now refreshing it.
Successfully refreshed statistics data.
Connection closed.
Exception in thread "Thread-31" java.lang.RuntimeException: java.io.FileNotFoundException: /tmp/spark-e5b01508-0f84-4702-9684-4f6ceac803f9/gk-journal-importer-phoenix-0.0.3h.jar (No such file or directory)
        at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2794)
        at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2646)
        at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2518)
        at org.apache.hadoop.conf.Configuration.get(Configuration.java:1065)
        at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1119)
        at org.apache.hadoop.conf.Configuration.getBoolean(Configuration.java:1520)
        at org.apache.hadoop.hbase.HBaseConfiguration.checkDefaultsVersion(HBaseConfiguration.java:68)
        at org.apache.hadoop.hbase.HBaseConfiguration.addHbaseResources(HBaseConfiguration.java:82)
        at org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:97)
        at org.apache.phoenix.query.ConfigurationFactory$ConfigurationFactoryImpl$1.call(ConfigurationFactory.java:49)
        at org.apache.phoenix.query.ConfigurationFactory$ConfigurationFactoryImpl$1.call(ConfigurationFactory.java:46)
        at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:78)
        at org.apache.phoenix.util.PhoenixContextExecutor.callWithoutPropagation(PhoenixContextExecutor.java:93)
        at org.apache.phoenix.query.ConfigurationFactory$ConfigurationFactoryImpl.getConfiguration(ConfigurationFactory.java:46)
        at org.apache.phoenix.jdbc.PhoenixDriver$1.run(PhoenixDriver.java:88)
Caused by: java.io.FileNotFoundException: /tmp/spark-e5b01508-0f84-4702-9684-4f6ceac803f9/gk-journal-importer-phoenix-0.0.3h.jar (No such file or directory)
        at java.util.zip.ZipFile.open(Native Method)
        at java.util.zip.ZipFile.<init>(ZipFile.java:225)
        at java.util.zip.ZipFile.<init>(ZipFile.java:155)
        at java.util.jar.JarFile.<init>(JarFile.java:166)
        at java.util.jar.JarFile.<init>(JarFile.java:103)
        at sun.net.www.protocol.jar.URLJarFile.<init>(URLJarFile.java:93)
        at sun.net.www.protocol.jar.URLJarFile.getJarFile(URLJarFile.java:69)
        at sun.net.www.protocol.jar.JarFileFactory.get(JarFileFactory.java:99)
        at sun.net.www.protocol.jar.JarURLConnection.connect(JarURLConnection.java:122)
        at sun.net.www.protocol.jar.JarURLConnection.getInputStream(JarURLConnection.java:152)
        at org.apache.hadoop.conf.Configuration.parse(Configuration.java:2612)
        at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2693)
        ... 14 more

有人知道这个问题并且可以提供帮助吗? Spark Job 中的 JDBC 通常如何工作?或者还有其他可能吗?

我正在使用安装了 Spark 2.3 和 Phoenix 4.7 的 HDP 2.6.5。感谢您的帮助!

【问题讨论】:

【参考方案1】:

我找到了解决问题的方法。我导出的 jar 包含 phoenix-spark2 和 phoenix-client 依赖项,并包含在我的 jar 文件中。

我将这些依赖项(因为它们已经存在于我的集群 HDP 安装中)更改为提供的范围:

<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-spark2</artifactId>
    <version>4.7.0.2.6.5.0-292</version>
    <scope>provided</scope>                          <!-- this did it, now have to add --jar to spark-submit -->
</dependency>
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>4.7.0.2.6.5.0-292</version>
    <scope>provided</scope>                          <!-- this did it, now have to add --jar to spark-submit -->
</dependency>

现在我使用 --jars 选项开始我的 Spark 作业,并在那里链接这些依赖项。现在它在 yarn-client 模式下工作正常。

spark-submit --class spark.dataimport.SparkImportApp --master yarn --deploy-mode client --jars /usr/hdp/current/phoenix-client/phoenix-spark2.jar,/usr/hdp/current/phoenix-client/phoenix-client.jar hdfs:/user/test/gk-journal-importer-phoenix-0.0.3h.jar <some parameters for the main method>

PS:在 yarn-cluster 模式下,应用程序一直都在工作(也可以使用包含依赖项的 fat-jar)。

【讨论】:

以上是关于在 Spark 应用程序中使用 JDBC的主要内容,如果未能解决你的问题,请参考以下文章

Spark 执行器在 jdbc 源中没有增加

Spark 无法从 SBT 找到 JDBC 驱动程序

Spark jdbc 重用连接

从Greenplum上的表中读取数据时,如何在Spark-jdbc应用程序的选项“dbtable”中指定子查询? [复制]

在运行 spark 应用程序时包含 aws jdbc 驱动程序

将 Spark 与 Flask 与 JDBC 一起使用