spark 中 filter(_.length==6)什么意思
Posted wyx100
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark 中 filter(_.length==6)什么意思相关的知识,希望对你有一定的参考价值。
filter(_.length==6) 每行的字段数,例如,下行每个字段之间是“\\t”分割的,分割后有6个字段每个字段的值分别是:20111230000005、57375476989eea12893c0c3811607bcf、奇艺高清
、1、1、http://www.qiyi.com/
20111230000005 57375476989eea12893c0c3811607bcf 奇艺高清 1 1 http://www.qiyi.com/
以上是用“\\t”分割的,用ultraEdit工具可以查看到
sc.textFile(args(0)).map(_.split("\\t")).filter(_.length==6).map(x=>(x(1),1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).saveAsTextFile(args(1))
sc.textFile(args(0)) // 读文件
map(_.split("\\t")) // 每行用"\\t" 分字段
filter(_.length==6) // 有6个字段的行
map(x=>(x(1),1)) // 取每行的第2个字段(从1开始编号)的值,该值计数加1
字符串,索引顺序0,1,2...(从0开始编号,x(0)表示第1个字段,x(1)表示第2个字段)
本行中
x(0)值为:20111230000005
x(1)值为:57375476989eea12893c0c3811607bcf
reduceByKey(_+_) // key相同的字段合并 合并后 字段值在前,计数值在后
map(x=>(x._2,x._1)) // 字段、计数 位置互换,计数在前,字段在后
sortByKey(false) // 降序排列,在本程序中,根据计数,从多到少排序
map(x=>(x._2,x._1)) // 字段、计数 位置互换,计数在后,字段在前
saveAsTextFile(args(1)) // 保存记录到args(1)对应文件中
result:
part-00000 文件为空
part-00001 文件里面值
(57375476989eea12893c0c3811607bcf,1)
package com.spark.firstApp
/**
* Created by root on 16-7-7.
*/
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf, SparkContext
object HelloSpark
def main(args: Array[String])
if (args.length != 2)
System.err.println("Usage: HelloSpark <Input> <Output>")
System.exit(1)
val conf = new SparkConf().setAppName("HelloSpark")
val sc = new SparkContext(conf)
//session查询次数排行榜
//val rdd1 = sc.textFile(args(0)).map(_.split("\\t")).filter(_.length==6)
//val rdd2=rdd1.map(x=>(x(1),1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))
//rdd2.saveAsTextFile(args(1))
//sc.textFile(args(0)).map(_.split(" ")).filter(_.length==6).map(x=>(x(1),1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).saveAsTextFile(args(1))
sc.textFile(args(0)).map(_.split("\\t")).filter(_.length==6).map(x=>(x(1),1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).saveAsTextFile(args(1))
sc.stop()
以上是关于spark 中 filter(_.length==6)什么意思的主要内容,如果未能解决你的问题,请参考以下文章
spark模型error java.lang.IllegalArgumentException: Row length is 0
在 Spark 中使用 map() 和 filter() 而不是 spark.sql
在 spark java api( org.apache.spark.SparkException ) 中使用 filter(),map(),... 时出错
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class o