Converting an RDD containing BigInt to a Spark DataFrame

【Title】: converting RDD containing BigInt to Spark Dataframe 【Posted】: 2017-10-12 15:37:49 【Question】:

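Hi, I am using Spark 1.6.3. I have an RDD that contains some Scala BigInt values. How would I convert it to a Spark DataFrame? Is it possible to convert the types before creating the DataFrame?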

My RDD:

Array[(BigInt, String, String, BigInt, BigInt, BigInt, BigInt, List[String])] = Array((14183197,Browse,3393626f-98e3-4973-8d38-6b2fb17454b5_27331247X28X6839X1506087469573,80161702,8702170626376335,59,527780275219,List(NavigationLevel, Session)), (14183197,Browse,3393626f-98e3-4973-8d38-6b2fb17454b5_27331247X28X6839X1506087469573,80161356,8702171157207449,72,527780278061,List(StartPlay, Action, Session)))

Printed out:

(14183197,Browse,3393626f-98e3-4973-8d38-6b2fb17454b5_27331247X28X6839X1506087469573,80161356,8702171157207449,72,527780278061,List(StartPlay, Action, Session))
(14183197,Browse,3393626f-98e3-4973-8d38-6b2fb17454b5_27331247X28X6839X1506087469573,80161702,8702170626376335,59,527780275219,List(NavigationLevel, Session))

I have tried creating a schema object:

  import org.apache.spark.sql.types._

  val schema = StructType(Array(
    StructField("trackId", LongType, true),
    StructField("location", StringType, true),
    StructField("listId", StringType, true),
    StructField("videoId", LongType, true),
    StructField("id", LongType, true),
    StructField("sequence", LongType, true),
    StructField("time", LongType, true),
    StructField("type", ArrayType(StringType), true)
  ))

If I try val df = sqlContext.createDataFrame(rdd, schema), I get this error:

error: overloaded method value createDataFrame with alternatives:
  (data: java.util.List[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rows: java.util.List[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
  (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
  (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.rdd.RDD[(BigInt, String, String, BigInt, BigInt, BigInt, BigInt, scala.collection.immutable.List[String])], org.apache.spark.sql.types.StructType)

Or if I try val df = sc.parallelize(rdd.toSeq).toDF, I get the following error:

error: value toSeq is not a member of org.apache.spark.rdd.RDD[(BigInt, String, String, BigInt, BigInt, BigInt, BigInt, List[String])]

Any help is appreciated.

【Answer 1】:

A schema can only be used with RDD[Row]. Use reflection here:

sqlContext.createDataFrame(rdd)
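
If you would rather keep the explicit schema from the question, the tuples first have to be mapped to Row objects whose values match the schema's types. The following is only a sketch under assumptions: the object name, the toLongChecked and toRow helpers, and the local SparkContext setup are hypothetical and not part of the original post, and it assumes every BigInt value fits in a Long.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._

object BigIntRowsToDataFrame {
  // Tuple shape taken from the question.
  type Event = (BigInt, String, String, BigInt, BigInt, BigInt, BigInt, List[String])

  // Hypothetical helper: narrow a BigInt to Long, failing loudly instead of truncating.
  def toLongChecked(b: BigInt): Long = {
    require(b >= BigInt(Long.MinValue) && b <= BigInt(Long.MaxValue), s"$b does not fit in a Long")
    b.toLong
  }

  // Hypothetical helper: build a Row whose values line up with the schema below.
  def toRow(e: Event): Row = Row(
    toLongChecked(e._1), e._2, e._3,
    toLongChecked(e._4), toLongChecked(e._5), toLongChecked(e._6), toLongChecked(e._7),
    e._8)

  // Same schema as in the question.
  val schema = StructType(Array(
    StructField("trackId", LongType, true),
    StructField("location", StringType, true),
    StructField("listId", StringType, true),
    StructField("videoId", LongType, true),
    StructField("id", LongType, true),
    StructField("sequence", LongType, true),
    StructField("time", LongType, true),
    StructField("type", ArrayType(StringType), true)
  ))

  def main(args: Array[String]): Unit = {
    // Assumption: a local context for illustration; in spark-shell, sc and sqlContext already exist.
    val sc = new SparkContext(new SparkConf().setAppName("bigint-rows").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    val rdd: RDD[Event] = sc.parallelize(Seq(
      (BigInt(14183197), "Browse", "3393626f-98e3-4973-8d38-6b2fb17454b5_27331247X28X6839X1506087469573",
        BigInt(80161702), BigInt(8702170626376335L), BigInt(59), BigInt(527780275219L),
        List("NavigationLevel", "Session"))
    ))

    // RDD[Row] plus a StructType matches one of the createDataFrame overloads listed in the error.
    val df = sqlContext.createDataFrame(rdd.map(toRow), schema)
    df.printSchema()
    df.show()
    sc.stop()
  }
}
```

The range check is a design choice: a plain toLong on a BigInt that is too large would quietly keep only the low-order bits, which is hard to spot later in the DataFrame.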

You can also change BigInt to one of the supported types (BigDecimal?) or use a binary encoder for this field.
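
As a rough illustration of changing BigInt to a supported type, the sketch below maps each tuple to a case class that only uses Long, BigDecimal, String and List[String], and then lets reflection derive the schema. The Event case class, its field names (borrowed from the question's schema, with eventType standing in for type), the convert helper, and the choice of which field becomes a BigDecimal are all assumptions made for illustration. It is worth checking that the resulting decimal column's precision and scale can hold your largest values.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object BigIntToSupportedTypes {
  // Tuple shape taken from the question.
  type Raw = (BigInt, String, String, BigInt, BigInt, BigInt, BigInt, List[String])

  // Hypothetical case class: every field type is one Spark's reflection can map to a column.
  case class Event(trackId: Long, location: String, listId: String, videoId: Long,
                   id: BigDecimal, sequence: Long, time: Long, eventType: List[String])

  // Hypothetical helper: narrow a BigInt to Long, failing loudly instead of truncating.
  def toLongChecked(b: BigInt): Long = {
    require(b >= BigInt(Long.MinValue) && b <= BigInt(Long.MaxValue), s"$b does not fit in a Long")
    b.toLong
  }

  // Replace each BigInt with a Long, or with a BigDecimal where the value might outgrow a Long.
  def convert(t: Raw): Event = Event(
    toLongChecked(t._1), t._2, t._3, toLongChecked(t._4),
    BigDecimal(t._5), toLongChecked(t._6), toLongChecked(t._7), t._8)

  def main(args: Array[String]): Unit = {
    // Assumption: a local context for illustration; in spark-shell, sc and sqlContext already exist.
    val sc = new SparkContext(new SparkConf().setAppName("bigint-to-supported").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val rdd = sc.parallelize(Seq[Raw](
      (BigInt(14183197), "Browse", "3393626f-98e3-4973-8d38-6b2fb17454b5_27331247X28X6839X1506087469573",
        BigInt(80161356), BigInt(8702171157207449L), BigInt(72), BigInt(527780278061L),
        List("StartPlay", "Action", "Session"))
    ))

    // No BigInt is left in the element type, so the schema can be inferred by reflection.
    val df = rdd.map(convert).toDF()
    df.printSchema()
    df.show()
    sc.stop()
  }
}
```

Using a case class instead of a plain tuple also gives the columns readable names rather than _1, _2 and so on.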

【Comments】:

Thanks for your comment. I am getting a java.lang.UnsupportedOperationException: Schema for type scala.BigInt is not supported error.
