How can I extract length-sensitive features in PySpark without using the .toPandas() hack?
Posted: 2021-11-10 16:13:30

I am new to PySpark and I want to translate the Feature Extraction (FE) part of a Python script into PySpark. To start with, I have a Spark DataFrame, the so-called sdf, containing 2 columns A and B. Here is an example:
| data | A | B |
| --- | --- | --- |
| https://example1.org/path/to/file?param=42#fragment | path/to/file | param=42#fragment |
| https://example2.org/path/to/file | path/to/file | NaN |
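For reference, a minimal sketch of how such a test DataFrame could be built (it assumes an existing SparkSession named spark; the column names data, A and B follow the example table above, and the missing B value is represented as null):
sdf = spark.createDataFrame(
    [
        ("https://example1.org/path/to/file?param=42#fragment", "path/to/file", "param=42#fragment"),
        ("https://example2.org/path/to/file", "path/to/file", None),
    ],
    ["data", "A", "B"],
)
sdf.show(truncate=False)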
Now I want to apply some feature engineering, extract the features, and join the result back to column B of sdf. So far I can do this with the following pythonic script:
#================================> Type <==========================================
import re
import pandas as pd

def getType(input_value):
    if pd.isna(input_value):
        return "-"
    type_ = "-"
    if input_value.isdigit():  # Only numeric
        type_ = "Int"
    elif bool(re.match(r"^[a-zA-Z0-9_]+$", input_value)):  # Consists of one or more of a-zA-Z, 0-9 and underscore
        type_ = "String"
    elif bool(re.match(r"^[\d+,\s]+$", input_value)):  # Only comma exists as separator ("^[\d+,\s]+$")
        type_ = "Array"
    else:
        existing_separators = re.findall(r"([\+\;\,\:\=\|\\/\#\'\"\t\r\n\s])+", input_value)
        # There are one or more separators
        # (when there is only one separator, it is not a comma, i.e. != "^[\d+,\s]+$")
        if len(existing_separators) > 1 or (len(existing_separators) == 1 and existing_separators[0] != ","):
            type_ = "Sentence"
    return type_
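A quick test of getType in the same style as the quick tests below (the example values are illustrative, not from the original post):
#quick test
print(getType("42"))                 # Int
print(getType("param_42"))           # String
print(getType("param=42#fragment"))  # Sentence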
#================================> Length <==========================================
#Number of characters in parameter value
getLength = lambda input_text: 0 if pd.isna(input_text) else len(input_text)
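A quick test in the same style as the other features (the value matches the Length of 17 in the expected output further below):
#quick test
length_example = "param=42#fragment"
getLength(length_example) # 17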
#================================> Token number <==========================================
double_separators_regex = re.compile(r"[\<\[\(\]+[0-9a-zA-Z_\.\-]+[\\)\]\>]+")
single_separators_regex = re.compile(r"([0-9a-zA-Z_\.\-]+)?[\+\,\;\:\=\|\\/\#\'‘’\"“”\t\r\n\s]+([0-9a-zA-Z_\.\-]+)?")
token_number = lambda input_text: 0 if pd.isna(input_text) else len(double_separators_regex.findall(input_text) + [element for pair in single_separators_regex.findall(input_text) for element in pair if element != ""])
#quick test
param_example = "url=http://news.csuyst.edu.cn/sem/resource/code/rss/rssfeed.jsp?type=list"
out = double_separators_regex.findall(param_example) + [element for pair in single_separators_regex.findall(param_example) for element in pair if element != ""]
print(out) #['url', 'http', 'news.csuyst.edu.cn', 'sem', 'resource', 'code', 'rss', 'rssfeed.jsp', 'type', 'list']
print(len(out)) #10
#===================================> Encoding type <============================================
import base64

def isBase64(input_value):
    try:
        return base64.b64encode(base64.b64decode(input_value)) == input_value
    except Exception:
        return False
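Note that in Python 3, base64.b64encode returns bytes, so the comparison above always evaluates to False for a str input. A bytes-aware variant might look like the sketch below (isBase64_str is a made-up helper name, assuming the intent is to check whether a string round-trips through base64):
def isBase64_str(input_value):
    try:
        decoded = base64.b64decode(input_value, validate=True)
        return base64.b64encode(decoded).decode("ascii") == input_value
    except Exception:
        return False

#quick test
print(isBase64("cGFyYW0="))      # False -- bytes vs str never compare equal
print(isBase64_str("cGFyYW0="))  # True  -- "cGFyYW0=" is base64 for b"param"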
#================================> Character feature <==========================================
N = 2
n_grams = lambda input_text: 0 if pd.isna(input_text) else len(set([input_text[character_index:character_index+N] for character_index in range(len(input_text)-N+1)]))
#quick test
n_grams_example = 'zhang1997' #output = ['zh', 'ha', 'an', 'ng', 'g1', '19', '99', '97']
n_grams(n_grams_example) # 8
#frame the features
features_df = pd.DataFrame()
features_df["Type"] = df.fragment.apply(getType)
features_df["Length"] = df.fragment.apply(getLength)
features_df["Token_number"] = df.fragment.apply(token_number)
features_df["Encoding_type"] = df.fragment.apply(isBase64)
features_df["Character_feature"] = df.fragment.apply(n_grams)
features_df.columns #Index(['Type', 'Length', 'Token_number', 'Encoding_type', 'Character_feature'], dtype='object')
features_df
Question: what is the best way to translate the FE part into pure Spark, without converting the Spark DataFrame to a pandas DataFrame via toPandas(), so that the pipeline is optimized and processed 100% in Spark?
I am happy to provide a colab notebook for quick debugging and comments.
The expected output, as a Spark DataFrame, looks like this:
+--------------------+------------+-----------------+--------+------+-------------+--------------+-----------------+
|data |A |B |Type |Length|Token_number |Encoding_type |Character_feature|
+--------------------+------------+-----------------+--------+------+-------------+--------------+-----------------+
|https://example1....|path/to/file|param=42#fragment|Sentence|17.0 |3.0 |False |15.0 |
|https://example2....|path/to/file|Null |- |0.0 |0.0 |False |0.0 |
+--------------------+------------+-----------------+--------+------+-------------+--------------+-----------------+
Answer 1: I made a sample code for you here. It is not perfect, but it at least follows your source code and should point you in the direction of your next step. I also put some comments on each Spark transformation. Hope it is useful for you.
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window as W

def count_token(input_text):
    import re
    if input_text is None:
        return 0
    double_separators_regex = re.compile(r"[\<\[\(\]+[0-9a-zA-Z_\.\-]+[\\)\]\>]+")
    single_separators_regex = re.compile(r"([0-9a-zA-Z_\.\-]+)?[\+\,\;\:\=\|\\/\#\'‘’\"“”\t\r\n\s]+([0-9a-zA-Z_\.\-]+)?")
    return len(double_separators_regex.findall(input_text) + [element for pair in single_separators_regex.findall(input_text) for element in pair if element != ""])

def n_grams(input_text):
    if input_text is None:
        return 0
    N = 2
    return len(set([input_text[character_index:character_index+N] for character_index in range(len(input_text)-N+1)]))
(df
    .withColumn('test', F.base64(F.unbase64('fragment')))
    .withColumn('Type', F
        .when(F.isnull('fragment'), '-')
        .when(~F.isnull(F.col('fragment').cast('int')), 'Int')
        .when(F.regexp_extract('fragment', '^[a-zA-Z0-9_]+$', 0) == F.col('fragment'), 'String')
        .when(F.regexp_extract('fragment', '^[\d+,\s]+$', 0) == F.col('fragment'), 'Array') # not sure about this regex?
        .otherwise('Sentence') # not sure about this condition either, but you can utilize
                               # `regexp_extract` like above and do any kind of comparison
    )
    .withColumn('Length', F
        .when(F.isnull('fragment'), 0)
        .otherwise(F.length('fragment'))
    )
    .withColumn('Token_number', F.udf(count_token, T.IntegerType())('fragment'))
        # Spark doesn't provide a `findall` alternative, so we have to use a UDF here; documentation:
        # http://spark.apache.org/docs/3.0.1/api/python/pyspark.sql.html#pyspark.sql.functions.udf
    .withColumn('Encoding_type', F
        .when(F.isnull('fragment'), False)
        .otherwise(F.base64(F.unbase64(F.col('fragment'))) == F.col('fragment')) # FYI, this is not always correct,
                                                                                 # for example `assert(isBase64('param123') == False)`
    )
    .withColumn('Character_feature', F.udf(n_grams, T.IntegerType())('fragment'))
        # or you can use a more advanced feature from Spark ML
        # https://spark.apache.org/docs/latest/ml-features.html#n-gram
    .show()
)
# Output
# +--------------------+------------+-----------------+----+--------+------+------------+-------------+-----------------+
# | data| path| fragment|test| Type|Length|Token_number|Encoding_type|Character_feature|
# +--------------------+------------+-----------------+----+--------+------+------------+-------------+-----------------+
# |https://example1....|path/to/file|param=42#fragment|para|Sentence| 17| 3| false| 15|
# |https://example2....|path/to/file| null|null| -| 0| 0| false| 0|
# +--------------------+------------+-----------------+----+--------+------+------------+-------------+-----------------+
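If the row-at-a-time UDFs become a bottleneck, one option is to wrap the same helpers in vectorized pandas UDFs (this assumes PyArrow is available on the cluster and reuses count_token and n_grams defined above); on Spark 3.1+ the SQL function regexp_extract_all can also cover findall-style extraction without a Python UDF. A rough sketch:
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('long')
def count_token_pandas(texts: pd.Series) -> pd.Series:
    # apply the plain-Python helper to a whole Arrow batch at once
    return texts.apply(count_token)

@pandas_udf('long')
def n_grams_pandas(texts: pd.Series) -> pd.Series:
    return texts.apply(n_grams)

(df
    .withColumn('Token_number', count_token_pandas('fragment'))
    .withColumn('Character_feature', n_grams_pandas('fragment'))
    .show()
)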
Comments:
Thanks for your input. I just applied your solution in the provided colab notebook and got results different from the pythonic version. E.g. Encoding_type should be `False`, but your solution gives `True`! Could you check the provided notebook for a quick debug?
I looked at your notebook: your example fragment here is param=42#fragment, while in the notebook it is fragment.
Thanks for debugging. That happened because of incomplete parsing with from urllib.parse import urlsplit.