在 spark 中实现 informatica 逻辑

Posted

技术标签:

【中文标题】在 spark 中实现 informatica 逻辑【英文标题】:Implementing informatica logic in spark 【发布时间】:2018-06-24 11:30:11 【问题描述】:

我们如何在 spark 中实现以下逻辑?

如果列值为空,那么它应该返回'' 如果 ltrim(rtrim(column)) 为 null 那么它应该返回 ' ' 否则它应该填充该列的值

【问题讨论】:

我想你的意思是,scala、pyspark 或 java @thebluephantom pyspark 很容易知道...但我是 SCALA 人。如果你谷歌你应该在其他地方找到这样的答案 RDD 还是数据框还是 RDS? 'a ' 返回 'a' 我认为 【参考方案1】:

df=df.withColumn("new_col",when(col("old_col").isNull(),"").otherwise(col("old_col")))

更喜欢 DF 而不是 rdd,因为它涉及到底层优化

https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

【讨论】:

【参考方案2】:

我不是 pyspark 的人,但如果你看看下面的 Scala 示例和

    如果我没记错的话,看看 pyspark lambda 方法和缺少 val 并编写自己的 def,就像在 Informatica 中进行可重用转换一样

那么你应该能够继续前进。我提供了几种修剪方法,如果还有的话,在 RDD 中使用地图。

3 行显示一般原则:

val rdd = sc.parallelize(List((" aaa", "x"), ("bbbb ", "y"), (" cc ", "z"), ("gggg  ", " a"), ("    ", "b")))
val rdd2 = rdd.map case (field1, field2) => ( field1.replaceAll(" ", ""), field1.trim, field1, field2) 
val rdd3 = rdd2.map case (field1, field2, field3, field4) => (field1.replaceAll(" ", ""), if (field1.trim == "") " "  else field1 , field3, field4) 

【讨论】:

以上是关于在 spark 中实现 informatica 逻辑的主要内容,如果未能解决你的问题,请参考以下文章

请教如何在informatica中实现数据库到XML文件的转换

如何在 Apache Spark 中实现递归算法?

如何在 Spark 中实现“交叉连接”?

在 Spark GraphX 中实现拓扑排序

如何在 spark scala 中实现 uniqueConcatenate、uniqueCount [关闭]

使用 ForeachWriter 在 Spark 流中实现 Cassandra 接收器