将转换从 hive sql 查询转移到 Spark

Posted

技术标签:

【中文标题】将转换从 hive sql 查询转移到 Spark【英文标题】:moving transformations from hive sql query to Spark 【发布时间】:2016-08-22 09:20:20 【问题描述】:
val temp = sqlContext.sql(s"SELECT A, B, C, (CASE WHEN (D) in (1,2,3) THEN ((E)+0.000)/60 ELSE 0 END) AS Z from TEST.TEST_TABLE")
val temp1 = temp.map( temp => ((temp.getShort(0), temp.getString(1)), (USAGE_TEMP.getDouble(2), USAGE_TEMP.getDouble(3))))
.reduceByKey((x, y) => ((x._1+y._1),(x._2+y._2)))

我希望在 scala 中完成转换,而不是上面在 hive 层上进行计算(案例评估)的代码。我该怎么做?

在 Map 中填充数据时是否可以这样做?

【问题讨论】:

withColumnmethod 是除了下面 sarvesh 建议的 map 方法之外的另一种方法 【参考方案1】:
val temp = sqlContext.sql(s"SELECT A, B, C, D, E from TEST.TEST_TABLE")

val tempTransform = temp.map(row => 
  val z = List[Double](1, 2, 3).contains(row.getDouble(3)) match 
    case true => row.getDouble(4) / 60
    case _ => 0
  
  Row(row.getShort(0), Row.getString(1), Row.getDouble(2), z)
)

val temp1 = tempTransform.map( temp => ((temp.getShort(0), temp.getString(1)), (USAGE_TEMP.getDouble(2), USAGE_TEMP.getDouble(3))))
  .reduceByKey((x, y) => ((x._1+y._1),(x._2+y._2)))

【讨论】:

【参考方案2】:

你也可以使用这个语法

new_df = old_df.withColumn('target_column', udf(df.name))

正如example所指的那样

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._ // for `toDF` and $""
import org.apache.spark.sql.functions._ // for `when`

val df = sc.parallelize(Seq((4, "blah", 2), (2, "", 3), (56, "foo", 3), (100, null, 5)))
    .toDF("A", "B", "C")

val newDf = df.withColumn("D", when($"B".isNull or $"B" === "", 0).otherwise(1))

在您的情况下,执行如下数据框的 sql val temp = sqlContext.sql(s"SELECT A, B, C, D, E from TEST.TEST_TABLE")

并应用withColumnwhen otherwise 或如果需要火花udf

,调用scala函数逻辑而不是hiveudf

【讨论】:

以上是关于将转换从 hive sql 查询转移到 Spark的主要内容,如果未能解决你的问题,请参考以下文章

使用 Spark Scala 将 Sql Server 数据类型转换为 Hive 数据类型

Spark SQL简介

从 spark sql 插入配置单元表

指定列的 Spark sql 问题

实例化“org.apache.spark.sql.hive.HiveExternalCatalog”时出错

如何将 BigQuery SQL 查询结果转换为 Spark DataFrame?