从 avro 文件将数据集转换为数据框

Posted

技术标签:

【中文标题】从 avro 文件将数据集转换为数据框【英文标题】:Convert dataset to dataframe from an avro file 【发布时间】:2021-05-01 15:58:47 【问题描述】:

我编写了一个 scala 脚本来加载一个 avro 文件,并使用生成的数据(以检索***贡献者)。 问题是,在加载文件时,它提供了一个我无法转换为数据框的数据集,因为它包含一些复杂的类型:

    val history_src = "path_to_avro_files\\frwiki*.avro"
    val revisions_dataset = spark.read.format("avro").load(history_src) 
//gives a dataset the we can see the data and make a take(1) without problems 
    
    val first_essay = revisions_dataset.map(row => (row.getString(0), row.getLong(2), row.get(3).asInstanceOf[mutable.WrappedArray[Revision]].array
      .map(x=> (x.r_contributor.r_username, x.r_contributor.r_contributor_id, x.r_contributor.r_contributor_ip)))).take(1) 
//gives GenericRowWithSchema cannot be cast to Revision

    val second_essay = revisions_dataset.map(row => (row.getString(0), row.getLong(2), row.get(3).asInstanceOf[mutable.WrappedArray[GenericRowWithSchema]].toStream
      .map(x=> (x.getLong(0),row.get(3).asInstanceOf[mutable.WrappedArray[GenericRowWithSchema]].map(c => (c.getLong(0))))))).take(1) 
//  gives WrappedArray$ofRef cannot be cast to scala.collection.mutable.ArrayBuffer

我使用下面的案例类尝试使用编码器和编码器,但没有成功

  case class History (title: String, namespace: Long, id: Long, revisions: Array[Revision])
  case class Contributor (r_username: String, r_contributor_id: Long, r_contributor_ip: String)
  case class Revision(r_id: Long, r_parent_id: Long, timestamp : Long, r_contributor: Contributor, sha: String)

我可以从我的 revisions_dataset 生成架构是这样的,它给出了这个:

root
|-- p_title: string (nullable = true)
|-- p_namespace: long (nullable = true)
|-- p_id: long (nullable = true)
|-- p_revisions: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- r_id: long (nullable = true)
|    |    |-- r_parent_id: long (nullable = true)
|    |    |-- r_timestamp: long (nullable = true)
|    |    |-- r_contributor: struct (nullable = true)
|    |    |    |-- r_username: string (nullable = true)
|    |    |    |-- r_contributor_id: long (nullable = true)
|    |    |    |-- r_contributor_ip: string (nullable = true)
|    |    |-- r_sha1: string (nullable = true)

我的目标是拥有一个数据框,以便能够检索修订列表中的贡献者列表并将其展平以在页面内包含贡献者列表(与标题具有相同的级别)。

有什么帮助吗?

【问题讨论】:

【参考方案1】:
import org.apache.spark.sql.functions._

val r1 = Revision(1, 1, 1, Contributor("c1", 1, "ip1"), "sha")
val r2 = Revision(1, 1, 1, Contributor("c2", 2, "ip2"), "sha")
val r3 = Revision(1, 1, 1, Contributor("c3", 3, "ip3"), "sha")
val revisions_dataset = Seq(
  ("title1", 0L, 1L, Array(r1, r2)),
  ("title1", 0L, 2L, Array(r1, r3)),
  ("title1", 0L, 3L, Array(r2))
).toDF("p_title", "p_namespace", "p_id", "p_revisions")

val flattened = revisions_dataset.select($"p_title", $"p_id", explode($"p_revisions").alias("p_revision"))
        .withColumn("r_contributor_username", $"p_revision.r_contributor.r_username")
        .withColumn("r_contributor_id", $"p_revision.r_contributor.r_contributor_id")
        .withColumn("r_contributor_ip", $"p_revision.r_contributor.r_contributor_ip")
        .drop("p_revision")

flattened.show(false)

输出:

+-------+----+----------------------+----------------+----------------+
|p_title|p_id|r_contributor_username|r_contributor_id|r_contributor_ip|
+-------+----+----------------------+----------------+----------------+
|title1 |1   |c1                    |1               |ip1             |
|title1 |1   |c2                    |2               |ip2             |
|title1 |2   |c1                    |1               |ip1             |
|title1 |2   |c3                    |3               |ip3             |
|title1 |3   |c2                    |2               |ip2             |
+-------+----+----------------------+----------------+----------------+

【讨论】:

以上是关于从 avro 文件将数据集转换为数据框的主要内容,如果未能解决你的问题,请参考以下文章

spark scala将嵌套的数据框转换为嵌套的数据集

如何将 sql 转换为 spark 数据集?

R语言数据转换(一)2021.2.25

python 将单独的数据文件转换为一个数据集,从单词或单词中获取向量,从上下文中获取向量

如何从 hdf5 保存/提取数据集并转换为 TiFF?

如何将数据集转换为数据表