PySpark UDF,输入端只有 None 值

Posted

技术标签:

【中文标题】PySpark UDF,输入端只有 None 值【英文标题】:PySpark UDF, only None values at the input 【发布时间】:2020-05-05 18:50:27 【问题描述】:

我的 Kafka 流应用程序的 UDF 功能出现问题。每次调用 UDF 函数时,输入上只有 None 值而不是有效的列值。类型错误 然后引发,因为应用程序期望 str,而不是 None。

UDF函数定义:

@udf(returnType=StringType())
def get_asn(ip_addr):
    from fm_kafka2parquet.asn_lookup import AsnLookup

    result = AsnLookup\
        .get_instance(ASN_DB_PATH)\
        .get().lookup(ip_addr)[0]  # first record from tuple is ASN number
    if result is None:
        return "n/a"
    return result

UDF 函数调用:

  # data frame for netflow reading
  df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", CONFIG_KAFKA_BOOTSTRAP) \
      .option("subscribe", CONFIG_KAFKA_TOPIC) \
      .option("startingOffsets", "latest") \
      .load() \
      .selectExpr("CAST(value AS STRING)") \
      .withColumn("net", from_json("value", Structures.get_ipfix_structure())) \
      .select("net.*")

  # remove ipfix prefix in case of ipfixv1 collector
  temp_list = []
  for c in df.columns:
      new_name = c.replace('ipfix.', '')
      temp_list.append(new_name)
  df = df.toDF(*temp_list)

  # enrichment
  edf = df \
      .withColumn("sourceAS", get_asn('sourceIPv4Address')) \
      .withColumn("destinationAS", get_asn('destinationIPv4Address'))

一切都以 err 结尾,这是由 get_asn UDF 函数使用的 pyasn 库引发的:

TypeError: search_best() argument 1 must be str, not None

【问题讨论】:

【参考方案1】:

尝试使用它,如下所述。 .withColumn("sourceAS", get_asn(F.col('sourceIPv4Address'))

【讨论】:

嗨 Prateek,如果您的意思是 import pyspark.sql.functions as F ... .withColumn("sourceAS", get_asn(F.col('sourceIPv4Address ')))。我试过但没有成功,UDF函数输入仍然只有None【参考方案2】:

而且,这看起来很可疑。

# remove ipfix prefix in case of ipfixv1 collector
  temp_list = []
  for c in df.columns:
      new_name = c.replace('ipfix.', '')
      temp_list.append(new_name)
  df = df.toDF(*temp_list)

您正在更改列名,然后选择它们,但新的列名不在数据框中,对吧?因此,它必须返回空数据框。

如果要重命名列,请使用 -

df = df.withColumnRenamed(c, c.replace('ipfix.', ''))

有关如何在 pyspark 中清除列名的详细信息,请参阅此 - https://www.youtube.com/watch?v=vAHPAP9Oagc&t=1s

【讨论】:

我也尝试了 df.withColumnRenamed - 没有成功。现在我怀疑 Cloudera 平台,因为在我的本地环境中一切正常。

以上是关于PySpark UDF,输入端只有 None 值的主要内容,如果未能解决你的问题,请参考以下文章

在 pyspark 中注册我的 udf 有啥好处吗?

Pyspark:访问 UDF 中行内的列

何时使用 UDF 与 PySpark 中的函数? [复制]

Pyspark 使用 udf 处理数组列并返回另一个数组

在 pyspark 的 Scala UDF 中使用默认参数值?

pyspark udf 返回值