How to use window specification and join condition per column values?

Posted: 2017-11-14 18:29:51

【Question】:

This is my DF1

OrganizationId|^|AnnualPeriodId|^|InterimPeriodId|^|InterimNumber|^|FFAction
4295858898|^|204|^|205|^|1|^|I|!|
4295858898|^|204|^|208|^|2|^|I|!|
4295858898|^|204|^|209|^|2|^|I|!|
4295858898|^|204|^|211|^|3|^|I|!|
4295858898|^|204|^|212|^|3|^|I|!|
4295858898|^|204|^|214|^|4|^|I|!|
4295858898|^|204|^|215|^|4|^|I|!|
4295858898|^|206|^|207|^|1|^|I|!|
4295858898|^|206|^|210|^|2|^|I|!|
4295858898|^|206|^|213|^|3|^|I|!|

This is my DF2

DataPartition|^|PartitionYear|^|TimeStamp|^|OrganizationId|^|AnnualPeriodId|^|InterimPeriodId|^|InterimNumber|^|FFAction|!|
SelfSourcedPublic|^|2002|^|1511224917595|^|4295858941|^|24|^|25|^|4|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917596|^|4295858941|^|24|^|25|^|4|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917597|^|4295858941|^|30|^|31|^|2|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917598|^|4295858941|^|30|^|31|^|2|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917599|^|4295858941|^|30|^|32|^|1|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917600|^|4295858941|^|30|^|32|^|1|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917601|^|4295858941|^|24|^|33|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917602|^|4295858941|^|24|^|33|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917603|^|4295858941|^|24|^|34|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917604|^|4295858941|^|24|^|34|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917605|^|4295858941|^|1|^|2|^|4|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917606|^|4295858941|^|1|^|3|^|4|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917607|^|4295858941|^|5|^|6|^|4|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917608|^|4295858941|^|5|^|7|^|4|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917609|^|4295858941|^|12|^|10|^|2|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917610|^|4295858941|^|12|^|11|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917611|^|4295858941|^|1|^|13|^|1|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917612|^|4295858941|^|12|^|14|^|1|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917613|^|4295858941|^|5|^|15|^|3|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917614|^|4295858941|^|5|^|16|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917615|^|4295858941|^|1|^|17|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917616|^|4295858941|^|1|^|18|^|3|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917617|^|4295858941|^|5|^|19|^|1|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917618|^|4295858941|^|5|^|20|^|2|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917619|^|4295858941|^|5|^|21|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917620|^|4295858941|^|1|^|22|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917621|^|4295858941|^|1|^|23|^|2|^|O|!|
SelfSourcedPublic|^|2016|^|1511224917622|^|4295858941|^|35|^|36|^|1|^|I|!|
SelfSourcedPublic|^|2016|^|1511224917642|^|4295858941|^|null|^|35|^|null|^|D|!|
SelfSourcedPublic|^|2016|^|1511224917643|^|4295858941|^|null|^|36|^|null|^|D|!|
SelfSourcedPublic|^|2016|^|1511224917644|^|4295858941|^|null|^|37|^|null|^|D|!|

I want to perform the join based on the values of a column.

This is what I am trying to achieve in Spark-Scala, but I don't know how to implement it.

If FFAction_1 = I in DF2, then the condition below applies

(join and partitionBy on three columns: "OrganizationId", "AnnualPeriodId", "InterimPeriodId")

val windowSpec = Window.partitionBy("OrganizationId", "AnnualPeriodId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc) 

val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")

val dfMainOutput = df1resultFinalWithYear.join(latestForEachKey, Seq("OrganizationId", "AnnualPeriodId", "InterimPeriodId"), "outer")
  .select($"OrganizationId", $"AnnualPeriodId", $"InterimPeriodId",
    when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|")))
      .otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction"))
  .filter(!$"FFAction".contains("D"))

If FFAction_1 = O or D, then the condition below applies

(join and partitionBy on two columns: "OrganizationId", "InterimPeriodId")

val windowSpec = Window.partitionBy("OrganizationId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc) 

val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")

val dfMainOutput = df1resultFinalWithYear.join(latestForEachKey, Seq("OrganizationId", "AnnualPeriodId", "InterimPeriodId"), "outer")
  .select($"OrganizationId", $"AnnualPeriodId", $"InterimPeriodId",
    when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|")))
      .otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction"))
  .filter(!$"FFAction".contains("D"))

Below is my complete code

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    import org.apache.spark.{SparkConf, SparkContext}
    import java.sql.{Date, Timestamp}
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions.udf

import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract

val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3))
val get_cus_YearPartition = spark.udf.register("get_cus_YearPartition", (filePath: String) => filePath.split("\\.")(4))

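//Loading the MAIN (parent) file: the schema is derived from the header row, and DataPartition/PartitionYear are derived from the file path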
val rdd = sc.textFile("s3://trfsmallfffile/Interim2Annual/MAIN")
val header = rdd.filter(_.contains("OrganizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("OrganizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)

val schemaHeader = StructType(header.map(cols => StructField(cols.replace(".", "."), StringType)).toSeq)
val dataHeader = sqlContext.createDataFrame(rdd.filter(!_.contains("OrganizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schemaHeader)

val df1resultFinal=data.withColumn("DataPartition", get_cus_val(input_file_name))
val df1resultFinalWithYear=df1resultFinal.withColumn("PartitionYear", get_cus_YearPartition(input_file_name))


//Loading Incremental 

val rdd1 = sc.textFile("s3://trfsmallfffile/Interim2Annual/INCR")
val header1 = rdd1.filter(_.contains("OrganizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema1 = StructType(header1.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data1 = sqlContext.createDataFrame(rdd1.filter(!_.contains("OrganizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema1)


 //------------------------------- filtering only the latest from the incremental ------------------------------
 
    import org.apache.spark.sql.expressions._
    val windowSpec = Window.partitionBy("OrganizationId","AnnualPeriodId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)
    val latestForEachKey1 = data1.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank")


    val windowSpec2 = Window.partitionBy("OrganizationId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)
    val latestForEachKey = latestForEachKey1.withColumn("tobefiltered", first("FFAction|!|").over(windowSpec2))
      .filter($"tobefiltered" === "I|!|" || $"tobefiltered" === "O|!|" || ($"tobefiltered" === "D|!|" && $"FFAction|!|" === "D|!|"))
      .drop("tobefiltered", "TimeStamp")

//-----------------separating the incremental df for insert, deletion and overwrite----------------

    //---------------insert rows are selected -------------------------------
    //insert a row if I is detected and if O is found then first delete and then insert
    
    val insertdf = latestForEachKey.filter($"FFAction|!|" === "I|!|" || $"FFAction|!|" === "O|!|").select(df1resultFinalWithYear.schema.fieldNames.map(col):_*)

    //------------------delete rows with primary key "OrganizationId", "InterimPeriodId"------------------
    // delete rows from the parent if either D or O is found in the incremental
    val deletedf = latestForEachKey.filter($"FFAction|!|" === "D|!|" || $"FFAction|!|" === "O|!|").select($"OrganizationId", $"InterimPeriodId", lit("delete").as("Delete"))

    //join by two primary keys for deletion and delete from the parent dataframe
    val dfMainOutput = df1resultFinalWithYear.join(deletedf, Seq("OrganizationId", "InterimPeriodId"), "left").filter($"Delete".isNull).drop("Delete")

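//union the surviving parent rows with the incremental inserts; O|!| and I|!| action flags are rewritten as I|!|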
val dfToSave=dfMainOutput.union(insertdf).withColumn("FFAction|!|", when($"FFAction|!|" === "O|!|" || $"FFAction|!|" === "I|!|", lit("I|!|")))

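//rebuild a single |^|-delimited string column from all non-partition columns so the output matches the original file layout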
val dfMainOutputFinal = dfToSave.na.fill("").select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").filter(_ != "PartitionYear").map(c => col(c)): _*).as("concatenated"))

val headerColumn = dataHeader.columns.toSeq

val header = headerColumn.mkString("", "|^|", "|!|").dropRight(3)

val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "null", "")).withColumnRenamed("concatenated", header)


    dfMainOutputFinalWithoutNull.repartition(1).write.partitionBy("DataPartition","PartitionYear")
  .format("csv")
  .option("nullValue", "")
  .option("delimiter", "\t")
  .option("quote", "\u0000")
  .option("header", "true")
  .option("codec", "gzip")
  .save("s3://trfsmallfffile/Interim2Annual/output")

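    //row counts per DataPartition/PartitionYear, written out as a small XML descriptor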
    val FFRowCount = dfMainOutputFinalWithoutNull.groupBy("DataPartition", "PartitionYear").count
  
  FFRowCount.coalesce(1).write.format("com.databricks.spark.xml")
  .option("rootTag", "FFFileType")
  .option("rowTag", "FFPhysicalFile")
  .save("s3://trfsmallfffile/Interim2Annual/Descr")

【Comments】:

Your code needs a serious review and has a lot of unneeded transformations. I will have to go through the logic line by line; you will have to help me understand it.

【Answer 1】:

Disclaimer: Somehow this and the other question I have just answered look like duplicates, so one of them will probably get flagged as such soon; otherwise we will find the difference between them and the disclaimer goes away. Time will tell.


Given that the final window specification and join condition have to be selected based on the value of the FFAction_1 column, I would do the filter first and then decide which window aggregation and join to use.

val df1 = spark.
  read.
  option("header", true).
  option("sep", "|").
  csv("df1.csv").
  select("OrganizationId", "AnnualPeriodId", "InterimPeriodId", "InterimNumber", "FFAction")
scala> df1.show
+--------------+--------------+---------------+-------------+--------+
|OrganizationId|AnnualPeriodId|InterimPeriodId|InterimNumber|FFAction|
+--------------+--------------+---------------+-------------+--------+
|    4295858898|           204|            205|            1|       I|
|    4295858898|           204|            208|            2|       I|
|    4295858898|           204|            209|            2|       I|
|    4295858898|           204|            211|            3|       I|
|    4295858898|           204|            212|            3|       I|
|    4295858898|           204|            214|            4|       I|
|    4295858898|           204|            215|            4|       I|
|    4295858898|           206|            207|            1|       I|
|    4295858898|           206|            210|            2|       I|
|    4295858898|           206|            213|            3|       I|
+--------------+--------------+---------------+-------------+--------+

The right-hand side of the join is very similar in "shape".

val df2 = spark.
  read.
  option("header", true).
  option("sep", "|").
  csv("df2.csv").
  select("DataPartition_1", "PartitionYear_1", "TimeStamp", "OrganizationId", "AnnualPeriodId", "InterimPeriodId", "InterimNumber_1", "FFAction_1")
scala> df2.show
+-----------------+---------------+-------------+--------------+--------------+---------------+---------------+----------+
|  DataPartition_1|PartitionYear_1|    TimeStamp|OrganizationId|AnnualPeriodId|InterimPeriodId|InterimNumber_1|FFAction_1|
+-----------------+---------------+-------------+--------------+--------------+---------------+---------------+----------+
|SelfSourcedPublic|           2002|1510725106270|    4295858941|            24|             25|              4|         O|
|SelfSourcedPublic|           2002|1510725106271|    4295858941|            24|             25|              5|         O|
|SelfSourcedPublic|           2003|1510725106272|    4295858941|            30|             31|              2|         O|
|SelfSourcedPublic|           2003|1510725106273|    4295858941|            30|             31|              3|         O|
|SelfSourcedPublic|           2001|1510725106293|    4295858941|             5|             20|              2|         O|
|SelfSourcedPublic|           2001|1510725106294|    4295858941|             5|             21|              3|         O|
|SelfSourcedPublic|           2002|1510725106295|    4295858941|             1|             22|              4|         O|
|SelfSourcedPublic|           2002|1510725106296|    4295858941|             1|             23|              5|         O|
|SelfSourcedPublic|           2016|1510725106297|    4295858941|            35|             36|              1|         I|
|SelfSourcedPublic|           2016|1510725106297|    4295858941|            35|             36|              1|         D|
+-----------------+---------------+-------------+--------------+--------------+---------------+---------------+----------+

With the above datasets, I would filter to see whether df2 has at least one I in the FFAction_1 column, and use that to pick the correct window specification and join condition.

The trick is to use the join operator followed by a where (or filter) operator, so you can decide which join condition to apply.

val noIs = df2.filter($"FFAction_1" === "I").take(1).isEmpty
val (windowSpec, joinCond) = if (noIs) {
  (windowSpecForOs, joinForOs)
} else {
  (windowSpecForIs, joinForIs)
}

val latestForEachKey = df2result.withColumn("rank", rank() over windowSpec)
val dfMainOutput = df1resultFinalWithYear.join(latestForEachKey).where(joinCond)
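
The snippet above uses windowSpecForIs, windowSpecForOs, joinForIs, joinForOs and df2result without showing their definitions. Below is a minimal sketch of what they could look like, assuming the three-column key for the I case and the two-column key for the O/D case from the question; these values would be defined before the snippet above, and the names and exact expressions are assumptions rather than part of the original answer.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.LongType

// df2result is assumed here to be the incremental DataFrame read above (df2)
val df2result = df2

// window specifications: latest TimeStamp first within each key combination
val windowSpecForIs = Window
  .partitionBy("OrganizationId", "AnnualPeriodId", "InterimPeriodId")
  .orderBy(col("TimeStamp").cast(LongType).desc)

val windowSpecForOs = Window
  .partitionBy("OrganizationId", "InterimPeriodId")
  .orderBy(col("TimeStamp").cast(LongType).desc)

// join conditions as Column expressions, so they can be passed to where(...)
val joinForIs =
  df1resultFinalWithYear("OrganizationId") === df2result("OrganizationId") &&
    df1resultFinalWithYear("AnnualPeriodId") === df2result("AnnualPeriodId") &&
    df1resultFinalWithYear("InterimPeriodId") === df2result("InterimPeriodId")

val joinForOs =
  df1resultFinalWithYear("OrganizationId") === df2result("OrganizationId") &&
    df1resultFinalWithYear("InterimPeriodId") === df2result("InterimPeriodId")

Since both sides share the key column names, the conditions are built from the parent and incremental DataFrames directly, which should avoid ambiguous-reference errors in the where clause; latestForEachKey only adds a rank column on top of df2result, so those column references still resolve after the join.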

【Discussion】:

@eliasah It looks like it, but I'm not sure yet. Let's wait until one gets accepted, since the OP is responsive. For now I think they are different, because one uses a window and a join while the other is just a window specification (no join condition). In that sense they differ, and this one requires the join condition as a Column expression inside where. If you don't mind, go ahead and close it.

I'm wondering: the input data is delimited by |^|, so if only | is used as the separator, won't there be a problem when creating the DataFrame, since there would be duplicate columns named ^?

@RameshMaharjan Remove the select and see for yourself :) Been there.

@JacekLaskowski I got the error Reference '^' is ambiguous, could be: ^#34, ^#36, ^#38, ^#40, ^#42, ^#44, ^#46.; that's why I asked.

@JacekLaskowski I'm just testing all my scenarios... thanks a lot... I'll get back to you.
