在 Spark 中使用 map() 和 filter() 而不是 spark.sql

Posted

技术标签:

【中文标题】在 Spark 中使用 map() 和 filter() 而不是 spark.sql【英文标题】:Using map() and filter() in Spark instead of spark.sql 【发布时间】:2020-01-03 14:29:19 【问题描述】:

我有两个数据集,我想通过 INNER JOIN 为我提供一个包含所需数据的全新表。我使用 SQL 并设法得到它。但是现在我想用map()和filter()试试,可以吗?

这是我使用 SPARK SQL 的代码:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    object hello 
      def main(args: Array[String]): Unit = 

        val conf = new SparkConf()
          .setMaster("local")
          .setAppName("quest9")

        val sc = new SparkContext(conf)
        val spark = SparkSession.builder().appName("quest9").master("local").getOrCreate()

        val zip_codes = spark.read.format("csv").option("header", "true").load("/home/hdfs/Documents/quest_9/doc/zip.csv")
        val census = spark.read.format("csv").option("header", "true").load("/home/hdfs/Documents/quest_9/doc/census.csv")

        census.createOrReplaceTempView("census")
        zip_codes.createOrReplaceTempView("zip")

        //val query = spark.sql("SELECT * FROM census")

        val query = spark.sql("SELECT DISTINCT census.Total_Males AS male, census.Total_Females AS female FROM census INNER JOIN zip ON census.Zip_Code=zip.Zip_Code WHERE zip.City = 'Inglewood' AND zip.County = 'Los Angeles'")

        query.show()

        query.write.parquet("/home/hdfs/Documents/population/census/IDE/census.parquet")

        sc.stop()
      
    

【问题讨论】:

使用dataframe.join() 似乎比使用mapfilter 更明智。你为什么不使用它?见***.com/questions/40343625/…或jaceklaskowski.gitbooks.io/mastering-spark-sql/…或***.com/questions/36800174/… @GPI 因为我被指示这样做,出于某种原因 【参考方案1】:

一般来说,唯一明智的方法是使用 `Dataset̀ 的 join() 方法。我会敦促您质疑是否需要仅使用映射/过滤器来执行此操作,因为这不直观,并且可能会使任何有经验的 spark 开发人员感到困惑(或者简单地说,让他翻白眼)。如果数据集增长,它还可能导致可扩展性问题。

也就是说,在您的用例中,避免使用连接非常简单。另一种可能性是发出两个单独的工作来激发:

    获取您感兴趣的邮政编码 过滤该(那些)邮政编码的人口普查数据

第 1 步收集感兴趣的邮政编码(不确定确切的语法,因为我手头没有 spark shell,但找到正确的应该很容易)。

var codes: Seq[String] = zip_codes
             // filter on the city
             .filter(row => row.getAs[String]("City").equals("Inglewood"))
             // filter on the county
             .filter(row => row.getAs[String]("County").equals("Los Angeles"))
             // map to zip code as a String
             .map(row => row.getAs[String]("Zip_Code"))
             .as[String]
             // Collect on the driver side
             .collect()

再一次,以这种方式编写它而不是使用 select/where 对于任何习惯于 spark 的人来说都是很奇怪的。

然而,这会起作用的原因是因为我们可以确定与给定城镇和县匹配的邮政编码会非常小。因此,对结果进行驱动端收集是安全的。

现在进入第 2 步:

census.filter(row => codes.contains(row.getAs[String]("Zip_Code")))
      .map( /* whatever to get your data out */ )

【讨论】:

感谢您的帮助,因为我正在“编队”中,我敢打赌这只是为了增加我的知识。不管怎样,你帮了我很多。 只是没有完全得到第二步中的最后一个 .map() 。我做了一个 .collect() ,它得到了我想要的以及我不想要的列【参考方案2】:

您需要的是join,您的查询大致翻译为:

census.as("census")
  .join(
    broadcast(zip_codes
        .where($"City"==="Inglewood")
        .where($"County"==="Los Angeles")
      .as("zip"))
    ,Seq("Zip_Code"),
    "inner" // "leftsemi" would also be sufficient
  )
  .select(
    $"census.Total_Males".as("male"),
    $"census.Total_Females".as("female")
  ).distinct()

【讨论】:

感谢帮助!

以上是关于在 Spark 中使用 map() 和 filter() 而不是 spark.sql的主要内容,如果未能解决你的问题,请参考以下文章

Spark 算子之map使用

在 spark java api( org.apache.spark.SparkException ) 中使用 filter(),map(),... 时出错

在 Spark Scala 中使用 map() 重新排序键值对

在 Spark (v.1.5.2) 中从 SQL 查询创建表

spark中map和mapPartitions算子的区别

在 map 函数中调用 SPARK SQL