Unable to insert to hive partitioned table from spark sql
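Asked: 2018-08-19 19:38:05
Question: I have a Hive partitioned table, txnaggr_rt_fact, with two partition columns, txninterval and intervaltype. I am trying to insert a record into this table from Spark SQL using Java, and an exception is thrown during the insert. The insert fails when the partition does not yet exist, because a new partition has to be created and that step fails. When the partition already exists, the record is inserted.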
The Java code is as follows:
SparkSession spark = SparkSession
.builder()
.appName("Java Spark Hive Example")
.config("spark.sql.warehouse.dir", "hdfs://localhost:8020/user/hive/warehouse")
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.enableHiveSupport()
.getOrCreate();
spark.sql("show databases").show();
spark.sql("use nadb");
/* spark.sql("hive.exec.dynamic.partition = true");
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict");*/
spark.sql("insert into table txnaggr_rt_fact partition (txninterval = '2017-01-11', intervaltype='Test') values('"+21+"',null,'"+22+"',"+23+")");
The exception thrown when inserting into the Hive partitioned table is as follows:
Exception in thread "main" org.apache.spark.sql.AnalysisException: java.lang.NullPointerException: null;
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:106)
at org.apache.spark.sql.hive.HiveExternalCatalog.loadPartition(HiveExternalCatalog.scala:843)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.processInsert(InsertIntoHiveTable.scala:249)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.run(InsertIntoHiveTable.scala:99)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:115)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:638)
at com.cw.na.spark.HiveSqlTest.main(HiveSqlTest.java:76)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
at org.apache.hadoop.hive.ql.metadata.Hive.replaceFiles(Hive.java:3412)
at org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1650)
at org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1579)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.sql.hive.client.Shim_v0_14.loadPartition(HiveShim.scala:836)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadPartition$1.apply$mcV$sp(HiveClientImpl.scala:741)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadPartition$1.apply(HiveClientImpl.scala:739)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadPartition$1.apply(HiveClientImpl.scala:739)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:272)
at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:210)
at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:209)
at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:255)
at org.apache.spark.sql.hive.client.HiveClientImpl.loadPartition(HiveClientImpl.scala:739)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadPartition$1.apply$mcV$sp(HiveExternalCatalog.scala:855)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadPartition$1.apply(HiveExternalCatalog.scala:843)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadPartition$1.apply(HiveExternalCatalog.scala:843)
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
... 25 more
I found that in the hive-site.xml in the spark/conf directory, hive.exec.dynamic.partition.mode was set to strict. I changed it to nonstrict, and I also set hive.exec.dynamic.partition.mode and hive.exec.dynamic.partition in the code, but I still get the same exception.
hive-site.xml
<property>
<name>hive.exec.dynamic.partition</name>
<value>true</value>
<description>Whether or not to allow dynamic partitions in DML/DDL.</description>
</property>
<property>
<name>hive.exec.dynamic.partition.mode</name>
<value>nonstrict</value>
<description>
In strict mode, the user must specify at least one static partition
in case the user accidentally overwrites all partitions.
In nonstrict mode all partitions are allowed to be dynamic.
</description>
</property>
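To double-check which value a given hive-site.xml actually contains, the file can be read directly. The following is a minimal sketch using only the JDK's XML parser. The class name HiveSiteCheck and the inline sample document are assumptions made for illustration, and the sketch only inspects a file's contents. It does not show which hive-site.xml Spark picks up at runtime, and it is not a fix for the NullPointerException.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical helper for illustration; it is not part of Spark or Hive.
final class HiveSiteCheck {

    /** Returns the <value> of the <property> with the given <name>, or null if absent. */
    static String propertyValue(InputStream hiveSiteXml, String name) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(hiveSiteXml);
        NodeList properties = doc.getElementsByTagName("property");
        for (int i = 0; i < properties.getLength(); i++) {
            Element property = (Element) properties.item(i);
            NodeList names = property.getElementsByTagName("name");
            NodeList values = property.getElementsByTagName("value");
            if (names.getLength() > 0 && values.getLength() > 0
                    && name.equals(names.item(0).getTextContent().trim())) {
                return values.item(0).getTextContent().trim();
            }
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        // Inline sample standing in for the real file (assumption for the demo).
        String xml = "<configuration>"
                + "<property><name>hive.exec.dynamic.partition</name><value>true</value></property>"
                + "<property><name>hive.exec.dynamic.partition.mode</name><value>nonstrict</value></property>"
                + "</configuration>";
        InputStream in = new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8));
        System.out.println(propertyValue(in, "hive.exec.dynamic.partition.mode"));
    }
}
```

To check a real file, a java.io.FileInputStream pointing at the spark/conf copy can be passed in place of the inline sample.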
Answer 1: What if, instead of
spark.sql("insert into table txnaggr_rt_fact partition (txninterval = '2017-01-11', intervaltype='Test') values('"+21+"',null,'"+22+"',"+23+")");
you try
spark.sql("insert overwrite table txnaggr_rt_fact partition (txninterval, intervaltype) values('"+21+"',null,'"+22+"',"+23+",'2017-01-11','Test')");
This is how I insert:
sql("create table if not exists table_test (a int, b int) partitioned by (ds string)")
sql("set hive.exec.dynamic.partition.mode=nonstrict")
sql("insert overwrite table table_test partition(ds) select a, b, '2017-01-01' from other_table")
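The dynamic-partition form used in this answer lists only the partition column names in the PARTITION clause and supplies their values at the end of the row, in the same order. A minimal Java sketch of assembling such a statement is shown below. PartitionInsertSql is a hypothetical helper, not part of Spark or Hive. It only builds the SQL text, and it assumes the caller passes ready-made SQL literals (no quoting or escaping is performed).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; it is not part of Spark or Hive.
final class PartitionInsertSql {

    /**
     * Builds an INSERT statement in which every partition column is dynamic:
     * the PARTITION clause names the columns, and their values are appended
     * after the data values in the same order.
     */
    static String build(String table, boolean overwrite,
                        List<String> partitionColumns,
                        List<String> dataLiterals,
                        List<String> partitionLiterals) {
        if (partitionColumns.size() != partitionLiterals.size()) {
            throw new IllegalArgumentException("one literal is required per partition column");
        }
        // "insert into" appends; "insert overwrite" replaces existing data.
        String verb = overwrite ? "insert overwrite table " : "insert into table ";
        List<String> row = new ArrayList<>(dataLiterals);
        row.addAll(partitionLiterals);
        return verb + table
                + " partition (" + String.join(", ", partitionColumns) + ")"
                + " values(" + String.join(", ", row) + ")";
    }

    public static void main(String[] args) {
        String sql = build("txnaggr_rt_fact", false,
                Arrays.asList("txninterval", "intervaltype"),
                Arrays.asList("'21'", "null", "'22'", "23"),
                Arrays.asList("'2017-01-11'", "'Test'"));
        System.out.println(sql);
    }
}
```

The resulting string could then be passed to spark.sql(...). Passing false for the overwrite flag produces the appending form, which leaves the table's existing data in place.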
Comments:
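I get the following exception: Exception in thread "main" java.lang.NoSuchMethodException: org.apache.hadoop.hive.ql.metadata.Hive.loadDynamicPartitions(org.apache.hadoop.fs.Path, java.lang.String, java.util.Map, boolean, int, boolean, boolean, boolean, long) at java.lang.Class.getMethod(Class.java:1786)
I have added to the answer above.
I don't want to overwrite the table's existing data. I only want to append to the Hive table. What should I do in this case?
To avoid overwriting, just remove "overwrite" from the query. Also check the table's DDL, for example with "describe table", to confirm that it is partitioned by two columns. Then you can run the INSERT statement as above.
Answer 2: First you have to create a special table, because the table must be transactional and that is turned off by default. Example: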
CREATE TABLE table_name (
id int,
name string
)
CLUSTERED BY (id) INTO 2 BUCKETS STORED AS ORC
TBLPROPERTIES ("transactional"="true",
"compactor.mapreduce.map.memory.mb"="2048", -- specify compaction map job properties
"compactorthreshold.hive.compactor.delta.num.threshold"="4", -- trigger minor compaction if there are more than 4 delta directories
"compactorthreshold.hive.compactor.delta.pct.threshold"="0.5" -- trigger major compaction if the ratio of size of delta files to
-- size of base files is greater than 50%
);
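Then apply the configuration described at https://cwiki.apache.org/confluence/display/Hive/Hive+Transactions. One big problem is that you will not be able to insert into an external table that has already been created.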
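For reference, the linked Hive Transactions page lists a small set of client-side settings needed for transactional tables. The sketch below collects them and renders them as SET statements. HiveTxnSettings is a hypothetical helper introduced only for illustration, the values are those given on the Hive wiki, and whether the Spark version in use can write to Hive transactional tables at all is not established by this thread.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; it is not part of Spark or Hive.
final class HiveTxnSettings {

    /** Client-side settings from the Hive Transactions wiki page, in a stable order. */
    static Map<String, String> clientSettings() {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("hive.support.concurrency", "true");
        settings.put("hive.exec.dynamic.partition.mode", "nonstrict");
        settings.put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
        // Before Hive 2.0, hive.enforce.bucketing=true is also required.
        // On the metastore side, hive.compactor.initiator.on=true and a positive
        // hive.compactor.worker.threads are configured in the service's hive-site.xml.
        return settings;
    }

    /** Renders each setting as a "set key=value" statement. */
    static List<String> asSetStatements(Map<String, String> settings) {
        List<String> statements = new ArrayList<>();
        for (Map.Entry<String, String> entry : settings.entrySet()) {
            statements.add("set " + entry.getKey() + "=" + entry.getValue());
        }
        return statements;
    }

    public static void main(String[] args) {
        for (String statement : asSetStatements(clientSettings())) {
            System.out.println(statement);
        }
    }
}
```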