PySpark UDF 在与列分开的情况下返回状态代码和响应
Posted
技术标签:
【中文标题】PySpark UDF 在与列分开的情况下返回状态代码和响应【英文标题】:PySpark UDF Returns Status Code and Response in Separate withColumn 【发布时间】:2021-10-27 17:09:14 【问题描述】:我有一个可以引用 status_code 并返回正文的 udf。
def Api(a):
path = endpoint
headers = 'sample-Key': sample
body = ['text': body ]
res = None
try:
req = requests.post(path, params=params, headers=headers, json=body)
req = req.json()
dumps=json.dumps(req)
except Exception as e:
return e
if res != None and req.status_code == 200:
return json.loads(dumps)
return None
udf_Api = udf(Api)
newDF=df.withColumn("output", udf_Api(col("input")))
我可以返回 json.loads 并将其放入数据框中。但是,我的问题是我还需要将 status_code 保留在单独的列中。所以输出看起来像:
+---------+-----------+----------+
| input|status_code| output|
+---------+-----------+----------+
|inputText| 200|outputText|
+---------+-----------+----------+
那么我怎样才能同时返回 req.status_code 和 json.loads(),但将它们放在数据框中的单独列中?我想过返回一个数组然后拆分它,但不知道该怎么做。
【问题讨论】:
【参考方案1】:您可以修改 UDF 以返回 dict 而不是字符串或整数,然后定义输出模式。
from pyspark.sql import functions as F
from pyspark.sql import types as T
def Api(a):
return
'status': 200,
'data': '"a": 1'
schema = T.StructType([
T.StructField('status', T.IntegerType()),
T.StructField('data', T.StringType())
])
(df
.withColumn('output', F.udf(Api, schema)('col'))
.select('col', 'output.*')
.show()
)
# +---+------+--------+
# |col|status| data|
# +---+------+--------+
# | 10| 200|"a": 1|
# | 20| 200|"a": 1|
# | 30| 200|"a": 1|
# +---+------+--------+
【讨论】:
以上是关于PySpark UDF 在与列分开的情况下返回状态代码和响应的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark udf 在接受多列作为输入的条件定义上返回一列