如何将 DataFrame(包括数组)中的两列与 CSV(Dataframe/Dictionary)中的两列匹配

Posted

技术标签:

【中文标题】如何将 DataFrame(包括数组)中的两列与 CSV(Dataframe/Dictionary)中的两列匹配【英文标题】:How to match two columns from a DataFrame (including Arrays) with two columns from a CSV (Dataframe/Dictionary) 【发布时间】:2020-09-08 06:25:35 【问题描述】:

我有一个这样的DataFrame;

df = spark.createDataFrame([
  [["Apple"],['iPhone EE','iPhone 11', 'iPhone 11 Pro']],   
  [["Acer"],['Iconia Talk S','liquid Z6 Plus']],   
  [["Casio"],['Casio G\'zOne Brigade']],
  [["Alcatel"[,[]],
  [["HTC", "Honor"].["Play 4", "Play 7"]]
]).toDF("brand","type")

还有这样的 csv;

Apple;iPhone EE
Apple;iPhone 11 Pro
Apple;iPhone XS
Acer;liquid Z6 Plus
Acer;Acer Predator 8
Casio;Casio G'zOne Ravine
Alcatel;3L
HTC;Play 4
Honor;Play 7

我需要创建一个新的布尔列match。 如果brand type 的组合与CSV 中的某一行匹配,则为True,否则为False

预期输出:

    Brand      | Type                                  | Match
    -------------------------------------------------------------
    Apple      | [iPhone EE, iPhone 11, iPhone 11 Pro] | True
    Acer       | [Iconia Talk S, liquid Z6 Plus]       | True
    Casio      | [Casio G\'zOne Brigade]               | False
    Alcatel    | []                                    | False
    HTC, Honor | [Play 4, Play 7]                      | True

更新brand 也是array<string> 类型

csv 文件只是一个开始。它可以转换为 Dataframe 或 Dictionary(或任何最适合的)。

我怎样才能最好地做到这一点?

【问题讨论】:

【参考方案1】:

您可以尝试size + array_intersect 设置此标志。

from pyspark.sql.functions import collect_set, size, array_intersect, broadcast, expr, flatten, collect_list, array_join

df_list = spark.read.csv("/path/to/csv_list", sep=';').toDF('brand_name','type')

df1 = df_list.groupby('brand_name').agg(collect_set('type').alias('types'))
  
df_new = df.join(broadcast(df1), expr("array_contains(brand, brand_name)"), "left") \
    .groupby('brand', 'Type') \
    .agg(flatten(collect_list('types')).alias('types')) \
    .select(array_join('brand', ', ').alias('brand'), 'Type', (size(array_intersect('type', 'types'))>0).alias("Match"))

df_new.show(5,0)
+----------+-------------------------------------+-----+                        
|brand     |Type                                 |Match|
+----------+-------------------------------------+-----+
|Alcatel   |[]                                   |false|
|HTC, Honor|[Play 4, Play 7]                     |true |
|Casio     |[Casio G'zOne Brigade]               |false|
|Acer      |[Iconia Talk S, liquid Z6 Plus]      |true |
|Apple     |[iPhone EE, iPhone 11, iPhone 11 Pro]|true |
+----------+-------------------------------------+-----+

方法2:使用地图(map<string,array<string>>):

from pyspark.sql.functions import arrays_overlap, array, lit, col, create_map, col, monotonically_increasing_id, first, explode, array_join

dict1 = df1.rdd.collectAsMap()

map1 = create_map([ t for k,v in dict1.items() for t in [lit(k), array(*map(lit,v))] ])
#Column<b"map(Casio, array(Casio G'zOne Ravine), Alcatel, array(3L), Acer, array(Acer Predator 8, liquid Z6 Plus), HTC, array(Play 4), Honor, array(Play 7), Apple, array(iPhone EE, iPhone 11 Pro, iPhone XS))">

df_new = df.withColumn('id', monotonically_increasing_id()) \
  .withColumn('brand', explode('brand')) \
  .withColumn('Match', arrays_overlap('type', map1[col('brand')])) \
  .groupby('id') \
  .agg(
    array_join(collect_set('brand'),', ').alias('brand'),
    first('Type').alias('Type'),
    expr("sum(int(Match)) > 0 as Match")
)

df_new.show(5,0)                                                                                                    
+---+----------+-------------------------------------+-----+
|id |brand     |Type                                 |Match|
+---+----------+-------------------------------------+-----+
|0  |Apple     |[iPhone EE, iPhone 11, iPhone 11 Pro]|true |
|1  |Acer      |[Iconia Talk S, liquid Z6 Plus]      |true |
|3  |Alcatel   |[]                                   |false|
|2  |Casio     |[Casio G'zOne Brigade]               |false|
|4  |HTC, Honor|[Play 4, Play 7]                     |true |
+---+----------+-------------------------------------+-----+

【讨论】:

我必须道歉。 Brand 也是数组 类型。你能帮我改一下 Method-1 吗? 已添加更新以反映您的最新更改,包括 csv 列表。 我收到df_new = df.join(broadcast(df1), expr("array_contains(brand, brand_name)"), "left") 的错误cannot resolve 'array_contains (....) due to data type mismatch: Input to function array_contains should have been array followed by a value with same element type, but it's [array&lt;string&gt;, array&lt;string&gt;] 看起来它们都是同一类型array&lt;string&gt;。我还检查了printSchema(),它们都是array&lt;string&gt; 类型。 你是如何设置 df1 的?如果您按照我帖子中的步骤进行操作,我们会从 csv 文件中读取 df_list,在 groupby 之后,brand_name 应该是 StringType?除非您有其他一些后处理步骤? 更改顺序修复了它...duh expr("array_contains(brand, brand_name)"),我使用了expr("array_contains(brand_name, brand)")【参考方案2】:

这可能有用。

>>> import pyspark.sql.functions as F

>>> df = spark.createDataFrame([
...   ["Apple",['iPhone EE','iPhone 11', 'iPhone 11 Pro']],   
...   ["Acer",['Iconia Talk S','liquid Z6 Plus']],   
...   ["Casio",['Casio G\'zOne Brigade']],
...   ["Alcatel",[]]
... ]).toDF("brand","type")
>>> df.show(df.count(), False)
+-------+-------------------------------------+
|brand  |type                                 |
+-------+-------------------------------------+
|Apple  |[iPhone EE, iPhone 11, iPhone 11 Pro]|
|Acer   |[Iconia Talk S, liquid Z6 Plus]      |
|Casio  |[Casio G'zOne Brigade]               |
|Alcatel|[]                                   |
+-------+-------------------------------------+

>>> file_df = sqlcontext.read.csv('/home/chai/brand.csv', header='true')
>>> file_df.show(file_df.count(), False)
+-------+-------------------+
|brand  |types              |
+-------+-------------------+
|Apple  |iPhone EE          |
|Apple  |iPhone 11 Pro      |
|Apple  |iPhone XS          |
|Acer   |liquid Z6 Plus     |
|Acer   |Acer Predator 8    |
|Casio  |Casio G'zOne Ravine|
|Alcatel|3L                 |
+-------+-------------------+

>>> file_df = file_df.groupBy('brand').agg(F.collect_list('types').alias('new'))
>>> file_df.show(file_df.count(), False)
+-------+-------------------------------------+
|brand  |new                                  |
+-------+-------------------------------------+
|Casio  |[Casio G'zOne Ravine]                |
|Alcatel|[3L]                                 |
|Acer   |[liquid Z6 Plus, Acer Predator 8]    |
|Apple  |[iPhone EE, iPhone 11 Pro, iPhone XS]|
+-------+-------------------------------------+

>>> def test(row_dict):
...     new_dict = dict()
...     for i in row_dict.get('type'):
...             if i in row_dict.get('new'):
...                     new_dict['flag'] = 'True'
...             else:
...                     new_dict['flag'] = 'False'
...     if len(row_dict.get('type')) == 0 and len(row_dict.get('new')) > 0:
...             new_dict['flag'] = 'False'
...     new_dict['brand'] = row_dict.get('brand')
...     new_dict['type'] = row_dict.get('type')
...     new_dict['new'] = row_dict.get('new')
...     return new_dict
... 
>>> def row_to_dict(row):
...     return row.asDict(recursive=True)
>>> rdd = all.rdd.map(row_to_dict)
>>> rdd1 = rdd.map(test)
>>> final_df = sqlcontext.createDataFrame(rdd1)
>>> final_df.show(final_df.count(), False)
+-------+-----+-------------------------------------+-------------------------------------+
|brand  |flag |new                                  |type                                 |
+-------+-----+-------------------------------------+-------------------------------------+
|Apple  |True |[iPhone EE, iPhone 11 Pro, iPhone XS]|[iPhone EE, iPhone 11, iPhone 11 Pro]|
|Acer   |True |[liquid Z6 Plus, Acer Predator 8]    |[Iconia Talk S, liquid Z6 Plus]      |
|Casio  |False|[Casio G'zOne Ravine]                |[Casio G'zOne Brigade]               |
|Alcatel|False|[3L]                                 |[]                                   |
+-------+-----+-------------------------------------+-------------------------------------+

【讨论】:

以上是关于如何将 DataFrame(包括数组)中的两列与 CSV(Dataframe/Dictionary)中的两列匹配的主要内容,如果未能解决你的问题,请参考以下文章

Python:从 DataFrame 中的两列创建结构化 numpy 结构化数组

如何比较pyspark中两个不同数据帧中的两列

熊猫将两列与空值结合起来

pandas使用dataframe中的两列时间对象数据列作差生成时间差数据列将时间差(timedelta对象)与特定时间长度进行比较

如何将一个 DataFrame 中的多个列与另一个 DataFrame 连接

linux系统 如何实现将文本文件的两列交换顺序 分隔符是tab