Spark 中 bigint 的兼容数据类型是啥?我们如何将 bigint 转换为 spark 兼容的数据类型?

Posted

技术标签:

【中文标题】Spark 中 bigint 的兼容数据类型是啥?我们如何将 bigint 转换为 spark 兼容的数据类型?【英文标题】:What is the compatible datatype for bigint in Spark and how can we cast bigint into a spark compatible datatype?Spark 中 bigint 的兼容数据类型是什么?我们如何将 bigint 转换为 spark 兼容的数据类型? 【发布时间】:2019-02-11 14:18:05 【问题描述】:

我正在尝试使用 Spark 将数据从 greenplum 移动到 HDFS。我可以从源表中成功读取数据,并且数据帧(greenplum 表)的 spark 推断架构是:

DataFrame 架构:

 je_header_id: long (nullable = true)
 je_line_num: long (nullable = true)
 last_updated_by: decimal(15,0) (nullable = true)
 last_updated_by_name: string (nullable = true)
 ledger_id: long (nullable = true)
 code_combination_id: long (nullable = true)
 balancing_segment: string (nullable = true)
 cost_center_segment: string (nullable = true)
 period_name: string (nullable = true)
 effective_date: timestamp (nullable = true)
 status: string (nullable = true)
 creation_date: timestamp (nullable = true)
 created_by: decimal(15,0) (nullable = true)
 entered_dr: decimal(38,20) (nullable = true)
 entered_cr: decimal(38,20) (nullable = true)
 entered_amount: decimal(38,20) (nullable = true)
 accounted_dr: decimal(38,20) (nullable = true)
 accounted_cr: decimal(38,20) (nullable = true)
 accounted_amount: decimal(38,20) (nullable = true)
 xx_last_update_log_id: integer (nullable = true)
 source_system_name: string (nullable = true)
 period_year: decimal(15,0) (nullable = true)
 period_num: decimal(15,0) (nullable = true)

Hive表对应的schema是:

je_header_id:bigint|je_line_num:bigint|last_updated_by:bigint|last_updated_by_name:string|ledger_id:bigint|code_combination_id:bigint|balancing_segment:string|cost_center_segment:string|period_name:string|effective_date:timestamp|status:string|creation_date:timestamp|created_by:bigint|entered_dr:double|entered_cr:double|entered_amount:double|accounted_dr:double|accounted_cr:double|accounted_amount:double|xx_last_update_log_id:int|source_system_name:string|period_year:bigint|period_num:bigint

使用上面提到的 Hive 表架构,我使用逻辑创建了以下 StructType:

def convertDatatype(datatype: String): DataType = 
  val convert = datatype match 
    case "string"     => StringType
    case "bigint"     => LongType
    case "int"        => IntegerType
    case "double"     => DoubleType
    case "date"       => TimestampType
    case "boolean"    => BooleanType
    case "timestamp"  => TimestampType
  
  convert

准备好的架构:

 je_header_id: long (nullable = true)
 je_line_num: long (nullable = true)
 last_updated_by: long (nullable = true)
 last_updated_by_name: string (nullable = true)
 ledger_id: long (nullable = true)
 code_combination_id: long (nullable = true)
 balancing_segment: string (nullable = true)
 cost_center_segment: string (nullable = true)
 period_name: string (nullable = true)
 effective_date: timestamp (nullable = true)
 status: string (nullable = true)
 creation_date: timestamp (nullable = true)
 created_by: long (nullable = true)
 entered_dr: double (nullable = true)
 entered_cr: double (nullable = true)
 entered_amount: double (nullable = true)
 accounted_dr: double (nullable = true)
 accounted_cr: double (nullable = true)
 accounted_amount: double (nullable = true)
 xx_last_update_log_id: integer (nullable = true)
 source_system_name: string (nullable = true)
 period_year: long (nullable = true)
 period_num: long (nullable = true)

当我尝试在数据框 Schema 上应用我的 newSchema 时,出现异常:

java.lang.RuntimeException: java.math.BigDecimal is not a valid external type for schema of bigint

我知道它正在尝试将 BigDecimal 转换为 Bigint 并且失败了,但是谁能告诉我如何将 bigint 转换为 spark 兼容的数据类型? 如果没有,我该如何修改我的逻辑以在 case 语句中为这个 bigint/bigdecimal 问题提供正确的数据类型?

【问题讨论】:

【参考方案1】:

通过看到您的问题,您似乎正在尝试将 bigint 值转换为大十进制,这是不对的。 Bigdecimal 是一个必须具有固定精度(最大位数)和小数位数(点右侧的位数)的小数。而你的似乎是长期价值。

这里不使用BigDecimal 数据类型,而是尝试使用LongType 来正确转换bigint 值。看看这是否能解决您的目的。

【讨论】:

我在问题中犯了一个错误。我已经在代码中有“bigint”=> LongType。但我正在尝试使用 BigDecimal 并再次粘贴相同的内容。因为 BigDecimal 给出了编译错误并且不能用它运行代码。我现在已经编辑了问题。

以上是关于Spark 中 bigint 的兼容数据类型是啥?我们如何将 bigint 转换为 spark 兼容的数据类型?的主要内容,如果未能解决你的问题,请参考以下文章

将包含 BigInt 的 RDD 转换为 Spark Dataframe

操作提示:bigint 与 time 不兼容

mysql bigint(20) 20指的是啥意思

Spark:将 bytearray 转换为 bigint

从另一个数据库导入 Spark 2 的对象的类型是啥? [复制]

在 Spark 数据帧 udf 中,像 struct(col1,col2) 这样的函数参数的类型是啥?