Unable to insert into a Hive partitioned table from Spark SQL

Posted: 2018-08-19 19:38:05

【Problem Description】

I have a Hive partitioned table txnaggr_rt_fact with two partition columns, txninterval and intervaltype. I am trying to insert a record into this table from Spark SQL using Java. An exception is thrown during the insert: it fails whenever the partition does not yet exist (so a new partition has to be created), but if the partition already exists the record is inserted fine.

The Java code is as follows:

SparkSession spark = SparkSession
          .builder()
          .appName("Java Spark Hive Example")
          .config("spark.sql.warehouse.dir", "hdfs://localhost:8020/user/hive/warehouse")
          .config("hive.exec.dynamic.partition", "true")
          .config("hive.exec.dynamic.partition.mode", "nonstrict")
          .enableHiveSupport()
          .getOrCreate();

        spark.sql("show databases").show();
        spark.sql("use nadb");
       /* spark.sql("hive.exec.dynamic.partition = true");
        spark.sql("set hive.exec.dynamic.partition.mode=nonstrict");*/
        spark.sql("insert into table txnaggr_rt_fact partition (txninterval = '2017-01-11', intervaltype='Test') values('"+21+"',null,'"+22+"',"+23+")");

The exception thrown when inserting into the Hive partitioned table is as follows:

Exception in thread "main" org.apache.spark.sql.AnalysisException: java.lang.NullPointerException: null;
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:106)
        at org.apache.spark.sql.hive.HiveExternalCatalog.loadPartition(HiveExternalCatalog.scala:843)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.processInsert(InsertIntoHiveTable.scala:249)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.run(InsertIntoHiveTable.scala:99)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:115)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:638)
        at com.cw.na.spark.HiveSqlTest.main(HiveSqlTest.java:76)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
        at org.apache.hadoop.hive.ql.metadata.Hive.replaceFiles(Hive.java:3412)
        at org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1650)
        at org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1579)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.sql.hive.client.Shim_v0_14.loadPartition(HiveShim.scala:836)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadPartition$1.apply$mcV$sp(HiveClientImpl.scala:741)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadPartition$1.apply(HiveClientImpl.scala:739)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadPartition$1.apply(HiveClientImpl.scala:739)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:272)
        at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:210)
        at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:209)
        at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:255)
        at org.apache.spark.sql.hive.client.HiveClientImpl.loadPartition(HiveClientImpl.scala:739)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadPartition$1.apply$mcV$sp(HiveExternalCatalog.scala:855)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadPartition$1.apply(HiveExternalCatalog.scala:843)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadPartition$1.apply(HiveExternalCatalog.scala:843)
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
        ... 25 more

I found that hive.exec.dynamic.partition.mode was set to strict in the hive-site.xml under the spark/conf directory. I changed it to nonstrict, and in the code I also set hive.exec.dynamic.partition and hive.exec.dynamic.partition.mode explicitly, yet I still get the same exception.

hive-site.xml

  <property>
    <name>hive.exec.dynamic.partition</name>
    <value>true</value>
    <description>Whether or not to allow dynamic partitions in DML/DDL.</description>
  </property>
  <property>
    <name>hive.exec.dynamic.partition.mode</name>
    <value>nonstrict</value>
    <description>
      In strict mode, the user must specify at least one static partition
      in case the user accidentally overwrites all partitions.
      In nonstrict mode all partitions are allowed to be dynamic.
    </description>
  </property>
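
(Not part of the original post: one quick sanity check is to ask the running session which values it actually sees. If the output still shows strict, the hive-site.xml that was edited is not the one Spark is picking up.)

    // Print the effective settings as seen by the SparkSession built above.
    spark.sql("SET hive.exec.dynamic.partition").show(false);
    spark.sql("SET hive.exec.dynamic.partition.mode").show(false);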

【Comments】

【Answer 1】

What if, instead of

 spark.sql("insert into table txnaggr_rt_fact partition (txninterval = '2017-01-11', intervaltype='Test') values('"+21+"',null,'"+22+"',"+23+")");

you try:

 spark.sql("insert overwrite table txnaggr_rt_fact partition (txninterval, intervaltype) values('"+21+"',null,'"+22+"',"+23+",'2017-01-11','Test')");

This is how I do the insert:

sql("create table if not exists table_test (a int, b int) partitioned by (ds string)")
sql("set hive.exec.dynamic.partition.mode=nonstrict")
sql("insert overwrite table_test partition(ds) select a, b, '2017-01-01' from other_table") 

【Discussion】

I get the following exception: Exception in thread "main" java.lang.NoSuchMethodException: org.apache.hadoop.hive.ql.metadata.Hive.loadDynamicPartitions(org.apache.hadoop.fs.Path, java.lang.String, java.util.Map, boolean, int, boolean, boolean, boolean, long) at java.lang.Class.getMethod(Class.java:1786). I have added it in the answer above.

I don't want to overwrite the existing data in the table; I only want to append to the Hive table. What should I do in that case?

To append rather than overwrite, just drop "overwrite" from the query. Also check the table's DDL (e.g. "describe table") to confirm the partitioning really has two columns. Then you can run the INSERT statement as above; see the sketch below.
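
(A sketch of the append variant suggested in these comments, written against the question's Java code; it assumes the four data columns come first and the two partition values last, which has not been verified against the original table.)

    // Dynamic-partition append: no "overwrite"; partition values go last,
    // in the declared order of the partition columns (txninterval, intervaltype).
    spark.sql("set hive.exec.dynamic.partition=true");
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict");
    spark.sql("insert into table txnaggr_rt_fact partition (txninterval, intervaltype) "
            + "values ('21', null, '22', 23, '2017-01-11', 'Test')");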

【Answer 2】

First you have to create the table in a special way, because it needs to be transactional and that is turned off by default. Example:

CREATE TABLE table_name (
  id                int,
  name              string
)
CLUSTERED BY (id) INTO 2 BUCKETS STORED AS ORC
TBLPROPERTIES ("transactional"="true",
  "compactor.mapreduce.map.memory.mb"="2048",     -- specify compaction map job properties
  "compactorthreshold.hive.compactor.delta.num.threshold"="4",  -- trigger minor compaction if there are more than 4 delta directories
  "compactorthreshold.hive.compactor.delta.pct.threshold"="0.5" -- trigger major compaction if the ratio of size of delta files to
                                                               -- size of base files is greater than 50%
);

Then apply some configuration as described here: https://cwiki.apache.org/confluence/display/Hive/Hive+Transactions. One big catch is that you will not be able to insert into an external table that has already been created.
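
(For reference, the settings that the linked Hive Transactions page calls for look roughly like the following hive-site.xml fragment; the values mirror that wiki page and are not part of the original answer.)

  <property>
    <name>hive.support.concurrency</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.enforce.bucketing</name>
    <value>true</value> <!-- needed on Hive 1.x; not required from Hive 2.0 -->
  </property>
  <property>
    <name>hive.txn.manager</name>
    <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
  </property>
  <property>
    <name>hive.compactor.initiator.on</name>
    <value>true</value> <!-- metastore side -->
  </property>
  <property>
    <name>hive.compactor.worker.threads</name>
    <value>1</value> <!-- metastore side, must be greater than 0 -->
  </property>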

【Discussion】
