F.regexp_extract in a udf returns AttributeError: 'NoneType' object has no attribute '_jvm'
Posted 2020-08-26 10:22:14

I'm new to Spark and PySpark. I have a huge dataset, and I have a set of keywords that I need to check for in a column and extract from it.
My code looks like this:
temp_skills = ['sales', 'it', 'c']

@F.udf
def lookhere(z) -> str:
    strng = ' '
    for skill in temp_skills:
        strng += F.regexp_extract(z, skill, 0)
    return strng

spark.udf.register("lookhere", lambda z: lookhere(z), returnType=StringType())

DF.withColumn(
    'temp',
    lookhere(DF.dept_name)
).show(truncate=False)
Original DF:
+------------------+-------+
| dept_name|dept_id|
+------------------+-------+
| finance sales it| 10|
|marketing it sales| 20|
| sales| 30|
| it| 40|
+------------------+-------+
Expected DF:
+------------------+-------+----------+
| dept_name|dept_id| temp|
+------------------+-------+----------+
| finance sales it| 10|sales it c|
|marketing it sales| 20| sales it |
| sales| 30| sales |
| it| 40| it |
+------------------+-------+----------+
Error:
---------------------------------------------------------------------------
PythonException Traceback (most recent call last)
<ipython-input-80-0c11f7327f77> in <module>()
1 DF.withColumn('temp2',
2 lookintothis(DF.dept_name)
----> 3 ).show(truncate = False)
/content/spark-3.0.0-bin-hadoop2.7/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
440 print(self._jdf.showString(n, 20, vertical))
441 else:
--> 442 print(self._jdf.showString(n, int(truncate), vertical))
443
444 def __repr__(self):
/content/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/content/spark-3.0.0-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
135 # Hide where the exception came from that shows a non-Pythonic
136 # JVM exception message.
--> 137 raise_from(converted)
138 else:
139 raise
/content/spark-3.0.0-bin-hadoop2.7/python/pyspark/sql/utils.py in raise_from(e)
PythonException:
An exception was thrown from Python worker in the executor. The below is the Python worker stacktrace.
Traceback (most recent call last):
File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
process()
File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
serializer.dump_stream(out_iter, outfile)
File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
for obj in iterator:
File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched
for item in iterator:
File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda>
return lambda *a: f(*a)
File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
return f(*args, **kwargs)
File "<ipython-input-75-31ef5eea3b75>", line 7, in lookintothis
File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1811, in regexp_extract
jc = sc._jvm.functions.regexp_extract(_to_java_column(str), pattern, idx)
AttributeError: 'NoneType' object has no attribute '_jvm'
Environment: Google Colab, Windows 10, Spark 3.0.0, PySpark 3.0.0
Is my approach wrong, or is it my syntax? Please help me understand this!
Comments:
regexp_extract works on Spark DataFrame columns, but inside your udf z is the column's value as a plain Python string, not a Column itself.

Yes! Can you suggest a way to make this work, then? The question is how to extract the specific matched words rather than the whole pattern.
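A minimal sketch of what the comment suggests: since z is an ordinary Python string inside the udf, the standard-library re module applies instead of pyspark.sql.functions. The function name extract_skills is made up for illustration, not from the thread.

import re
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

temp_skills = ['sales', 'it', 'c']

@udf(returnType=StringType())
def extract_skills(z):
    # z is a plain Python string here, so use re, not F.regexp_extract
    found = []
    for skill in temp_skills:
        # \b word boundaries keep 'it' from matching inside e.g. 'italy'
        if re.search(r'\b' + re.escape(skill) + r'\b', z):
            found.append(skill)
    return ' '.join(found)

# usage:
# DF.withColumn('temp', extract_skills(DF.dept_name)).show(truncate=False)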
Answer 1:
I checked whether each skill appears in dept_name with the in operator. I don't think you need to replace anything.
temp_skills = ['sales', 'it', 'c']

from pyspark.sql.functions import *

@udf
def lookhere(z) -> str:
    strings = []
    for skill in temp_skills:
        if skill in z:
            strings.append(skill)
    return strings

spark.udf.register("lookhere", lookhere)

df = spark.read.option("header","true").option("inferSchema","true").csv("test.csv")
df.withColumn('temp', lookhere('dept_name')).show(4, False)
+------------------+-------+--------------+
|dept_name |dept_id|temp |
+------------------+-------+--------------+
|finance sales it |10 |[sales, it, c]|
|marketing it sales|20 |[sales, it] |
|sales |30 |[sales] |
|it |40 |[it] |
+------------------+-------+--------------+
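A side note on the udf above: @udf without arguments defaults to StringType, while the function returns a Python list. If an actual array column is wanted, the return type can be declared explicitly — a minimal sketch, with the hypothetical name lookhere_arr:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

temp_skills = ['sales', 'it', 'c']

@udf(returnType=ArrayType(StringType()))
def lookhere_arr(z):
    # keep only the skills that occur as substrings of z
    return [skill for skill in temp_skills if skill in z]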
An alternative, DataFrame-only approach: split dept_name so the keywords are compared more exactly.
temp_skills = ['sales', 'it', 'c']

from pyspark.sql.functions import *

df = spark.read.option("header","true").option("inferSchema","true").csv("test.csv")
df.withColumn('dept_names', split('dept_name', ' ')) \
  .withColumn('skills', array(*map(lambda c: lit(c), temp_skills))) \
  .withColumn('temp', array_intersect('dept_names', 'skills')) \
  .drop('dept_names', 'skills').show(4, False)
+------------------+-------+-----------+
|dept_name |dept_id|temp |
+------------------+-------+-----------+
|finance sales it |10 |[sales, it]|
|marketing it sales|20 |[it, sales]|
|sales |30 |[sales] |
|it |40 |[it] |
+------------------+-------+-----------+
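If the space-separated string from the question's expected output is needed instead of an array, array_join (available since Spark 2.4) can collapse the result. A sketch along the same lines, assuming the same df as above:

from pyspark.sql.functions import split, array, lit, array_intersect, array_join

temp_skills = ['sales', 'it', 'c']

result = df.withColumn('dept_names', split('dept_name', ' ')) \
    .withColumn('skills', array(*[lit(c) for c in temp_skills])) \
    .withColumn('temp', array_join(array_intersect('dept_names', 'skills'), ' ')) \
    .drop('dept_names', 'skills')
result.show(truncate=False)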
Comments:
Both approaches work! Thanks @Lamanus. I guess the first approach will beat the second when a keyword has multiple words, like human resources, since those get split into ['human', 'resources'] and won't match ['human resources'].