如何提高我的 spark-sql-joins 的性能

Posted

技术标签:

【中文标题】如何提高我的 spark-sql-joins 的性能【英文标题】:How to improve performance of my spark-sql-joins 【发布时间】:2019-01-03 19:17:24 【问题描述】:

我有两个数据源(都是 csv 文件),一个是传入数据源(220 万条记录)和主数据源(3500 万条记录)。我的工作是验证传入数据源中有多少记录与主数据源匹配并输出它们。这里的关键是记录有噪声,需要模糊字符串匹配而不是精确匹配。我的连接在小数据上效果很好,但当我必须对大数据做同样的事情时,它需要很长时间。

仅供参考 .. 使用此代码,我花了大约 1 小时 40 分钟在 8 核机器上对传入数据(100 万条记录)与主数据(300 万条记录)执行连接。

例如。 主数据源具有如下所示的 3500 万条记录之一

“Markets, Inc.”,1 Bank Plz,,Ch​​icago,IL,60670-0001,IL

传入的数据有记录之一

“Markets Inc”,1 Bank Pl,,Chicago,IL,60670-0001,IL

下面是我的代码

def myFunc: (String => String) = 
      s =>
        if (s.length > 5) 
          s.substring(0, 5)
         else s
    
val myUDF = udf(myFunc)
var sourcedata = spark.sqlContext.read.option("header", "true").option("delimiter", "|")
  .csv("./src/main/resources/company_address_sample3000000.txt").na.fill("")
  .select(col("COMPANY_NAME").alias("NAME1"), concat(col("STREET_ADDR_1"),
    col("STREET_ADDR_2")).alias("ADDRESS1"), col("CITY").alias("CITY1"), col("STATE").alias("STATE1"),
    myUDF(col("ZIP")).alias("ZIP1"))
  .withColumn("Beginswith1", col("NAME1").substr(0, 1)).distinct()
  .repartition(col("Beginswith1"), col("NAME1"), col("ADDRESS1"), col("CITY1"), col("STATE1"), col("ZIP1"))
var incomingData = spark.sqlContext.read.option("header", "true").option("delimiter", "|")
  .csv("./src/main/resources/common_format_sample1000000.txt")
  .select("NAME", "ADDRESS", "CITY", "STATE", "ZIP")
  .withColumn("Beginswith", col("NAME").substr(0, 1)).distinct()
  .repartition(col("Beginswith"), col("NAME"), col("ADDRESS"), col("CITY"), col("STATE"), col("ZIP"))

def calculate_similarity(str: String, str1: String): Double = 
  val dist = new JaroWinkler()
  Try 
    dist.similarity(str, str1)
   getOrElse (0.0)


def myFilterFunction(
                      nameInp: String, nameRef: String,
                      addInp: String, addRef: String,
                      cityInp: String, cityRef: String,
                      stateInp: String, stateRef: String,
                      zipInp: String, zipRef: String
                    ) = 
  stateInp == stateRef && cityInp == cityRef && calculate_similarity(nameInp, nameRef) > 0.8 && calculate_similarity(addInp, addRef) > 0.8


val udf1 = org.apache.spark.sql.functions.udf(myFilterFunction _)
val filter: Column = udf1(
  incomingData("NAME"), sourcedata("NAME1"),
  incomingData("ADDRESS"), sourcedata("ADDRESS1"),
  incomingData("CITY"), sourcedata("CITY1"),
  incomingData("STATE"), sourcedata("STATE1"),
  incomingData("ZIP"), sourcedata("ZIP1")
)

incomingData.join(sourcedata, incomingData("Beginswith") === sourcedata("Beginswith1") && filter, "left_semi")
  .write.csv("./src/main/resources/hihello3-0.8-1m3m.csv")

【问题讨论】:

我会首先确保数据库已优化。 (索引、分区等)然后我编写了一个 python 脚本,如果匹配,它会根据数据库检查来自嘈杂数据源的原始值。继续。否则,开始模糊逻辑的噩梦。识别必须 100% 匹配的列,然后从那里向后移动。使用“reviewed”列更新数据库,以了解您已对其进行评估,也许还有时间戳 感谢您的建议,但我更改了上面的问题描述。 【参考方案1】:

重新排列连接过滤器序列,将时间从 1 小时 50 分钟显着减少到 90 秒。 虽然从 sql 优化的角度来看,这不是一个解决方案,但考虑到我的数据,它满足了我当前的目的。 从 sql 优化的角度来看,我仍然很想看看是否有人提出了解决方案。

var sourcedata = spark.sqlContext.read.option("header", "true").option("delimiter", "|")
      .csv("./src/main/resources/company_address.txt").na.fill("")
      .select(col("COMPANY_NAME").alias("NAME1"), concat(col("STREET_ADDR_1"),
        col("STREET_ADDR_2")).alias("ADDRESS1"), col("CITY").alias("CITY1"), col("STATE").alias("STATE1"),
        col("ZIP").alias("ZIP1"))
      .withColumn("Beginswith1", col("NAME1").substr(0, 1))
      .repartition(col("Beginswith1"), col("NAME1"), col("ADDRESS1"), col("CITY1"), col("STATE1"), col("ZIP1"))

var incomingData_Select = spark.sqlContext.read.option("header", "true").option("delimiter", "|")
  .csv("./src/main/resources/common_format.txt")
  .select("NAME", "ADDRESS", "CITY", "STATE", "ZIP")
  .withColumn("Beginswith", col("NAME").substr(0, 1)).distinct()
  .repartition(col("Beginswith"), col("NAME"), col("ADDRESS"), col("CITY"), col("STATE"), col("ZIP"))

def calculate_similarity(str: String, str1: String, str2: String, str3: String): Boolean = 
  val dist = new JaroWinkler()
  Try 
    dist.similarity(str, str1) > 0.8 && dist.similarity(str2, str3) > 0.8
   getOrElse (false)


def myFilterFunction(
                      nameInp: String, nameRef: String,
                      addInp: String, addRef: String
                    ) = 
  calculate_similarity(nameInp, nameRef, addInp, addRef)


val sim_udf = org.apache.spark.sql.functions.udf(myFilterFunction _)

val filter: Column = sim_udf(
  incomingData_Select("NAME"), sourcedata("NAME1"),
  incomingData_Select("ADDRESS"), sourcedata("ADDRESS1")
)

val matching_companies = incomingData_Select
  .join(sourcedata, incomingData_Select("STATE") === sourcedata("STATE1") && incomingData_Select("CITY") === sourcedata("CITY1") && incomingData_Select("Beginswith") === sourcedata("Beginswith1") && filter, "left_semi")

【讨论】:

以上是关于如何提高我的 spark-sql-joins 的性能的主要内容,如果未能解决你的问题,请参考以下文章

Django 形式的性

程序员如何快速技术提升?不以涨工资为目的的技术学习,都会以失败而告终,带着很强目的性的学,让外力打击自己

记一次对 Laravel-permission 项目的性能优化

设计模式

Django连接多个数据库并实现读写分离

Django连接多个数据库并实现读写分离