根据条件在 Spark SQL 或 MySQL 中生成新列

Posted

技术标签:

【中文标题】根据条件在 Spark SQL 或 MySQL 中生成新列【英文标题】:Generating a new column in Spark SQL or MySQL based on a condition 【发布时间】:2020-04-18 08:09:47 【问题描述】:

表创建:

CREATE TABLE temp (
name varchar(20), 
dep varchar(20));

INSERT INTO temp VALUES 
('a', null), 
('b', null), 
('c', 'b'), 
('d', 'c'), 
('e', 'b'), 
('e', 'd');

我需要社区的帮助来编写一个生成新列的查询,比如 xyz,如果 dep 为空,则该列的值为 1。否则它必须为对应的 namedep 并将 xyz 列值加 1。

例如:这里c是依赖于b的,所以要取bxyz /strong> 具有 1 并在其上加 1,从而使 cxyz 值为 2,依此类推。

输出:

+------+------+-----+
| name |  dep | xyz |
+------+------+-----+
|  a   | null |  1  |
|  b   | null |  1  |
|  c   |  b   |  2  |
|  d   |  c   |  3  |
|  e   |  b   |  2  |
|  e   |  d   |  4  |
+------+------+-----+

表创建:

create table temp1(name varchar(20), dependency varchar(20));
insert into temp1 values
    ('city', null), ('state', null), ('country', 'city'),
    ('country','state'), ('pin','country'), ('pin','state'),
    ('continent','country'), ('continent','pin'), ('continent','city');

预期输出:这里的序列是要生成的新列。

| name     | dependency | sequence |
|----------|------------|----------|
| city     | null       | 1        |
| state    | null       | 1        |
| country  | city       | 2        |
| country  | state      | 2        |
| pin      | country    | 3        |
| pin      | state      | 2        |
| continent| country    | 3        |
| continent| pin        | 4        |
| continent| city       | 2        |

我向社区提出的第一个问题 :) 提前感谢你们。

【问题讨论】:

是的。从一个艰难的开始!您想在哪个数据库和版本中执行此操作? 您需要pyspark̀ 解决方案吗?因为有标签,但您在问题中没有提及任何有关 spark 的内容 你需要一个递归查询,mysql 8.0 有。 dev.mysql.com/doc/refman/8.0/en/with.html 我需要 spark SQL 或 pyspark 或 MySQL。我用另一个例子更新了这个问题。帮帮我。 根据您的编辑,答案已更新 【参考方案1】:

试试这个,看看Demo这里My SQL 8.0

select
  name,
  dep,
  dense_rank() over (order by dep) as xyz
from myTable
order by
  name, dep

输出:

+--------------+
name  dep   xyz
+--------------+
 a  (null)   1
 b  (null)   1
 c    b      2
 d    c      3
 e    b      2
 e    d      4

对于第二个问题,您可以通过简单的case 语句来实现,如下所示

select
    name,
    type,
    case
    when 
        (name = 'country' and type = 'city') 
        OR (name = 'continent' and type = 'city')
        OR (name = 'pin' and type = 'state')
        OR (name = 'country' and type = 'state') 
    then
        2
    when
        (name = 'pin' and type = 'country')
        OR (name = 'continent' and type = 'country')
    then
        3
    when
        (name = 'continent' and type = 'pin')
    then
        4
    else
        1
    end as ranks
from myTable

输出:

+--------------------------+
name        type      ranks
+--------------------------+
city        null        1
state       null        1
country     city        2
country     state       2
pin         country     3
pin         state       2
continent   country     3
continent   pin         4
continent   city        2

【讨论】:

感谢热心。但是我已经为您的查询未按预期工作的问题添加了一个新表。你能帮忙吗? @zealous——查询本质上必须是动态的。如果有 100 条记录。使用 case 语句查询将持续多长时间 :(【参考方案2】:

@zealous 的回答似乎可以解决问题。

我可以建议您使用等效的 pyspark 语法(因为 pyspark 在标签中)

首先,创建你的数据框

import pyspark.sql.window as psw
import pyspark.sql.functions as psf

​df = spark.createDataFrame([("a",None , 1), ("b", None ,1),
                            ("c","b",2), ("d","c",3),
                            ("e","b",2),("e","d",4)],
                           ['name','dep','xyz'])

df.show(5)

+----+----+---+
|name| dep|xyz|
+----+----+---+
|   a|null|  1|
|   b|null|  1|
|   c|   b|  2|
|   d|   c|  3|
|   e|   b|  2|
+----+----+---+
only showing top 5 rows

这个想法是按dep 排序:Null 值是第一个,然后你得到字母的排序。使用psf.dense_rank,您的订购不会有空白。要在Spark 中应用dense_rank,您需要一个Window 函数:

w = psw.Window.orderBy('dep')
df.withColumn("xyz", psf.dense_rank().over(w))
​
df.show(5)
+----+----+---+
|name| dep|xyz|
+----+----+---+
|   a|null|  1|
|   b|null|  1|
|   c|   b|  2|
|   d|   c|  3|
|   e|   b|  2|
+----+----+---+
only showing top 5 rows

更新

对于您的第二个问题,我没有看到任何可以提供优雅解决方案的模式。您将需要一系列psf.when 语句。

import pyspark.sql.functions as psf

df = spark.createDataFrame([('city', None),('state', None),
                            ('country', 'city'),('country','state'),
                            ('pin','country'),('pin','state'),
                            ('continent','country'),('continent','pin'),
                            ('continent','city')], ['name','type'])

df = df.withColumn("sequence", psf.when(
        ((psf.col('name') == "country" ) & (psf.col('type') == "city")) |
        ((psf.col('name') == "continent") & (psf.col('type') == "city")) |
        ((psf.col('name') == "pin") & (psf.col('type') == "state")) |
        ((psf.col('name') == "country") & (psf.col('type') == "state")),
        2
    ).when(
        ((psf.col('name') == "pin") & (psf.col('type') == "country")) |
        ((psf.col('name') == "continent") & (psf.col('type') == "country"))
  ,
        3
    ).when(
        (psf.col('name') == "continent") & (psf.col('type') == "pin"),
        4        
    ).otherwise(1)
    )

df.show(10)

+---------+-------+--------+
|     name|   type|sequence|
+---------+-------+--------+
|     city|   null|       1|
|    state|   null|       1|
|  country|   city|       2|
|  country|  state|       2|
|      pin|country|       3|
|      pin|  state|       2|
|continent|country|       3|
|continent|    pin|       4|
|continent|   city|       2|
+---------+-------+--------+

避免psf.when

由于您的条件没有明显的模式,除了使用我现在建议的链式 whenjoin 之外,我看不到其他方法

这个想法是在你的两列上创建一个数据框,然后合并它。解决方案未经测试。

conditions = spark.createDataFrame([('country', 'city',2),('continent','city',2),
                            ('pin','state',2),('country','state',2),
                            ('pin','country',3),('continent','country',3),
                            ('continent','pin', 4)],
['name','type','sequence'])

df = df.join(psf.broadcast(conditions),
             ['name', 'type'], 'left_outer')
       .fillna(1, subset=['sequence'])

顺便说一句,我使用psf.broadcast 来加速合并,因为conditions DataFrame 的大小应该是合理的。

如果你有大量的条件,我认为应该首选这种方法。它将使您的代码更具可读性

【讨论】:

@linog-:我已经更新了我的问题。你能帮我吗?谢谢你 答案已更新。除了使用@zealous 建议的一系列when 语句之外,没有优雅的方法可以做到这一点 谢谢你。但是查询会随着记录数的增加而不断增加。请帮助我使用 python 或 pyspark 循环类型的解决方案。我认为这有帮助。 我更新了。我不知道如何使用循环来做到这一点,但我建议您使用基于join 的解决方案。如果答案让你满意,你可以接受它

以上是关于根据条件在 Spark SQL 或 MySQL 中生成新列的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL 使用窗口 - 根据列条件从当前行之后的行中收集数据

spark怎么实现将mysql表中按照字段的优先级关联起来

如何在spark sql lag函数中添加if或case条件

mysql根据条件执行sql

如何在Spark中使用AND或OR条件

如何在字典中使用 pyspark.sql.functions.when() 的多个条件?