Spark - 使用不同的数据类型以编程方式创建模式

Posted

技术标签:

【中文标题】Spark - 使用不同的数据类型以编程方式创建模式【英文标题】:Spark - creating schema programmatically with different data types 【发布时间】:2017-05-24 23:42:50 【问题描述】:

我有一个由 7-8 个字段组成的数据集,它们的类型为 String、Int 和 Float。

我正在尝试使用以下方法通过编程方法创建架构:

val schema = StructType(header.split(",").map(column => StructField(column, StringType, true)))

然后将其映射到 Row 类型,例如:

val dataRdd = datafile.filter(x => x!=header).map(x => x.split(",")).map(col => Row(col(0).trim, col(1).toInt, col(2).toFloat, col(3), col(4) ,col(5), col(6), col(7), col(8)))

但是当我使用 DF.show() 创建 DataFrame 后,它会为 Integer 字段提供错误。

那么如何在数据集中有多种数据类型的情况下创建这样的模式

【问题讨论】:

【参考方案1】:

您在代码中遇到的问题是您将所有字段都分配为 StringType。

假设标题中只有字段的名称,则无法猜测类型。

假设头部字符串是这样的

val header = "field1:Int,field2:Double,field3:String"

那么代码应该是

def inferType(field: String) = field.split(":")(1) match 
   case "Int" => IntegerType
   case "Double" => DoubleType
   case "String" => StringType
   case _ => StringType


val schema = StructType(header.split(",").map(column => StructField(column, inferType(column), true)))

对于你得到的标题字符串示例

root
 |-- field1:Int: integer (nullable = true)
 |-- field2:Double: double (nullable = true)
 |-- field3:String: string (nullable = true)

另一方面。如果您需要的是来自文本的数据框,我建议您直接从文件本身创建数据框。从 RDD 创建它是没有意义的。

val fileReader = spark.read.format("com.databricks.spark.csv")
  .option("mode", "DROPMALFORMED")
  .option("header", "true")
  .option("inferschema", "true")
  .option("delimiter", ",")

val df = fileReader.load(PATH_TO_FILE)

【讨论】:

但是标题字符串不是这样的,数据是dfs8768768, 65, 76.34, 234, dfgdg, 34.65 dfs8768768, 65, 76.34, 234, dfgdg, 34.65 那么就不可能从头部知道数据的类型,因为它没有提供。 这是带有标题的确切数据:Auctioned,bid,bidtime,bidder,bidderrate,openbid,price,item,daystolive 8213034715,15,12.373,baman,3,12,20,book1,5 8213034725,65,21.33,thmpu,2,64,75,watch1,9 8213034735,85,23.3,lovekush,4,45,90,remote1,10 8213034745,115,44.44,jaipanee,3,111,130,s3phone,4 您是否意识到 spark 无法从标题中猜测列的类型?检查我更新答案的链接。您宁愿考虑直接从数据创建数据框,而不创建 rdd。您可以通过实例化 spark.reader 来做到这一点。 感谢您回复 elghoto。但我试过了,我要解决这个问题。因为我没有尝试解决方法,所以我只想从 RDD 到 DF【参考方案2】:

先定义结构类型:

val schema1 = StructType(Array(
  StructField("AcutionId", StringType, true),
  StructField("Bid", IntegerType, false),
  StructField("BidTime", FloatType, false),
  StructField("Bidder", StringType, true),
  StructField("BidderRate", FloatType, false),
  StructField("OpenBid", FloatType, false),
  StructField("Price", FloatType, false),
  StructField("Item", StringType, true),
  StructField("DaystoLive", IntegerType, false)
))

然后通过将其转换为特定类型来指定将要出现在 Row 中的每一列:

val dataRdd = datafile.filter(x => x!=header).map(x => x.split(","))
  .map(col => Row(
    col(0).trim,
    col(1).trim.toInt,
    col(2).trim.toFloat,
    col(3).trim,
    col(4).trim.toFloat,
    col(5).trim.toFloat,
    col(6).trim.toFloat,
    col(7).trim,
    col(8).trim.toInt)
  )

然后将 Schema 应用到 RDD

val auctionDF = spark.sqlContext.createDataFrame(dataRdd,schema1)

【讨论】:

我们如何为 Struct 中的同一个文件提供不同的数据类型。

以上是关于Spark - 使用不同的数据类型以编程方式创建模式的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL - 隐式创建模式和以编程方式创建模式之间的确切区别

Spark 编程模型(中)

在 servlet 中以编程方式调用过滤器(使用码头/火花创建)?

Spark基础编程学习01

在 iOS 8 中使用 NE***Manager,如何以编程方式创建与自定义 *** 类型的 *** 连接? (例如思科任何连接)

通过读取具有不同数据类型的 Scala 序列来创建 Spark 数据帧