Spark - 将通用数组传递给 GenericRowWithSchema
Posted
技术标签:
【中文标题】Spark - 将通用数组传递给 GenericRowWithSchema【英文标题】:Spark - Pass Generic Array to GenericRowWithSchema 【发布时间】:2021-07-01 05:14:01 【问题描述】:我正在尝试构建一个从 HBase 表读取数据并写入 Hive 表的 Spark 实用程序。这里的先决条件是所有列都是字符串类型。
我这里的方法是读取RDD[(ImmutableBytesWritable, Result)]
中的Hbase 表,然后将其转换为RDD[GenericRowWithSchema]
,然后再转换为Spark 数据帧。我对第一部分很好,但在第二部分遇到问题。我的代码看起来像这样
def parseRow(result: Result, hiveColumns: Array[String], tableSchema: StructType): =
val rowKey = Bytes.toString(result.getRow)
val cfDataBytes = Bytes.toBytes("cf")
val colArray = hiveColumns.map(col => Bytes.toString(result.getValue(cfDataBytes, Bytes.toBytes(col))))
new GenericRowWithSchema(colArray, tableSchema)
我在声明 parseRow
函数时遇到错误
error:Type mismatch
found: Array[String]
required: Array[Any]
Note: String <: Any, but class Array is invariant in type T
You may wish to investigate a wildcard type such as `_<:Any`. (SLS 3.2.10)
new GenericRowWithSchema(colArray, tableSchema)
^
我使用的函数如下
val tableName = "xyz" // This will be an arg that is passed to the job
val hiveColumns: Array[String] = spark.table(tableName)
val tableSchema: StructType = StructType(hiveColumns.map(colName => StructField(colName, DataTypes.StringType, false)))
val hbaseConf = HBaseConfiguration.create
val scan: Scan = new Scan
hbaseConf.set(TableInputFormat.INPUT_TABLE, "hbaseSchema:hbaseTable")
hbaseConf.set(TableInputFormat.SCAN, TableMapReduceUtil.converScanToString(scan))
val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(hbaseConfig, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
import spark.implicits._
val resultRDD: RDD[Result] = hbaseRDD.map(tuple => tuple._2)
val finalRDD: RDD[GenericRowWithSchema] = resultRDD.map(result => parseRow(result, hiveColumns, tableSchema))
如何将 Array[String] 的元素包裹在 Array[Any] 中,让构造函数可以接受?
【问题讨论】:
【参考方案1】:我们只需将Array[String]
扩展为Array[Any]
,如下所示
new GenericRowWithSchema(colArray.toArray, tableSchema)
【讨论】:
以上是关于Spark - 将通用数组传递给 GenericRowWithSchema的主要内容,如果未能解决你的问题,请参考以下文章
spark-streaming scala:如何将字符串数组传递给过滤器?
Spark 2 将 scala 数组转换为 WrappedArray