How to match two columns from a DataFrame (including Arrays) with two columns from a CSV (Dataframe/Dictionary)
Posted: 2020-09-08 06:25:35

[Question]: I have a DataFrame like this:
df = spark.createDataFrame([
[["Apple"],['iPhone EE','iPhone 11', 'iPhone 11 Pro']],
[["Acer"],['Iconia Talk S','liquid Z6 Plus']],
[["Casio"],['Casio G\'zOne Brigade']],
[["Alcatel"[,[]],
[["HTC", "Honor"].["Play 4", "Play 7"]]
]).toDF("brand","type")
And a CSV like this:
Apple;iPhone EE
Apple;iPhone 11 Pro
Apple;iPhone XS
Acer;liquid Z6 Plus
Acer;Acer Predator 8
Casio;Casio G'zOne Ravine
Alcatel;3L
HTC;Play 4
Honor;Play 7
I need to create a new boolean column, match: True if the combination of brand and type matches one of the rows in the CSV, otherwise False.
Expected output:
Brand | Type | Match
-------------------------------------------------------------
Apple | [iPhone EE, iPhone 11, iPhone 11 Pro] | True
Acer | [Iconia Talk S, liquid Z6 Plus] | True
Casio | [Casio G'zOne Brigade] | False
Alcatel | [] | False
HTC, Honor | [Play 4, Play 7] | True
Update: brand is also of type array<string>.
The CSV file is just a starting point; it can be converted to a DataFrame or a Dictionary (or whatever fits best). How can I best accomplish this?
[Answer 1]: You can try size + array_intersect to set this flag.
from pyspark.sql.functions import collect_set, size, array_intersect, broadcast, expr, flatten, collect_list, array_join

# read the CSV and collect the set of known types per brand
df_list = spark.read.csv("/path/to/csv_list", sep=';').toDF('brand_name', 'type')
df1 = df_list.groupby('brand_name').agg(collect_set('type').alias('types'))

# broadcast-join on brand membership, regroup, then flag rows whose type arrays intersect
df_new = df.join(broadcast(df1), expr("array_contains(brand, brand_name)"), "left") \
    .groupby('brand', 'Type') \
    .agg(flatten(collect_list('types')).alias('types')) \
    .select(array_join('brand', ', ').alias('brand'), 'Type', (size(array_intersect('type', 'types')) > 0).alias("Match"))

df_new.show(5, 0)
+----------+-------------------------------------+-----+
|brand |Type |Match|
+----------+-------------------------------------+-----+
|Alcatel |[] |false|
|HTC, Honor|[Play 4, Play 7] |true |
|Casio |[Casio G'zOne Brigade] |false|
|Acer |[Iconia Talk S, liquid Z6 Plus] |true |
|Apple |[iPhone EE, iPhone 11, iPhone 11 Pro]|true |
+----------+-------------------------------------+-----+
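For reference, the core of the flag is size(array_intersect(a, b)) > 0, which is true exactly when the two arrays share at least one element. A minimal standalone sketch, assuming the same Spark session (the demo data below is made up):

from pyspark.sql.functions import array_intersect, size, col

demo = spark.createDataFrame([
    (["iPhone EE", "iPhone 11"], ["iPhone EE", "iPhone XS"]),
    (["Iconia Talk S"], ["3L"])
], ["type", "types"])
demo.select((size(array_intersect(col("type"), col("types"))) > 0).alias("Match")).show()
# Match is true for the first row and false for the second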
Method 2: use a map (map<string,array<string>>):
from pyspark.sql.functions import arrays_overlap, array, lit, col, create_map, monotonically_increasing_id, first, explode, array_join, collect_set, expr

# build a literal map column from the CSV data: brand -> array of known types
dict1 = df1.rdd.collectAsMap()
map1 = create_map([t for k, v in dict1.items() for t in [lit(k), array(*map(lit, v))]])
#Column<b"map(Casio, array(Casio G'zOne Ravine), Alcatel, array(3L), Acer, array(Acer Predator 8, liquid Z6 Plus), HTC, array(Play 4), Honor, array(Play 7), Apple, array(iPhone EE, iPhone 11 Pro, iPhone XS))">
# explode the brand array, check each brand's types against the map, then regroup by row id
df_new = df.withColumn('id', monotonically_increasing_id()) \
    .withColumn('brand', explode('brand')) \
    .withColumn('Match', arrays_overlap('type', map1[col('brand')])) \
    .groupby('id') \
    .agg(
        array_join(collect_set('brand'), ', ').alias('brand'),
        first('Type').alias('Type'),
        expr("sum(int(Match)) > 0 as Match")
    )
df_new.show(5, 0)
+---+----------+-------------------------------------+-----+
|id |brand |Type |Match|
+---+----------+-------------------------------------+-----+
|0 |Apple |[iPhone EE, iPhone 11, iPhone 11 Pro]|true |
|1 |Acer |[Iconia Talk S, liquid Z6 Plus] |true |
|3 |Alcatel |[] |false|
|2 |Casio |[Casio G'zOne Brigade] |false|
|4 |HTC, Honor|[Play 4, Play 7] |true |
+---+----------+-------------------------------------+-----+
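As a side note, arrays_overlap(a, b) is what drives the Match flag here: it returns true when the two arrays share at least one non-null element. A minimal sketch under the same assumptions (made-up demo data):

from pyspark.sql.functions import arrays_overlap, col

demo = spark.createDataFrame([
    (["Play 4", "Play 7"], ["Play 4"]),
    (["Iconia Talk S"], ["3L"])
], ["type", "known"])
demo.select(arrays_overlap(col("type"), col("known")).alias("Match")).show()
# Match is true for the first row and false for the second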
[Comments]:
I have to apologize: brand is also an array. df_new = df.join(broadcast(df1), expr("array_contains(brand, brand_name)"), "left") fails with cannot resolve 'array_contains (....) due to data type mismatch: Input to function array_contains should have been array followed by a value with same element type, but it's [array<string>, array<string>]. It looks like they are both of the same type, array<string>. I also checked printSchema(), and both are of type array<string>.
How did you set up df1? If you followed the steps in my post, df_list is read from the CSV file, and after the groupby, brand_name should be StringType. Unless you have some other post-processing step?
Changing the order fixed it... duh. It should be expr("array_contains(brand, brand_name)"); I had used expr("array_contains(brand_name, brand)").
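For anyone hitting the same error: array_contains takes the array column first and the value to look for second. A minimal sketch of the signature, assuming the same Spark session (made-up demo data):

from pyspark.sql.functions import expr

demo = spark.createDataFrame([(["HTC", "Honor"], "HTC")], ["brand", "brand_name"])
demo.select(expr("array_contains(brand, brand_name)").alias("hit")).show()
# hit is true; swapping the two arguments raises the same kind of type-mismatch error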
[Answer 2]:
This might help.
>>> import pyspark.sql.functions as F
>>> df = spark.createDataFrame([
... ["Apple",['iPhone EE','iPhone 11', 'iPhone 11 Pro']],
... ["Acer",['Iconia Talk S','liquid Z6 Plus']],
... ["Casio",['Casio G\'zOne Brigade']],
... ["Alcatel",[]]
... ]).toDF("brand","type")
>>> df.show(df.count(), False)
+-------+-------------------------------------+
|brand |type |
+-------+-------------------------------------+
|Apple |[iPhone EE, iPhone 11, iPhone 11 Pro]|
|Acer |[Iconia Talk S, liquid Z6 Plus] |
|Casio |[Casio G'zOne Brigade] |
|Alcatel|[] |
+-------+-------------------------------------+
>>> file_df = sqlcontext.read.csv('/home/chai/brand.csv', header='true')
>>> file_df.show(file_df.count(), False)
+-------+-------------------+
|brand |types |
+-------+-------------------+
|Apple |iPhone EE |
|Apple |iPhone 11 Pro |
|Apple |iPhone XS |
|Acer |liquid Z6 Plus |
|Acer |Acer Predator 8 |
|Casio |Casio G'zOne Ravine|
|Alcatel|3L |
+-------+-------------------+
>>> file_df = file_df.groupBy('brand').agg(F.collect_list('types').alias('new'))
>>> file_df.show(file_df.count(), False)
+-------+-------------------------------------+
|brand |new |
+-------+-------------------------------------+
|Casio |[Casio G'zOne Ravine] |
|Alcatel|[3L] |
|Acer |[liquid Z6 Plus, Acer Predator 8] |
|Apple |[iPhone EE, iPhone 11 Pro, iPhone XS]|
+-------+-------------------------------------+
>>> def test(row_dict):
... new_dict = dict()
... for i in row_dict.get('type'):
... if i in row_dict.get('new'):
... new_dict['flag'] = 'True'
... else:
... new_dict['flag'] = 'False'
... if len(row_dict.get('type')) == 0 and len(row_dict.get('new')) > 0:
... new_dict['flag'] = 'False'
... new_dict['brand'] = row_dict.get('brand')
... new_dict['type'] = row_dict.get('type')
... new_dict['new'] = row_dict.get('new')
... return new_dict
...
>>> def row_to_dict(row):
... return row.asDict(recursive=True)
>>> rdd = all.rdd.map(row_to_dict)
>>> rdd1 = rdd.map(test)
>>> final_df = sqlcontext.createDataFrame(rdd1)
>>> final_df.show(final_df.count(), False)
+-------+-----+-------------------------------------+-------------------------------------+
|brand |flag |new |type |
+-------+-----+-------------------------------------+-------------------------------------+
|Apple |True |[iPhone EE, iPhone 11 Pro, iPhone XS]|[iPhone EE, iPhone 11, iPhone 11 Pro]|
|Acer |True |[liquid Z6 Plus, Acer Predator 8] |[Iconia Talk S, liquid Z6 Plus] |
|Casio |False|[Casio G'zOne Ravine] |[Casio G'zOne Brigade] |
|Alcatel|False|[3L] |[] |
+-------+-----+-------------------------------------+-------------------------------------+
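As a side note, the same per-row check could also be written with a UDF instead of the RDD round trip. This is only a sketch, not part of the original answer; it reuses the joined all DataFrame from above:

>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import BooleanType
>>> match_udf = udf(lambda types, new: any(t in new for t in types), BooleanType())
>>> all.withColumn('flag', match_udf('type', 'new')).show()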