根据条件在 Spark SQL 或 MySQL 中生成新列
Posted
技术标签:
【中文标题】根据条件在 Spark SQL 或 MySQL 中生成新列【英文标题】:Generating a new column in Spark SQL or MySQL based on a condition 【发布时间】:2020-04-18 08:09:47 【问题描述】:表创建:
CREATE TABLE temp (
name varchar(20),
dep varchar(20));
INSERT INTO temp VALUES
('a', null),
('b', null),
('c', 'b'),
('d', 'c'),
('e', 'b'),
('e', 'd');
我需要社区的帮助来编写一个生成新列的查询,比如 xyz,如果 dep 为空,则该列的值为 1。否则它必须为对应的 name 取 dep 并将 xyz 列值加 1。
例如:这里c是依赖于b的,所以要取bxyz /strong> 具有 1 并在其上加 1,从而使 c 的 xyz 值为 2,依此类推。
输出:
+------+------+-----+
| name | dep | xyz |
+------+------+-----+
| a | null | 1 |
| b | null | 1 |
| c | b | 2 |
| d | c | 3 |
| e | b | 2 |
| e | d | 4 |
+------+------+-----+
表创建:
create table temp1(name varchar(20), dependency varchar(20));
insert into temp1 values
('city', null), ('state', null), ('country', 'city'),
('country','state'), ('pin','country'), ('pin','state'),
('continent','country'), ('continent','pin'), ('continent','city');
预期输出:这里的序列是要生成的新列。
| name | dependency | sequence |
|----------|------------|----------|
| city | null | 1 |
| state | null | 1 |
| country | city | 2 |
| country | state | 2 |
| pin | country | 3 |
| pin | state | 2 |
| continent| country | 3 |
| continent| pin | 4 |
| continent| city | 2 |
我向社区提出的第一个问题 :) 提前感谢你们。
【问题讨论】:
是的。从一个艰难的开始!您想在哪个数据库和版本中执行此操作? 您需要pyspark̀
解决方案吗?因为有标签,但您在问题中没有提及任何有关 spark 的内容
你需要一个递归查询,mysql 8.0 有。 dev.mysql.com/doc/refman/8.0/en/with.html
我需要 spark SQL 或 pyspark 或 MySQL。我用另一个例子更新了这个问题。帮帮我。
根据您的编辑,答案已更新
【参考方案1】:
试试这个,看看Demo这里My SQL 8.0
select
name,
dep,
dense_rank() over (order by dep) as xyz
from myTable
order by
name, dep
输出:
+--------------+
name dep xyz
+--------------+
a (null) 1
b (null) 1
c b 2
d c 3
e b 2
e d 4
对于第二个问题,您可以通过简单的case
语句来实现,如下所示
select
name,
type,
case
when
(name = 'country' and type = 'city')
OR (name = 'continent' and type = 'city')
OR (name = 'pin' and type = 'state')
OR (name = 'country' and type = 'state')
then
2
when
(name = 'pin' and type = 'country')
OR (name = 'continent' and type = 'country')
then
3
when
(name = 'continent' and type = 'pin')
then
4
else
1
end as ranks
from myTable
输出:
+--------------------------+
name type ranks
+--------------------------+
city null 1
state null 1
country city 2
country state 2
pin country 3
pin state 2
continent country 3
continent pin 4
continent city 2
【讨论】:
感谢热心。但是我已经为您的查询未按预期工作的问题添加了一个新表。你能帮忙吗? @zealous——查询本质上必须是动态的。如果有 100 条记录。使用 case 语句查询将持续多长时间 :(【参考方案2】:@zealous 的回答似乎可以解决问题。
我可以建议您使用等效的 pyspark
语法(因为 pyspark
在标签中)
首先,创建你的数据框
import pyspark.sql.window as psw
import pyspark.sql.functions as psf
df = spark.createDataFrame([("a",None , 1), ("b", None ,1),
("c","b",2), ("d","c",3),
("e","b",2),("e","d",4)],
['name','dep','xyz'])
df.show(5)
+----+----+---+
|name| dep|xyz|
+----+----+---+
| a|null| 1|
| b|null| 1|
| c| b| 2|
| d| c| 3|
| e| b| 2|
+----+----+---+
only showing top 5 rows
这个想法是按dep
排序:Null
值是第一个,然后你得到字母的排序。使用psf.dense_rank
,您的订购不会有空白。要在Spark
中应用dense_rank
,您需要一个Window
函数:
w = psw.Window.orderBy('dep')
df.withColumn("xyz", psf.dense_rank().over(w))
df.show(5)
+----+----+---+
|name| dep|xyz|
+----+----+---+
| a|null| 1|
| b|null| 1|
| c| b| 2|
| d| c| 3|
| e| b| 2|
+----+----+---+
only showing top 5 rows
更新
对于您的第二个问题,我没有看到任何可以提供优雅解决方案的模式。您将需要一系列psf.when
语句。
import pyspark.sql.functions as psf
df = spark.createDataFrame([('city', None),('state', None),
('country', 'city'),('country','state'),
('pin','country'),('pin','state'),
('continent','country'),('continent','pin'),
('continent','city')], ['name','type'])
df = df.withColumn("sequence", psf.when(
((psf.col('name') == "country" ) & (psf.col('type') == "city")) |
((psf.col('name') == "continent") & (psf.col('type') == "city")) |
((psf.col('name') == "pin") & (psf.col('type') == "state")) |
((psf.col('name') == "country") & (psf.col('type') == "state")),
2
).when(
((psf.col('name') == "pin") & (psf.col('type') == "country")) |
((psf.col('name') == "continent") & (psf.col('type') == "country"))
,
3
).when(
(psf.col('name') == "continent") & (psf.col('type') == "pin"),
4
).otherwise(1)
)
df.show(10)
+---------+-------+--------+
| name| type|sequence|
+---------+-------+--------+
| city| null| 1|
| state| null| 1|
| country| city| 2|
| country| state| 2|
| pin|country| 3|
| pin| state| 2|
|continent|country| 3|
|continent| pin| 4|
|continent| city| 2|
+---------+-------+--------+
避免psf.when
由于您的条件没有明显的模式,除了使用我现在建议的链式 when
或 join
之外,我看不到其他方法
这个想法是在你的两列上创建一个数据框,然后合并它。解决方案未经测试。
conditions = spark.createDataFrame([('country', 'city',2),('continent','city',2),
('pin','state',2),('country','state',2),
('pin','country',3),('continent','country',3),
('continent','pin', 4)],
['name','type','sequence'])
df = df.join(psf.broadcast(conditions),
['name', 'type'], 'left_outer')
.fillna(1, subset=['sequence'])
顺便说一句,我使用psf.broadcast
来加速合并,因为conditions
DataFrame
的大小应该是合理的。
如果你有大量的条件,我认为应该首选这种方法。它将使您的代码更具可读性
【讨论】:
@linog-:我已经更新了我的问题。你能帮我吗?谢谢你 答案已更新。除了使用@zealous 建议的一系列when
语句之外,没有优雅的方法可以做到这一点
谢谢你。但是查询会随着记录数的增加而不断增加。请帮助我使用 python 或 pyspark 循环类型的解决方案。我认为这有帮助。
我更新了。我不知道如何使用循环来做到这一点,但我建议您使用基于join
的解决方案。如果答案让你满意,你可以接受它以上是关于根据条件在 Spark SQL 或 MySQL 中生成新列的主要内容,如果未能解决你的问题,请参考以下文章
Spark SQL 使用窗口 - 根据列条件从当前行之后的行中收集数据