Spark sql注册的临时表不能在sqlContext.read()中使用?

Posted

技术标签:

【中文标题】Spark sql注册的临时表不能在sqlContext.read()中使用?【英文标题】:Spark sql registered temp table cannot be used in sqlContext.read()? 【发布时间】:2020-08-09 16:21:54 【问题描述】:

我有以下代码

Map<String, String> props = getDbConnectionProps();
        props.put("dbtable", sql);
        props.put("fetchSize", "100000");
        props.put("partitionColumn", "col1");
        props.put("lowerBound", "25");
        props.put("upperBound", "100");
        props.put("numPartitions", "10");
String sql = "..."
DataFrame df = sqlContext.read().format("jdbc").options(props).load();
df.registerTempTable("myTable");
df.cache();

Map<String, String> props = getDbConnectionProps();
        props.put("dbtable", sql2);
        props.put("fetchSize", "100000");
        props.put("partitionColumn", "col1");
        props.put("lowerBound", "25");
        props.put("upperBound", "100");
        props.put("numPartitions", "10");
String sql2 = "... inner join myTable on ...."   // Note here the sql2 use the temp table
DataFrame df2 = sqlContext.read().format("jdbc").options(props).load();

但是,下面出现错误

java.sql.SQLSyntaxErrorException: Table 'myDbSchema.myTable' doesn't exist

所以注册的临时表不能在sqlContext.read()中使用?我知道我可以使用sqlContext.sql(sql2) 来使用临时表获取结果。但是,如何以 sqlContext.sql() 的方式设置分区信息等属性呢?

谢谢。

【问题讨论】:

【参考方案1】:

很明显,您正在使用.format("jdbc") 从数据库中读取数据,而df.registerTempTable("myTable"); 是加载数据后内存中存在的火花实体/数据。

DataFrame df2 = sqlContext.read().format("jdbc").options(props).load();

并且错误指出 myDbSchema.myTable 不存在,因为您传递的查询字符串正在数据库上运行。

sql2 = "... inner join myTable on ...."
java.sql.SQLSyntaxErrorException: Table 'myDbSchema.myTable' doesn't exist

对于您的问题:我知道我可以使用 sqlContext.sql(sql2) 来使用临时表获取结果。但是,如何以 sqlContext.sql() 的方式设置分区信息等属性呢?

当两个数据集太大而无法由数据库连接/处理时,解决方案 1 是最佳选择,反之亦然。请在下面找到伪代码。

解决方案一:在DF2中加载第二张表的数据,然后在spark中进行join。

DataFrame df = sqlContext.read().format("jdbc").options(props).load();
DataFrame df2 = sqlContext.read().format("jdbc").options(props2).load();
spark.conf.set("spark.sql.shuffle.partitions",10)
DataFrame joindf = df.join(df2, joinCondition, "inner")

解决方案2:通过连接两个表在数据库中创建一个视图/表,例如joinedview这里,并通过读取并行性加载数据=>分区到spark中。

In Database:
create view joinedview as 
select * from table inner join myTable 
on (joincondition)

In Spark:
Map<String, String> props = getDbConnectionProps();
        props.put("dbtable", joinedview);
        props.put("fetchSize", "100000");
        props.put("partitionColumn", "col1");
        props.put("lowerBound", "25");
        props.put("upperBound", "100");
        props.put("numPartitions", "10");
DataFrame df2 = sqlContext.read().format("jdbc").options(props).load();

【讨论】:

【参考方案2】:

我不知道如何在没有 sql 的情况下执行此操作,但我认为错误可能会出现,因为您正在尝试使用 format("jdbc") 而不是 from 但是您的临时存储已设置.

【讨论】:

以上是关于Spark sql注册的临时表不能在sqlContext.read()中使用?的主要内容,如果未能解决你的问题,请参考以下文章

使用 spark-sql 缓存临时表

如何在 Spark SQL 中缓存和持久化临时表?

Spark:通过对临时表执行 sql 查询来创建临时表

DataFrame映射表的形式

Spark DataFrame vector 类型存储到Hive表

如何将 Excel 中的数据插入 Spark SQL 中的临时表中