如何访问数据框 .proto、ScalaPB 中的嵌套字段
Posted
技术标签:
【中文标题】如何访问数据框 .proto、ScalaPB 中的嵌套字段【英文标题】:How do I access the nested fields in the dataframe .proto, ScalaPB 【发布时间】:2016-11-20 17:55:51 【问题描述】:以下我的数据框架构
root
|-- name: string (nullable = true)
|-- addresses: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- street: string (nullable = true)
| | |-- city: string (nullable = true)
我想输出姓名和城市。以下是我的 spark 流应用程序,它输出名称和地址,但我想要输出中的名称和城市。 感谢你的帮助。谢谢。
object PersonConsumer
import org.apache.spark.sql.SQLContext, SparkSession
import com.example.protos.demo._
def main(args : Array[String])
val spark = SparkSession.builder.
master("local")
.appName("spark session example")
.getOrCreate()
import spark.implicits._
val ds1 = spark.readStream.format("kafka").
option("kafka.bootstrap.servers","localhost:9092").
option("subscribe","person").load()
val ds2 = ds1.map(row=> row.getAs[Array[Byte]]("value")).map(Person.parseFrom(_)).select($"name", $"addresses")
ds2.printSchema()
val query = ds2.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
【问题讨论】:
【参考方案1】:您可以简单地获取名称和城市的数据框,然后您就可以使用它,对于获取名称和城市的数据框,您可以同时选择如下
ds1.select("name","addresses.element.city")
【讨论】:
【参考方案2】:感谢桑迪普。 select("name","addresses.element.city") 给我错误,因为地址是 Seq[Address] 并且我想要输出中的所有城市。
最后我写了以下函数来获取所有城市..
def getCities(addresses: Seq[Address]) : String =
var cities:String = ""
if (addresses.size > 0)
cities = (for(a <- addresses) yield a.city.getOrElse("")).mkString(",")
// cities = addresses.foldLeft("")((str,addr) => str + addr.city.getOrElse(""))
cities
【讨论】:
以上是关于如何访问数据框 .proto、ScalaPB 中的嵌套字段的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Mashaller 中使用 http 请求标头进行内容协商?