避免指定模式两次(Spark/scala)

Posted

技术标签:

【中文标题】避免指定模式两次(Spark/scala)【英文标题】:Avoid specifying schema twice (Spark/scala) 【发布时间】:2018-11-08 11:10:45 【问题描述】:

我需要按特定顺序遍历数据框并应用一些复杂的逻辑来计算新列。

此外,我的强烈偏好是以通用方式执行此操作,因此我不必列出一行的所有列并执行df.as[my_record]case Row(...) =>,如here 所示。相反,我想通过名称访问行列并将结果列添加到源行。

以下方法工作得很好,但我想避免两次指定架构:第一次是为了在迭代时按名称访问列,第二次是为了处理输出。

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

val q = """
select 2 part, 1 id
union all select 2 part, 4 id
union all select 2 part, 3 id
union all select 2 part, 2 id
"""
val df = spark.sql(q)

def f_row(iter: Iterator[Row]) : Iterator[Row] = 
  if (iter.hasNext) 
    def complex_logic(p: Int): Integer = if (p == 3) null else p * 10;

    val head = iter.next
    val schema = StructType(head.schema.fields :+ StructField("result", IntegerType))
    val r =
      new GenericRowWithSchema((head.toSeq :+ complex_logic(head.getAs("id"))).toArray, schema)

    iter.scanLeft(r)((r1, r2) =>
      new GenericRowWithSchema((r2.toSeq :+ complex_logic(r2.getAs("id"))).toArray, schema)
    )
   else iter


val schema = StructType(df.schema.fields :+ StructField("result", IntegerType))
val encoder = RowEncoder(schema)
df.repartition($"part").sortWithinPartitions($"id").mapPartitions(f_row)(encoder).show

应用mapPartitions 后丢失了哪些信息,因此在没有显式编码器的情况下无法处理输出?如何避免指定?

【问题讨论】:

你能使用 Dataset 并提供一个从 T => U 映射的函数吗? @TerryDactyl,你能详细说明一下吗?我使用df.repartition($"part").sortWithinPartitions($"id"),即org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],我为mapPartitions提供f_row(iter: Iterator[Row]) : Iterator[Row] A DataFrame 是一个 Dataset[Row],如下所述,Row 是无类型的。如果您要提供与地图函数 Iterator[T] => Iterator[U] 和 import spark.implicits._ 的输入和输出形状相对应的案例类 T 和 U,那么 Spark 可能能够提供一个 Encoder 并且您将有效地从 Dataset[T] => Dataset[U] 映射。这只是一个猜测,但可能值得一试。 见下文...... 【参考方案1】:

应用 mapPartitions 后丢失了哪些信息,因此无法处理输出

信息几乎不会丢失 - 它从一开始就不存在 - RowInternalRow 的子类基本上是无类型的可变形状容器,它们不提供任何有用的类型信息,可用于派生一个Encoder

GenericRowWithSchema 中的schema 无关紧要,因为它根据元数据而不是类型来描述内容。

如何避免指定?

对不起,你运气不好。如果你想在静态类型语言中使用动态类型构造(一袋Any),你必须付出代价,这里提供了Encoder

【讨论】:

我不同意“它从一开始就不存在”。我可以在重新分区和订购df.repartition($"part").sortWithinPartitions($"id").show 后显示结果,但在mapPartitions 之后就不可能了。 mapPartitions中使用的函数是(func: (Iterator[T]) ⇒ Iterator[U])。那么,如果函数在迭代后产生几乎相同的记录(准确地说是新列),为什么不能使用show【参考方案2】:

好的 - 我已经检查了我的一些 spark 代码,并且将 .mapPartitions 与 Dataset API 一起使用不需要我显式构建/传递编码器。

你需要这样的东西:

case class Before(part: Int, id: Int)
case class After(part: Int, id: Int, newCol: String)

import spark.implicits._

// Note column names/types must match case class constructor parameters.
val beforeDS = <however you obtain your input DF>.as[Before]

def f_row(it: Iterator[Before]): Iterator[After] = ???

beforeDS.reparition($"part").sortWithinPartitions($"id").mapPartitions(f_row).show

【讨论】:

正如我在原帖中提到的,“我强烈的偏好是以通用方式执行此操作,因此我不必列出一行的所有列并执行df.as[my_record]case Row(...) =&gt;”。如果将新列添加到数据框中,我想避免任何更改。见***.com/questions/53159461/…【参考方案3】:

我发现下面的解释足够了,也许对其他人有用。

mapPartitions 需要Encoder,否则它无法从迭代器或Rows 构造Dataset。即使每一行都有一个模式,Dataset[U] 的构造函数也不能派生(使用)该 shema。

  def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = 
    new Dataset[U](
      sparkSession,
      MapPartitions[T, U](func, logicalPlan),
      implicitly[Encoder[U]])
  

另一方面,无需调用mapPartitions,Spark 可以使用从初始查询派生的架构,因为原始列的结构(元数据)没有改变。

我在这个答案中描述了替代方案:https://***.com/a/53177628/7869491。

【讨论】:

以上是关于避免指定模式两次(Spark/scala)的主要内容,如果未能解决你的问题,请参考以下文章

在“点击,点击”事件触发两次。如何避免?

Jason 解码两次以正确匹配模式

如何避免重复请求按钮单击两次

如何避免两次调用observe()?

如何避免两次编写 SQL Server 查询以避免重复?

避免两次打开同一页面