数据框用唯一的纪元时间替换每一行空值
Posted
技术标签:
【中文标题】数据框用唯一的纪元时间替换每一行空值【英文标题】:Dataframe replace each row null values with unique epoch time 【发布时间】:2018-10-09 09:12:24 【问题描述】:我在数据框中有 3 行,在 2 行中,列 id 有空值。我需要遍历该特定列 id 上的每一行并替换为应该是唯一的并且应该在数据帧本身中发生的纪元时间。如何做呢? 例如:
id | name
1 a
null b
null c
我想要这个将 null 转换为纪元时间的数据帧。
id | name
1 a
1435232 b
1542344 c
【问题讨论】:
纪元时间是什么意思?您是指一个唯一的数字,还是对它的计算方式有一些要求? epoch 是唯一的 .. 或一些唯一的数字 Primary keys with Apache Spark的可能重复 【参考方案1】:df
.select(
when($"id").isNull, /*epoch time*/).otherwise($"id").alias("id"),
$"name"
)
编辑
您需要确保 UDF 足够精确 - 如果它只有毫秒分辨率,您将看到重复值。请参阅下面的示例,该示例清楚地说明了我的方法有效:
scala> def rand(s: String): Double = Math.random
rand: (s: String)Double
scala> val udfF = udf(rand(_: String))
udfF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,DoubleType,Some(List(StringType)))
scala> res11.select(when($"id".isNull, udfF($"id")).otherwise($"id").alias("id"), $"name").collect
res21: Array[org.apache.spark.sql.Row] = Array([0.6668195187088702,a], [0.920625293516218,b])
【讨论】:
如果你这样做,每一行的纪元时间如何? OP 没有指定如何生成它,但他们必须调用一些返回必要值的函数。 我需要每一行都是唯一的......每一行的时代不同 @TerryDactyl is not 是在 Spark SQL 中生成随机值的有效方法。 我改了。我最初选择 Math.random 来提供每次返回不同值的 udf。 System.nanoSeconds 可能是一样的,除非你有一台非常快的机器!【参考方案2】:看看这个
scala> val s1:Seq[(Option[Int],String)] = Seq( (Some(1),"a"), (null,"b"), (null,"c"))
s1: Seq[(Option[Int], String)] = List((Some(1),a), (null,b), (null,c))
scala> val df = s1.toDF("id","name")
df: org.apache.spark.sql.DataFrame = [id: int, name: string]
scala> val epoch = java.time.Instant.now.getEpochSecond
epoch: Long = 1539084285
scala> df.withColumn("id",when( $"id".isNull,epoch).otherwise($"id")).show
+----------+----+
| id|name|
+----------+----+
| 1| a|
|1539084285| b|
|1539084285| c|
+----------+----+
scala>
EDIT1:
我使用毫秒,然后我得到相同的值。 Spark 不会在时间部分捕获纳秒。许多行可能会获得相同的毫秒数。因此,您基于 epoch 获得唯一值的假设是行不通的。
scala> def getEpoch(x:String):Long = java.time.Instant.now.toEpochMilli
getEpoch: (x: String)Long
scala> val myudfepoch = udf( getEpoch(_:String):Long )
myudfepoch: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,LongType,Some(List(StringType)))
scala> df.withColumn("id",when( $"id".isNull,myudfepoch('name)).otherwise($"id")).show
+-------------+----+
| id|name|
+-------------+----+
| 1| a|
|1539087300957| b|
|1539087300957| c|
+-------------+----+
scala>
唯一的可能是使用 monotonicallyIncreasingId,但这些值的长度可能并不总是相同。
scala> df.withColumn("id",when( $"id".isNull,myudfepoch('name)+monotonicallyIncreasingId).otherwise($"id")).show
warning: there was one deprecation warning; re-run with -deprecation for details
+-------------+----+
| id|name|
+-------------+----+
| 1| a|
|1539090186541| b|
|1539090186543| c|
+-------------+----+
scala>
EDIT2:
我能够欺骗 System.nanoTime 并获得不断增加的 id,但它们不会是连续的,但可以保持长度。见下文
scala> def getEpoch(x:String):String = System.nanoTime.toString.take(12)
getEpoch: (x: String)String
scala> val myudfepoch = udf( getEpoch(_:String):String )
myudfepoch: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
scala> df.withColumn("id",when( $"id".isNull,myudfepoch('name)).otherwise($"id")).show
+------------+----+
| id|name|
+------------+----+
| 1| a|
|186127230392| b|
|186127230399| c|
+------------+----+
scala>
在集群中运行时试试这个并调整 take(12),如果你得到重复的值。
【讨论】:
@stack0114106 您的第二次编辑仍然不能保证唯一性。 是的,这种情况很少发生。尝试使用 System.nanoTime.toString.take(14) 或 (16)以上是关于数据框用唯一的纪元时间替换每一行空值的主要内容,如果未能解决你的问题,请参考以下文章