3.2spark集群运行应用之第三方jar的处理方式
Posted 专治spark
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了3.2spark集群运行应用之第三方jar的处理方式相关的知识,希望对你有一定的参考价值。
在编写程序时,不可避免会用到第三方jar,有三种使用方式:
1、将运行程序需要的所有第三方 jar,分发到所有spark的/soft/spark/jars下
2、将第三方jar打散,和自己的源码打成一个jar包,如3.1中
3、在spark-submit命令中,通过--jars指定使用的第三方jar包
在s102上提交,fastjson-1.2.47.jar 本地,myspark.jar本地,temptags.txt HDFS上
spark-submit --class a --jars fastjson-1.2.47.jar --master spark://s101:7077 myspark.jar temptags.txt
spark-shell脚本也用到spark-submit,因此也可以通过spark-shell指定第三方 jar
spark-shell --master spark://s101:7077 --jars fastjson-1.2.47.jar //该jar在本地
import java.util._ import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ import com.alibaba.fastjson._ def pp(line: String)={ //解析方法 val list = new ArrayList[String] val jsonObject = JSON.parseObject(line) val extInfoList = jsonObject.getJSONArray("extInfoList") if (extInfoList != null && extInfoList.size != 0) { for (o <- extInfoList) { val jo = o.asInstanceOf[JSONObject] if (jo.get("title") == "contentTags") { val values = jo.getJSONArray("values") for (value <- values) { list.add(value.toString) } } } } list } val rdd1 = sc.textFile("myspark/temptags.txt") val rdd2 = rdd1.map(s => {val sp = s.split(" ");val lst = pp(sp(1));(sp(0), lst)}).filter(_._2.size() > 0) val rdd3 = rdd2.flatMapValues(_.asScala).map(t=>((t._1,t._2),1)).reduceByKey((a,b)=>a+b).groupBy(_._1._1).mapValues(_.map(t=>(t._1._2,t._2))) val rdd4 = rdd3.mapValues(_.toList.sortBy(-_._2)).sortBy(-_._2(0)._2) val rdd5 = rdd4.collect() rdd5.foreach(println)
以上是关于3.2spark集群运行应用之第三方jar的处理方式的主要内容,如果未能解决你的问题,请参考以下文章
saveAsTextFile 在 spark java.io.IOException 中挂起:数据框中的对等方重置连接