Spark Scala:在使用 spark 按不同日期排序后,需要获取具有 NULL 日期的记录
Posted
技术标签:
【中文标题】Spark Scala:在使用 spark 按不同日期排序后,需要获取具有 NULL 日期的记录【英文标题】:Spark Scala: Need to get records with NULL dates coming on top after ordering by a different date using spark 【发布时间】:2020-10-06 18:16:47 【问题描述】:我有以下数据:
+-----------+-----------+-----------+-----+-----------+
| Env1_date | Env2_date | Env3_date | Pid | orderDate |
+-----------+-----------+-----------+-----+-----------+
| Null | Null | 1/9/2020 | abc | 10/6/2020 |
| Null | 1/9/2020 | 1/8/2020 | pqr | 10/4/2020 |
| 1/9/2020 | Null | Null | xyz | 10/2/2020 |
| 1/8/2020 | 1/7/2020 | Null | uvw | 10/1/2020 |
+-----------+-----------+-----------+-----+-----------+
我正在尝试创建 3 个新列,它们基本上告诉 Pid
是否对 env1、env2 和 env3 有效。
为此,我首先按降序对orderDate
列上的记录进行排序(已在上表中排序)。
如果对于Env1_date
、Env2_date
、Env3_date
,排名靠前的记录是Null
,则认为它们是有效的。在Null
记录之后,如果日期小于特定日期(在此示例中为1/9/2020
),则认为其有效。任何其他记录都被标记为无效。
如果顶部记录不是NULL
,需要检查日期是否等于1/9/2020
。如果是这样,它们也被标记为有效
我的输出应该如下所示:
+-----------+-----------+-----------+-----+-----------+-----------+-----------+-----------+
| Env1_date | Env2_date | Env3_date | Pid | orderDate | Env1_Flag | Env2_Flag | Env3_Flag |
+-----------+-----------+-----------+-----+-----------+-----------+-----------+-----------+
| Null | Null | 1/9/2020 | abc | 10/6/2020 | Valid | Valid | Valid |
| Null | 1/9/2020 | 1/8/2020 | pqr | 10/4/2020 | Valid | Valid | Invalid |
| 1/9/2020 | Null | Null | xyz | 10/2/2020 | Valid | Invalid | Invalid |
| 1/8/2020 | 1/7/2020 | Null | uvw | 10/1/2020 | Invalid | Invalid | Invalid |
+-----------+-----------+-----------+-----+-----------+-----------+-----------+-----------+
我正在尝试使用Spark 1.5
和scala
来实现这一目标。
我尝试使用lag
函数。但无法包括所有场景。
不知道如何解决这个问题。
谁能帮帮我。
注意:Windows 函数、toDf()、createDataFrame() 函数在我使用的spark
中不起作用。它是一个自定义 Spark 环境,几乎没有限制
【问题讨论】:
嗨@Vaishak,您是否已经使用数据创建了数据框?你能告诉我们一些代码或者你如何获取数据吗?来源是什么? 嗨@Chema,数据很容易以镶木地板文件的形式提供。我只是将文件作为数据框读取。 【参考方案1】:import spark.implicits._
case class Source(
Env1_date: Option[String],
Env2_date: Option[String],
Env3_date: Option[String],
Pid: String,
orderDate: String
)
case class Source1(
Env1_date: Option[String],
Env2_date: Option[String],
Env3_date: Option[String],
Pid: String,
orderDate: String,
Env1_Flag: String,
Env2_Flag: String,
Env3_Flag: String
)
val source = Seq(
Source(None, None, Some("1/9/2020"), "abc", "10/6/2020"),
Source(None, Some("1/9/2020"), Some("1/8/2020"), "pqr", "10/4/2020"),
Source(Some("1/9/2020"), None, None, "xyz", "10/2/2020"),
Source(Some("1/8/2020"), Some("1/7/2020"), None, "abc", "10/6/2020")
).toDF().as[Source].collect()
var env1NextRowInvalid = false
var env2NextRowInvalid = false
var env3NextRowInvalid = false
val source1 = source.map(i =>
val env1Flag = if (env1NextRowInvalid == false && (i.Env1_date.getOrElse("") == """1/9/2020""" || i.Env1_date.getOrElse("") == "")) "valid" else "invalid"
env1NextRowInvalid = if(env1NextRowInvalid == false) (i.Env1_date == "1/9/2020") else true
val env2Flag = if (env2NextRowInvalid == false && (i.Env2_date.getOrElse("") == """1/9/2020""" || i.Env2_date.getOrElse("") == "")) "valid" else "invalid"
env2NextRowInvalid = if(env2NextRowInvalid == false) (i.Env2_date.getOrElse("") == "1/9/2020") else true
val env3Flag = if (env3NextRowInvalid == false && (i.Env3_date.getOrElse("") == """1/9/2020""" || i.Env3_date.getOrElse("") == "")) "valid" else "invalid"
env3NextRowInvalid = if(env3NextRowInvalid == false) (i.Env3_date.getOrElse("") == "1/9/2020") else true
Source1(i.Env1_date, i.Env2_date, i.Env3_date, i.Pid, i.orderDate, env1Flag, env2Flag, env3Flag)
)
val resDF = source1.toSeq.toDF()
resDF.show(false)
// +---------+---------+---------+---+---------+---------+---------+---------+
// |Env1_date|Env2_date|Env3_date|Pid|orderDate|Env1_Flag|Env2_Flag|Env3_Flag|
// +---------+---------+---------+---+---------+---------+---------+---------+
// |null |null |1/9/2020 |abc|10/6/2020|valid |valid |valid |
// |null |1/9/2020 |1/8/2020 |pqr|10/4/2020|valid |valid |invalid |
// |1/9/2020 |null |null |xyz|10/2/2020|valid |invalid |invalid |
// |1/8/2020 |1/7/2020 |null |abc|10/6/2020|invalid |invalid |invalid |
// +---------+---------+---------+---+---------+---------+---------+---------+
【讨论】:
【参考方案2】:您可以做到这一点的一种方法是将所有数据收集到驱动程序并将其作为常规数组处理,然后再次将其转换为 DF。但请注意,数据应该适合驱动程序。
我编写的代码可以处理您提供的数据。如果你稍微调整一下(尤其是数据比较部分),你应该会得到你所期望的。
// This is how your data is going to look like when you collect it with df.collect
val arrayData = Array(
Array("null", "null", "1/9/2020", "abc", "10/6/2020"),
Array("null", "1/9/2020", "1/8/2020", "pqr", "10/4/2020"),
Array("1/9/2020", "null", "null", "xyz", "10/2/2020"),
Array("1/8/2020", "1/7/2020", "null", "uvw", "10/1/2020"),
)
// just printing
arrayData.foreach(arr => println(arr.mkString(" \t| ")))
println("-".repeat(30))
// rotates the array, so column become rows and vice verse
def shiftArray(arr: Array[Array[String]])
= for(i <- arr(0).indices.toArray) yield arr.map(arr => arr(i))
// the function that does the validation part
val someDate = "1/9/2020"
def processColumn(arr: Array[String]) =
val (startingNulls, rest) = arr.span(_ == "null")
val startingNullsValidated: Array[String] = startingNulls.map(_ => "Valid")
val restValidated: Array[String] = rest.map(date => if (date == someDate) "Valid" else "Invalid") // implement custom date comparison
startingNullsValidated ++ restValidated
val shiftedArray: Array[Array[String]] = shiftArray(arrayData)
// you need to validate only first 3 columns, so i used take/slice
val validatedArray =
val columnsToProcess = shiftedArray.take(3)
val otherColumns = shiftedArray.slice(3, shiftedArray.length)
val processedColumns = for (arr <- columnsToProcess) yield processColumn(arr)
processedColumns ++ otherColumns
// rotate array back
val shiftBackValidatedArray = shiftArray(validatedArray)
// just printing the final result
shiftBackValidatedArray.foreach(arr => println(arr.mkString(" \t| ")))
这是上面打印行的输出
null | null | 1/9/2020 | abc | 10/6/2020
null | 1/9/2020 | 1/8/2020 | pqr | 10/4/2020
1/9/2020 | null | null | xyz | 10/2/2020
1/8/2020 | 1/7/2020 | null | uvw | 10/1/2020
------------------------------
Valid | Valid | Valid | abc | 10/6/2020
Valid | Valid | Invalid | pqr | 10/4/2020
Valid | Invalid | Invalid | xyz | 10/2/2020
Invalid | Invalid | Invalid | uvw | 10/1/2020
【讨论】:
那是另一个问题。我将无法在这里将数组转换为 df。 toDf(),createDataframe 函数在我的情况下不起作用。(我们无权访问 sparkContect。自定义框架处理它)以上是关于Spark Scala:在使用 spark 按不同日期排序后,需要获取具有 NULL 日期的记录的主要内容,如果未能解决你的问题,请参考以下文章
通过读取具有不同数据类型的 Scala 序列来创建 Spark 数据帧