How can I extract length-sensitive features in PySpark without using the .toPandas() hack?

Posted: 2021-11-10 16:13:30

I am new to PySpark and I want to translate the Feature Extraction (FE) part of a Python script into PySpark. To start with, I have a Spark dataframe, called sdf, which includes the two columns A and B:

Here is an example:

data                                                 A             B
https://example1.org/path/to/file?param=42#fragment  path/to/file  param=42#fragment
https://example2.org/path/to/file                    path/to/file  NaN
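
For quick reproduction, the example dataframe can be created like this (a minimal sketch, assuming an existing SparkSession called spark; the literal rows are just the two examples above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# toy version of sdf with the two example rows; the missing value in B becomes NULL
sdf = spark.createDataFrame(
    [("https://example1.org/path/to/file?param=42#fragment", "path/to/file", "param=42#fragment"),
     ("https://example2.org/path/to/file", "path/to/file", None)],
    ["data", "A", "B"])

sdf.show(truncate=False)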

Now I want to apply some feature engineering to extract features from column B and concatenate the results back onto sdf. So far I can do this with the following pythonic script:

import re
import pandas as pd

#================================> Type <==========================================
def getType(input_value):
  if pd.isna(input_value):
    return "-"
    
  type_ = "-"

  if input_value.isdigit():                                                 # Only numeric
    type_ = "Int"
  elif bool(re.match(r"^[a-zA-Z0-9_]+$", input_value)):                     # Consists of one or more of a-zA-Z, 0-9, underscore , and Chinese
    type_ = "String"
  elif bool(re.match(r"^[\d+,\s]+$", input_value)):                         # Only comma exists as separator "^[\d+,\s]+$"
    type_ = "Array"

  else:  
    existing_separators = re.findall(r"([\+\;\,\:\=\|\\/\#\'\"\t\r\n\s])+", input_value)
    # There are one or more separators
    # when there is only one separator it is not comma (!= "^[\d+,\s]+$")
    if len(existing_separators) > 1 or (len(existing_separators) == 1 and existing_separators[0] != ","):
      type_ = "Sentence"                                                

  return type_


#================================> Length <==========================================
#Number of characters in parameter value
getLength = lambda input_text: 0 if pd.isna(input_text) else len(input_text)

#================================> Token number <==========================================

double_separators_regex = re.compile(r"[\<\[\(\]+[0-9a-zA-Z_\.\-]+[\\)\]\>]+")
single_separators_regex = re.compile(r"([0-9a-zA-Z_\.\-]+)?[\+\,\;\:\=\|\\/\#\'‘’\"“”\t\r\n\s]+([0-9a-zA-Z_\.\-]+)?")

token_number = lambda input_text: 0 if pd.isna(input_text) else len(double_separators_regex.findall(input_text) + [element for pair in single_separators_regex.findall(input_text) for element in pair if element != ""])

#quick test 
param_example = "url=http://news.csuyst.edu.cn/sem/resource/code/rss/rssfeed.jsp?type=list"
out = double_separators_regex.findall(param_example) + [element for pair in single_separators_regex.findall(param_example) for element in pair if element != ""] 

print(out)        #['url','http','news.csuyst.edu.cn','sem','resource','code','rss','rssfeed.jsp','type','list']
print(len(out))   #10

#===================================> Encoding type <============================================

import base64

def isBase64(input_value):
  try:
    return base64.b64encode(base64.b64decode(input_value)) == input_value
  except Exception as e:
    return False

#================================> Character feature <==========================================
N = 2

n_grams = lambda input_text: 0 if pd.isna(input_text) else len(set([input_text[character_index:character_index+N] for character_index in range(len(input_text)-N+1)]))


#quick test 
n_grams_example = 'zhang1997'  #output = [‘zh’, ‘ha’, ‘an’, ‘ng’, ‘g1’, ‘19’, ‘99’ , ‘97’]
n_grams(n_grams_example)       # 8



#frame the features
features_df = pd.DataFrame()

features_df["Type"] = df.fragment.apply(getType)
features_df["Length"] = df.fragment.apply(getLength)
features_df["Token_number"] = df.fragment.apply(token_number)
features_df["Encoding_type"] = df.fragment.apply(isBase64)
features_df["Character_feature"] = df.fragment.apply(n_grams)

features_df.columns  #Index(['Type', 'Length', 'Token_number', 'Encoding_type', 'Character_feature'], dtype='object')
features_df

Question: what is the best way to translate this FE part without converting the Spark dataframe into a pandas dataframe via toPandas(), so that the pipeline stays optimised and is handled 100% in Spark?

I am happy to provide a colab notebook for quick debugging and comments.

The expected output is a Spark dataframe that looks like this:

+--------------------+------------+-----------------+--------+------+-------------+--------------+-----------------+
|data                |A           |B                |Type    |Length|Token_number |Encoding_type |Character_feature|
+--------------------+------------+-----------------+--------+------+-------------+--------------+-----------------+
|https://example1....|path/to/file|param=42#fragment|Sentence|17.0  |3.0          |False         |15.0             |
|https://example2....|path/to/file|Null             |-       |0.0   |0.0          |False         |0.0              |
+--------------------+------------+-----------------+--------+------+-------------+--------------+-----------------+


Answer 1:

I have put together some sample code for you here. It is not perfect, but it at least follows your source code and should point you towards the next step. I have also put some comments on each Spark transformation. Hope it is useful for you.

from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window as W

def count_token(input_text):
    import re
    if input_text is None:
        return 0
    double_separators_regex = re.compile(r"[\<\[\(\]+[0-9a-zA-Z_\.\-]+[\\)\]\>]+")
    single_separators_regex = re.compile(r"([0-9a-zA-Z_\.\-]+)?[\+\,\;\:\=\|\\/\#\'‘’\"“”\t\r\n\s]+([0-9a-zA-Z_\.\-]+)?")
    return len(double_separators_regex.findall(input_text) + [element for pair in single_separators_regex.findall(input_text) for element in pair if element != ""])

def n_grams(input_text):
    if input_text is None:
        return 0
    N = 2
    return len(set([input_text[character_index:character_index+N] for character_index in range(len(input_text)-N+1)]))


(df
    .withColumn('test', F.base64(F.unbase64('fragment')))
    .withColumn('Type', F
        .when(F.isnull('fragment'), '-')
        .when(~F.isnull(F.col('fragment').cast('int')), 'Int')
        .when(F.regexp_extract('fragment', '^[a-zA-Z0-9_]+$', 0) == F.col('fragment'), 'String')
        .when(F.regexp_extract('fragment', '^[\d+,\s]+$', 0) == F.col('fragment'), 'Array') # not sure about this regex?
        .otherwise('Sentence') # not sure about this condition either, but you can utilize
                               # `regexp_extract` like above and do any kind of comparison
    )
    .withColumn('Length', F
        .when(F.isnull('fragment'), 0)
        .otherwise(F.length('fragment'))
    )
    .withColumn('Token_number', F.udf(count_token, T.IntegerType())('fragment')) # Spark doesn't provide a `findall` alternative,
                                                                                 # so we have to use a UDF here; you can find the docs at
                                                                                 # http://spark.apache.org/docs/3.0.1/api/python/pyspark.sql.html#pyspark.sql.functions.udf
    .withColumn('Encoding_type', F
        .when(F.isnull('fragment'), False)
        .otherwise(F.base64(F.unbase64(F.col('fragment'))) == F.col('fragment')) # FYI, this is not always correct,
                                                                                 # for example `assert(isBase64('param123') == False)`
    )
    .withColumn('Character_feature', F.udf(n_grams, T.IntegerType())('fragment')) # or you can use more advanced feature from SparkML
                                                                                  # https://spark.apache.org/docs/latest/ml-features.html#n-gram
    .show()
)

# Output
# +--------------------+------------+-----------------+----+--------+------+------------+-------------+-----------------+
# |                data|        path|         fragment|test|    Type|Length|Token_number|Encoding_type|Character_feature|
# +--------------------+------------+-----------------+----+--------+------+------------+-------------+-----------------+
# |https://example1....|path/to/file|param=42#fragment|para|Sentence|    17|           3|        false|               15|
# |https://example2....|path/to/file|             null|null|       -|     0|           0|        false|                0|
# +--------------------+------------+-----------------+----+--------+------+------------+-------------+-----------------+
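
If you need the results to match your pandas version exactly (the native when/base64 expressions above only approximate your getType and isBase64, as noted in the comments), a possible fallback is to wrap your own helpers as UDFs as well. This is only a rough sketch: it assumes getType and isBase64 from your question are defined on the driver (and that pandas is available on the executors), and that the column is called fragment as above.

from pyspark.sql import functions as F
from pyspark.sql import types as T

# Reuse the question's helpers unchanged so the Spark results mirror the pandas pipeline,
# at the cost of running Python code per row instead of native Spark expressions.
get_type_udf = F.udf(lambda value: getType(value), T.StringType())
is_base64_udf = F.udf(lambda value: False if value is None else bool(isBase64(value)), T.BooleanType())

(df
    .withColumn('Type', get_type_udf('fragment'))
    .withColumn('Encoding_type', is_base64_udf('fragment'))
    .show(truncate=False))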

Comments:

Thanks for your input. I just applied your solution in the provided colab notebook and got results different from the pythonic version, e.g. Encoding_type should be `False` but your solution gives `True`! Could you check the provided notebook for quick debugging?

I looked at your notebook: your example fragment here is param=42#fragment, while in the notebook it is fragment

Thanks for debugging. That happened because of incomplete parsing with from urllib.parse import urlsplit.
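
To make the last comment concrete: urlsplit returns the query and the fragment as separate fields, so passing only the fragment to the feature functions gives different values than passing the full param=42#fragment string. A small illustration with the standard library (the recombination below is just one way to rebuild the original input):

from urllib.parse import urlsplit

parts = urlsplit("https://example1.org/path/to/file?param=42#fragment")
print(parts.query)     # param=42
print(parts.fragment)  # fragment

# The pandas pipeline above was fed the combined "param=42#fragment",
# so an equivalent input has to rebuild query + "#" + fragment:
combined = parts.query + "#" + parts.fragment if parts.fragment else parts.query
print(combined)        # param=42#fragment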
