spark 中 filter(_.length==6)什么意思

Posted wyx100

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark 中 filter(_.length==6)什么意思相关的知识,希望对你有一定的参考价值。

filter(_.length==6)  每行的字段数,例如,下行每个字段之间是“\\t”分割的,分割后有6个字段每个字段的值分别是:20111230000005、57375476989eea12893c0c3811607bcf、奇艺高清

、1、1、http://www.qiyi.com/

20111230000005 57375476989eea12893c0c3811607bcf 奇艺高清 1 1 http://www.qiyi.com/

以上是用“\\t”分割的,用ultraEdit工具可以查看到

sc.textFile(args(0)).map(_.split("\\t")).filter(_.length==6).map(x=>(x(1),1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).saveAsTextFile(args(1))


sc.textFile(args(0)) // 读文件
map(_.split("\\t")) // 每行用"\\t" 分字段
filter(_.length==6) // 有6个字段的行
map(x=>(x(1),1)) // 取每行的第2个字段(从1开始编号)的值,该值计数加1   
字符串,索引顺序0,1,2...(从0开始编号,x(0)表示第1个字段,x(1)表示第2个字段)
本行中
x(0)值为:20111230000005
x(1)值为:57375476989eea12893c0c3811607bcf
reduceByKey(_+_) // key相同的字段合并   合并后 字段值在前,计数值在后
map(x=>(x._2,x._1))  // 字段、计数 位置互换,计数在前,字段在后
sortByKey(false) // 降序排列,在本程序中,根据计数,从多到少排序
map(x=>(x._2,x._1)) // 字段、计数 位置互换,计数在后,字段在前
saveAsTextFile(args(1)) // 保存记录到args(1)对应文件中


result:

part-00000  文件为空

part-00001  文件里面值

(57375476989eea12893c0c3811607bcf,1)

package com.spark.firstApp

/**
* Created by root on 16-7-7.
*/
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf, SparkContext

object HelloSpark
  def main(args: Array[String]) 
    if (args.length !=  2) 
      System.err.println("Usage: HelloSpark  <Input> <Output>")
      System.exit(1)
    

    val conf = new SparkConf().setAppName("HelloSpark")
    val sc = new SparkContext(conf)

    //session查询次数排行榜
    //val rdd1 = sc.textFile(args(0)).map(_.split("\\t")).filter(_.length==6)
    //val rdd2=rdd1.map(x=>(x(1),1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))
    //rdd2.saveAsTextFile(args(1))
    //sc.textFile(args(0)).map(_.split(" ")).filter(_.length==6).map(x=>(x(1),1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).saveAsTextFile(args(1))
    sc.textFile(args(0)).map(_.split("\\t")).filter(_.length==6).map(x=>(x(1),1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).saveAsTextFile(args(1))
    sc.stop()
  




 


以上是关于spark 中 filter(_.length==6)什么意思的主要内容,如果未能解决你的问题,请参考以下文章

spark-shell学习笔记

spark简单例子

spark模型error java.lang.IllegalArgumentException: Row length is 0

在 Spark 中使用 map() 和 filter() 而不是 spark.sql

在 spark java api( org.apache.spark.SparkException ) 中使用 filter(),map(),... 时出错

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class o