Spark - 使用不同的数据类型以编程方式创建模式
Posted
技术标签:
【中文标题】Spark - 使用不同的数据类型以编程方式创建模式【英文标题】:Spark - creating schema programmatically with different data types 【发布时间】:2017-05-24 23:42:50 【问题描述】:我有一个由 7-8 个字段组成的数据集,它们的类型为 String、Int 和 Float。
我正在尝试使用以下方法通过编程方法创建架构:
val schema = StructType(header.split(",").map(column => StructField(column, StringType, true)))
然后将其映射到 Row 类型,例如:
val dataRdd = datafile.filter(x => x!=header).map(x => x.split(",")).map(col => Row(col(0).trim, col(1).toInt, col(2).toFloat, col(3), col(4) ,col(5), col(6), col(7), col(8)))
但是当我使用 DF.show() 创建 DataFrame 后,它会为 Integer 字段提供错误。
那么如何在数据集中有多种数据类型的情况下创建这样的模式
【问题讨论】:
【参考方案1】:您在代码中遇到的问题是您将所有字段都分配为 StringType。
假设标题中只有字段的名称,则无法猜测类型。
假设头部字符串是这样的
val header = "field1:Int,field2:Double,field3:String"
那么代码应该是
def inferType(field: String) = field.split(":")(1) match
case "Int" => IntegerType
case "Double" => DoubleType
case "String" => StringType
case _ => StringType
val schema = StructType(header.split(",").map(column => StructField(column, inferType(column), true)))
对于你得到的标题字符串示例
root
|-- field1:Int: integer (nullable = true)
|-- field2:Double: double (nullable = true)
|-- field3:String: string (nullable = true)
另一方面。如果您需要的是来自文本的数据框,我建议您直接从文件本身创建数据框。从 RDD 创建它是没有意义的。
val fileReader = spark.read.format("com.databricks.spark.csv")
.option("mode", "DROPMALFORMED")
.option("header", "true")
.option("inferschema", "true")
.option("delimiter", ",")
val df = fileReader.load(PATH_TO_FILE)
【讨论】:
但是标题字符串不是这样的,数据是dfs8768768, 65, 76.34, 234, dfgdg, 34.65 dfs8768768, 65, 76.34, 234, dfgdg, 34.65
那么就不可能从头部知道数据的类型,因为它没有提供。
这是带有标题的确切数据:Auctioned,bid,bidtime,bidder,bidderrate,openbid,price,item,daystolive 8213034715,15,12.373,baman,3,12,20,book1,5 8213034725,65,21.33,thmpu,2,64,75,watch1,9 8213034735,85,23.3,lovekush,4,45,90,remote1,10 8213034745,115,44.44,jaipanee,3,111,130,s3phone,4
您是否意识到 spark 无法从标题中猜测列的类型?检查我更新答案的链接。您宁愿考虑直接从数据创建数据框,而不创建 rdd。您可以通过实例化 spark.reader 来做到这一点。
感谢您回复 elghoto。但我试过了,我要解决这个问题。因为我没有尝试解决方法,所以我只想从 RDD 到 DF【参考方案2】:
先定义结构类型:
val schema1 = StructType(Array(
StructField("AcutionId", StringType, true),
StructField("Bid", IntegerType, false),
StructField("BidTime", FloatType, false),
StructField("Bidder", StringType, true),
StructField("BidderRate", FloatType, false),
StructField("OpenBid", FloatType, false),
StructField("Price", FloatType, false),
StructField("Item", StringType, true),
StructField("DaystoLive", IntegerType, false)
))
然后通过将其转换为特定类型来指定将要出现在 Row 中的每一列:
val dataRdd = datafile.filter(x => x!=header).map(x => x.split(","))
.map(col => Row(
col(0).trim,
col(1).trim.toInt,
col(2).trim.toFloat,
col(3).trim,
col(4).trim.toFloat,
col(5).trim.toFloat,
col(6).trim.toFloat,
col(7).trim,
col(8).trim.toInt)
)
然后将 Schema 应用到 RDD
val auctionDF = spark.sqlContext.createDataFrame(dataRdd,schema1)
【讨论】:
我们如何为 Struct 中的同一个文件提供不同的数据类型。以上是关于Spark - 使用不同的数据类型以编程方式创建模式的主要内容,如果未能解决你的问题,请参考以下文章
Spark SQL - 隐式创建模式和以编程方式创建模式之间的确切区别
在 servlet 中以编程方式调用过滤器(使用码头/火花创建)?
在 iOS 8 中使用 NE***Manager,如何以编程方式创建与自定义 *** 类型的 *** 连接? (例如思科任何连接)