Spark 1.5.0 (PySpark) 案例当逻辑和滞后窗口函数

Posted

技术标签:

【中文标题】Spark 1.5.0 (PySpark) 案例当逻辑和滞后窗口函数【英文标题】:Spark 1.5.0 (PySpark) Case When Logic & Lag Window Function 【发布时间】:2016-01-25 23:15:23 【问题描述】:

我正在尝试在 HiveContext 中使用 Spark SQL 函数“WHEN / OTHERWISE”以及窗口中的 LAG,为一些连续分钟数据中的升序数字计数字段创建 DIFF 字段,该字段经常重置为零.所以我需要更正“计数”重置为零。

所以我的代码如下:

window = Window.partitionBy("car","trip_id").orderBy("car","datetime")
df = df.withColumn('new_count', F.when(df.num_count >= F.lag(df.num_count),(df.num_count- F.lag(df.num_count))).otherwise(df.num_count.astype('long')).over(window))

我在 Pyspark 中的错误是:

: java.lang.UnsupportedOperationException: CASE WHEN ...<"variable names">...  is not supported in window operation

尝试使用"sqlContext.sql("Select CASE WHEN...lag(num_count) OVER...")会更好吗?

【问题讨论】:

我有点担心 SQLContext 不支持延迟。你必须使用 HiveContext。 【参考方案1】:

Window 应该单独应用于每个窗口函数。在这种特殊情况下,您可以像这样简单地提取表达式:

num_count_lag = F.lag(df.num_count).over(window)

df.withColumn(
    'new_count',
    F.when(
        df.num_count >= num_count_lag,
        df.num_count - num_count_lag
    ).otherwise(df.num_count.astype('long'))
)

【讨论】:

以上是关于Spark 1.5.0 (PySpark) 案例当逻辑和滞后窗口函数的主要内容,如果未能解决你的问题,请参考以下文章

必须使用 Hive 构建 Spark (spark 1.5.0)

Python应用实战案例-pyspark库从安装到实战保姆级讲解

Pandas 到 pyspark cumprod 函数

pyspark案例系列10-java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

spark pyspark mllib 模型 - 当使用 map 生成预测 rdd 时,它会在 collect() 上引发异常

当 python 函数比它们快时,为啥我们使用 pyspark UDF? (注。不用担心 spark SQL 命令)