Spark - 将通用数组传递给 GenericRowWithSchema

Posted

技术标签:

【中文标题】Spark - 将通用数组传递给 GenericRowWithSchema【英文标题】:Spark - Pass Generic Array to GenericRowWithSchema 【发布时间】:2021-07-01 05:14:01 【问题描述】:

我正在尝试构建一个从 HBase 表读取数据并写入 Hive 表的 Spark 实用程序。这里的先决条件是所有列都是字符串类型。

我这里的方法是读取RDD[(ImmutableBytesWritable, Result)] 中的Hbase 表,然后将其转换为RDD[GenericRowWithSchema],然后再转换为Spark 数据帧。我对第一部分很好,但在第二部分遇到问题。我的代码看起来像这样

def parseRow(result: Result, hiveColumns: Array[String], tableSchema: StructType):  = 

  val rowKey = Bytes.toString(result.getRow)
  val cfDataBytes = Bytes.toBytes("cf")

  val colArray = hiveColumns.map(col => Bytes.toString(result.getValue(cfDataBytes, Bytes.toBytes(col))))

  new GenericRowWithSchema(colArray, tableSchema)


我在声明 parseRow 函数时遇到错误

error:Type mismatch
found: Array[String]
required: Array[Any]

Note: String <: Any, but class Array is invariant in type T
You may wish to investigate a wildcard type such as `_<:Any`. (SLS 3.2.10)
  new GenericRowWithSchema(colArray, tableSchema)
                           ^

我使用的函数如下

val tableName = "xyz" // This will be an arg that is passed to the job
val hiveColumns: Array[String] = spark.table(tableName)

val tableSchema: StructType = StructType(hiveColumns.map(colName => StructField(colName, DataTypes.StringType, false)))

val hbaseConf = HBaseConfiguration.create
val scan: Scan = new Scan

hbaseConf.set(TableInputFormat.INPUT_TABLE, "hbaseSchema:hbaseTable")
hbaseConf.set(TableInputFormat.SCAN, TableMapReduceUtil.converScanToString(scan))

val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(hbaseConfig, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

import spark.implicits._

val resultRDD: RDD[Result] = hbaseRDD.map(tuple => tuple._2)

val finalRDD: RDD[GenericRowWithSchema] = resultRDD.map(result => parseRow(result, hiveColumns, tableSchema))

如何将 Array[String] 的元素包裹在 Array[Any] 中,让构造函数可以接受?

【问题讨论】:

【参考方案1】:

我们只需将Array[String] 扩展为Array[Any],如下所示

new GenericRowWithSchema(colArray.toArray, tableSchema)

【讨论】:

以上是关于Spark - 将通用数组传递给 GenericRowWithSchema的主要内容,如果未能解决你的问题,请参考以下文章

spark-streaming scala:如何将字符串数组传递给过滤器?

Spark 2 将 scala 数组转换为 WrappedArray

Scala:无法将数组传递给需要 Seq 或 Iterable 的函数

C,如何将多维数组传递给 CLR/类库项目中的函数

将 jdbc 连接传递给 spark 读取

Spark:如何将 PartialFunction 传递给 DStream?