Apache Spark SQL StructType 和 UDF

Posted

技术标签:

【中文标题】Apache Spark SQL StructType 和 UDF【英文标题】:Apache Spark SQL StructType together with UDF 【发布时间】:2021-01-13 14:47:05 【问题描述】:

Spark 1.6 / Java-7

带有新列的初始数据框

# adding new column for the UDF computation:
df = df.withColumn("TEMP_COLUMN", lit(null));

UDF 函数创建新的 StructType 并将其放入单元格的正确格式是什么?

public static DataFrame compute(SQLContext sqlContext, DataFrame df) 
    sqlContext.udf().register("compute", new MyUdf(), new ArrayType(new StructType(), true));
    return df.withColumn("TEMP_COLUMN", functions.callUDF("compute"));


class MyUdf implements UDF0<List<StructType>> 
@Override
public  List<StructType> call() 
    ...
    return ? // what must be returned here? List<StructType> or List<String> or anything else?



+-------------------------+
|TEMP_COLUMN              |
+-------------------------+
|[A[1, 2, 3], B[4, 5, 6]] |
+-------------------------+

我想要一个包含元素数组的结构,每个元素都有几个字段。 我不明白,注册类型new ArrayType(new StructType(), true)是否正确且UDF函数List&lt;StructType&gt;的返回类型是否相同。 数据应该如何返回?是不是像new StructType(new StructField[]new StructField(...))

【问题讨论】:

【参考方案1】:

回答我自己的问题,因为我们很幸运地找到了答案:

假设我们有一个“复杂”的结构来满足我们的需求:

MapType CLIENTS_INFO_DATA_TYPE = DataTypes.createMapType(
  DataTypes.StringType,
  DataTypes.createStructType(
    new StructField[] 
        DataTypes.createStructField("NAME_1", DataTypes.DoubleType, false),
        DataTypes.createStructField("NAME_2", DataTypes.DoubleType, false),
        DataTypes.createStructField("NAME_3", DataTypes.DoubleType, false)
  ),
  true
);


StructType COMPLEX_DATA_TYPE = DataTypes.createStructType(new StructField[] 
  DataTypes.createStructField("CLIENTS_INFO", CLIENTS_INFO_DATA_TYPE, true),
  DataTypes.createStructField("COMMENT", DataTypes.StringType, true)

它的架构:

dataFrame.printSchema()

|-- COMPLEX_DATA_TYPE: struct (nullable = true)
|    |-- CLIENTS_INFO: map (nullable = true)
|    |    |-- key: string
|    |    |-- value: struct (valueContainsNull = true)
|    |    |    |-- NAME_1: double (nullable = false)
|    |    |    |-- NAME_2: double (nullable = false)
|    |    |    |-- NAME_3: double (nullable = false)
|    |-- COMMENT: string (nullable = true)

接下来我们必须注册使用我们的结构操作的 UDF 函数:

DataFrame compute(SQLContext sqlContext, DataFrame df) 
sqlContext.udf().register(
        "computeUDF",
        new MyUDF(),
        COMPLEX_DATA_TYPE);

  return df.withColumn("TEMP_FIELD_NAME", functions.callUDF("computeUDF", field_1.getColumn(), field_2.getColumn()));

最后一步是UDF函数本身,它返回一个Row对象(将被转换成我们的结构):

public final class MyUDF implements UDF2<Double, Double, Row> 
@Override
public Row call(Double value1, Double value2) 
    Map<String, Row> clientsInfoMap = new HashMap<>();
    ...
    for (Map.Entry<String, ClientInfo> clientInfoEntry : clientsInfo.entrySet()) 
        final String client = clientInfoEntry.getKey();
        final ClientInfo clientInfo = clientInfoEntry.getValue();

        final Double[] clientInfoValues = 10.0, 20.0, 30.0;
        
        Row clientInfoRow = new GenericRow(clientInfoValues);
        clientsInfoMap.put(client, clientInfoRow);
    

    Object[] fullClientsInfo = new Object[] clientsInfoMap, "string-as-a-comment";
    return new GenericRow(fullClientsInfo);
  

现在,由于它是一个结构,我们可以使用TEMP_FIELD_NAME.CLIENTS_INFO 和其他任何名称进行选择。

【讨论】:

以上是关于Apache Spark SQL StructType 和 UDF的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL - org.apache.spark.sql.AnalysisException

类型不匹配;找到:org.apache.spark.sql.DataFrame 需要:org.apache.spark.rdd.RDD

为啥 org.apache.spark.sql.types.DecimalType 在 Spark SQL 中的最大精度值为 38?

Apache Spark :org.apache.spark.sql.Dataset.drop(String... colNames) 方法用于 Java

如何解决 Spark 中的“aggregateByKey 不是 org.apache.spark.sql.Dataset 的成员”?

从 org.apache.spark.sql.Row 中提取信息