如何访问数据框 .proto、ScalaPB 中的嵌套字段

Posted

技术标签:

【中文标题】如何访问数据框 .proto、ScalaPB 中的嵌套字段【英文标题】:How do I access the nested fields in the dataframe .proto, ScalaPB 【发布时间】:2016-11-20 17:55:51 【问题描述】:

以下我的数据框架构

root
 |-- name: string (nullable = true)
 |-- addresses: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- street: string (nullable = true)
 |    |    |-- city: string (nullable = true)

我想输出姓名和城市。以下是我的 spark 流应用程序,它输出名称和地址,但我想要输出中的名称和城市。 感谢你的帮助。谢谢。

object PersonConsumer 
  import org.apache.spark.sql.SQLContext, SparkSession
  import com.example.protos.demo._

  def main(args : Array[String]) 

    val spark = SparkSession.builder.
      master("local")
      .appName("spark session example")
      .getOrCreate()

    import spark.implicits._

    val ds1 = spark.readStream.format("kafka").
      option("kafka.bootstrap.servers","localhost:9092").
      option("subscribe","person").load()

    val ds2 = ds1.map(row=> row.getAs[Array[Byte]]("value")).map(Person.parseFrom(_)).select($"name", $"addresses")

    ds2.printSchema()

    val query = ds2.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  

【问题讨论】:

【参考方案1】:

您可以简单地获取名称和城市的数据框,然后您就可以使用它,对于获取名称和城市的数据框,您可以同时选择如下

ds1.select("name","addresses.element.city")

【讨论】:

【参考方案2】:

感谢桑迪普。 select("name","addresses.element.city") 给我错误,因为地址是 Seq[Address] 并且我想要输出中的所有城市。

最后我写了以下函数来获取所有城市..

    def getCities(addresses: Seq[Address]) : String = 
      var cities:String = ""
      if (addresses.size > 0) 
        cities = (for(a <- addresses) yield a.city.getOrElse("")).mkString(",")
//        cities = addresses.foldLeft("")((str,addr) => str  + addr.city.getOrElse(""))
      
      cities
    

【讨论】:

以上是关于如何访问数据框 .proto、ScalaPB 中的嵌套字段的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Mashaller 中使用 http 请求标头进行内容协商?

如何访问数据框中的数字[重复]

如何访问pyspark数据框中的动态列

如何使用 C# 从文本框中的访问数据库中获取数据

如何访问存储在scala spark中的数据框中的映射值和键

如何使用 loc[i,j] 根据索引值访问数据框中的特定值