如何在不使用 HDP 3.1 中的仓库连接器的情况下将表从 spark 中写入 hive
Posted
技术标签:
【中文标题】如何在不使用 HDP 3.1 中的仓库连接器的情况下将表从 spark 中写入 hive【英文标题】:How to write a table to hive from spark without using the warehouse connector in HDP 3.1 【发布时间】:2019-10-16 05:11:45 【问题描述】:当尝试在 HDP 3.1 上使用 spark 2.3 写入 Hive 表而不使用仓库连接器时直接使用:
spark-shell --driver-memory 16g --master local[3] --conf spark.hadoop.metastore.catalog.default=hive
val df = Seq(1,2,3,4).toDF
spark.sql("create database foo")
df.write.saveAsTable("foo.my_table_01")
失败:
Table foo.my_table_01 failed strict managed table checks due to the following reason: Table is marked as a managed table but is not transactional
但是一个:
val df = Seq(1,2,3,4).toDF.withColumn("part", col("value"))
df.write.partitionBy("part").option("compression", "zlib").mode(SaveMode.Overwrite).format("orc").saveAsTable("foo.my_table_02")
spark.sql("select * from foo.my_table_02").show
的 Spark 工作得很好。
现在去蜂巢/直线:
0: jdbc:hive2://hostname:2181/> select * from my_table_02;
Error: java.io.IOException: java.lang.IllegalArgumentException: bucketId out of range: -1 (state=,code=0)
一个
describe extended my_table_02;
返回
+-----------------------------+----------------------------------------------------+----------+
| col_name | data_type | comment |
+-----------------------------+----------------------------------------------------+----------+
| value | int | |
| part | int | |
| | NULL | NULL |
| # Partition Information | NULL | NULL |
| # col_name | data_type | comment |
| part | int | |
| | NULL | NULL |
| Detailed Table Information | Table(tableName:my_table_02, dbName:foo, owner:hive/bd-sandbox.t-mobile.at@SANDBOX.MAGENTA.COM, createTime:1571201905, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:value, type:int, comment:null), FieldSchema(name:part, type:int, comment:null)], location:hdfs://bd-sandbox.t-mobile.at:8020/warehouse/tablespace/external/hive/foo.db/my_table_02, inputFormat:org.apache.hadoop.hive.ql.io.orc.OrcInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.ql.io.orc.OrcSerde, parameters:path=hdfs://bd-sandbox.t-mobile.at:8020/warehouse/tablespace/external/hive/foo.db/my_table_02, compression=zlib, serialization.format=1), bucketCols:[], sortCols:[], parameters:, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:), storedAsSubDirectories:false), partitionKeys:[FieldSchema(name:part, type:int, comment:null)], parameters:numRows=0, rawDataSize=0, spark.sql.sources.schema.partCol.0=part, transient_lastDdlTime=1571201906, bucketing_version=2, spark.sql.create.version=2.3.2.3.1.0.0-78, totalSize=740, spark.sql.sources.schema.numPartCols=1, spark.sql.sources.schema.part.0=\"type\":\"struct\",\"fields\":[\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":,\"name\":\"part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":], numFiles=4, numPartitions=4, spark.sql.partitionProvider=catalog, spark.sql.sources.schema.numParts=1, spark.sql.sources.provider=orc, transactional=true, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE, rewriteEnabled:false, catName:hive, ownerType:USER, writeId:-1) |
如何使用 spark 写入 hive 不使用仓库连接器,但仍写入稍后可由 hive 读取的同一元存储?
据我所知,外部表应该是可能的(你不是托管的,不是 ACID 不是事务的),但我不确定如何告诉 saveAsTable
如何处理这些。
编辑
相关问题:
https://community.cloudera.com/t5/Support-Questions/In-hdp-3-0-can-t-create-hive-table-in-spark-failed/td-p/202647 Table loaded through Spark not accessible in Hive 设置答案中提出的属性并不能解决我的问题 似乎也是一个错误:https://issues.apache.org/jira/browse/HIVE-20593可能是像https://github.com/qubole/spark-acid 像https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.4/integrating-hive/content/hive_hivewarehouseconnector_for_handling_apache_spark_data.html 这样的解决方法,但我不喜欢在我还没有看到任何大规模性能测试的情况下使用更多胶带的想法。此外,这意味着更改所有现有的 Spark 作业。
事实上,Cant save table to hive metastore, HDP 3.0 报告了大型数据框和仓库连接器的问题。
编辑
我刚刚找到https://community.cloudera.com/t5/Support-Questions/Spark-hive-warehouse-connector-not-loading-data-when-using/td-p/243613
还有:
execute() 与 executeQuery()
ExecuteQuery() 将始终使用 Hiveserver2-interactive/LLAP 作为它 使用快速 ARROW 协议。当 jdbc URL 指向 非 LLAP Hiveserver2 将产生错误。
Execute() 使用 JDBC,对 LLAP 没有这种依赖,但有 一个内置限制,最多只能返回 1.000 条记录。但对于大多数 查询(INSERT INTO ... SELECT、count、sum、average)不是 问题。
但这不会扼杀 hive 和 spark 之间的任何高性能互操作性吗?特别是如果没有足够的 LLAP 节点可用于大规模 ETL。
事实上,这是真的。这个设置可以在https://github.com/hortonworks-spark/spark-llap/blob/26d164e62b45cfa1420d5d43cdef13d1d29bb877/src/main/java/com/hortonworks/spark/sql/hive/llap/HWConf.java#L39配置,虽然我不确定增加这个值对性能的影响
【问题讨论】:
您是否尝试将表存储格式显式设置为 Hive ACID 不支持的非默认(即非 ORC)格式,因此不应与新的默认 ACID 设置混淆?比如 Parquet、AVRO、CSV 等等? 恕我直言,解决这个问题的最佳方法是禁用 Ambari 中新的“ACID-by-default”设置。如果您需要 ACID,请在 Hive 中的CREATE TABLE
中明确说明——就像在 HDP 2.x 中一样
这听起来很明智。你知道在哪里改变它/这个属性的键吗?
【参考方案1】:
你试过了吗
data.write \
.mode("append") \
.insertInto("tableName")
【讨论】:
否,因为该表还不存在,我想使用 spark 创建它。 可能是 .mode("overwrite") 会有所帮助吗?通常 saveAsTable 效果很好,但不知道为什么会出现上述错误。 正常情况下你是指HDP 2.x还是3.x?【参考方案2】:在 Ambari 内部,只需禁用默认创建事务表的选项即可解决我的问题。
两次设置为 false (tez, llap)
hive.strict.managed.tables = false
并根据需要在每个 table property
中手动启用(以使用事务表)。
【讨论】:
这仅适用于镶木地板,不适用于 ORC。在后一种情况下:java.lang.IllegalArgumentException: bucketId out of range: -1 (state=,code=0)
仍然存在。这也能解决吗?
那是一个新的 ORC 表,在没有事务道具的情况下创建的吗?还是现有的 ORC 表(事务无法恢复,永远...)?当然,您是否重新启动了 Metastore 服务?
一个新的。我明确地重新启动了 spark shell 并删除了现有的表。 ambari 重新启动了所有受影响的服务。
您是否尝试切换spark.sql.orc.impl
?在 2.3 中默认为 hive
,在 2.4+ 中默认为 native
>> 另外,您是否检查过新表确实是“非事务性”创建的?
当您从 Hive 本身创建表时,它是否“事务性”?如果没有,那么诀窍是将适当的 Hive 属性注入 Hive-Metastore-client-inside-Spark-Context 使用的配置中。通过自定义hive-site.xml
在 CLASSPATH (这就是 Hadoop 库搜索其配置的方式) 中的目录中,或者通过自定义 spark.hadoop.*
属性传递给 Spark,然后自动注入到 Hadoop 道具中,覆盖默认配置文件。【参考方案3】:
创建一个外部表(作为一种解决方法)似乎是我的最佳选择。 这仍然涉及到 HWC 注册列元数据或更新分区信息。
类似的东西:
val df:DataFrame = ...
val externalPath = "/warehouse/tablespace/external/hive/my_db.db/my_table"
import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()
dxx.write.partitionBy("part_col").option("compression", "zlib").mode(SaveMode.Overwrite).orc(externalPath)
val columns = dxx.drop("part_col").schema.fields.map(field => s"$field.name $field.dataType.simpleString").mkString(", ")
val ddl =
s"""
|CREATE EXTERNAL TABLE my_db.my_table ($columns)
|PARTITIONED BY (part_col string)
|STORED AS ORC
|Location '$externalPath'
""".stripMargin
hive.execute(ddl)
hive.execute(s"MSCK REPAIR TABLE $tablename SYNC PARTITIONS")
不幸的是,这会引发:
java.sql.SQLException: The query did not generate a result set!
来自 HWC
【讨论】:
不过,我更愿意(正如 Samson Scharfrichter 建议的那样)重新配置 hive,而不是默认放置transactional
属性。但是,我找不到解决方案。【参考方案4】:
“如何在不使用仓库连接器的情况下使用 spark 写入 hive,但仍写入稍后可以被 hive 读取的同一个元存储?”
我们正在使用相同的设置(HDP 3.1 和 Spark 2.3)。使用下面的代码,我们得到了与“bucketId out of range: -1”相同的错误消息。解决方案是在尝试查询表之前在 Hive shell 中运行 set hive.fetch.task.conversion=none;
。
在没有 HWC 的情况下将数据写入 Hive 的代码:
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
case class Record(key: Int, value: String)
val spark = SparkSession.builder()
.master("yarn")
.appName("SparkHiveExample")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
spark.sql("USE databaseName")
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.write.mode(SaveMode.Overwrite).format("orc").saveAsTable("sparkhive_records")
[来自 https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html 的示例]
【讨论】:
以上是关于如何在不使用 HDP 3.1 中的仓库连接器的情况下将表从 spark 中写入 hive的主要内容,如果未能解决你的问题,请参考以下文章
在不中断授权流程的情况下替换 OpenIdConnectOptions 配置中的发现文档端点 (OpenID Connect .Net Core 3.1)
如何在不使用 SQL 中的多个连接条件的情况下获取同一行中的所有值?