Spark Accumulators - SQL - Streaming
Posted by 飞机耳朵
RDD persistence
---------------
StorageLevel flags, in order: useDisk, useMemory, useOffHeap, deserialized, replication.
MEMORY_ONLY = StorageLevel(false, true, false, true, 1)

Broadcast variables
---------------
The driver splits the broadcast value into small blocks and stores them in its BlockManager.
When an executor needs a block of a broadcast variable, it first looks in its own BlockManager;
if the block is not there, it fetches it from another node (driver or executor) and then caches it
in its own BlockManager.
The RDD and its dependencies are shipped with each task.
Relies on Scala's lazy evaluation mechanism.

Accumulators
----------------
Can only be added to; only the driver can read the value, executors cannot.
Do not update them inside transformations such as map. Accumulator updates should only be performed
inside actions, otherwise re-executed tasks may apply the same update more than once.

Custom accumulator: min/max temperature in a single pass
-----------------------------
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Accumulator test
  */
object AccTestScala {

  // custom accumulator: tracks both the highest and the lowest temperature
  class MyAcc extends AccumulatorV2[Int, (Int, Int)] {
    // highest temperature seen so far
    var max: Int = Int.MinValue
    // lowest temperature seen so far
    var min: Int = Int.MaxValue

    // is the accumulator still in its initial state?
    override def isZero: Boolean = {
      max == Int.MinValue && min == Int.MaxValue
    }

    override def copy(): AccumulatorV2[Int, (Int, Int)] = {
      val copy = new MyAcc()
      copy.max = max
      copy.min = min
      copy
    }

    override def reset(): Unit = {
      max = Int.MinValue
      min = Int.MaxValue
    }

    override def add(v: Int): Unit = {
      max = math.max(max, v)
      min = math.min(min, v)
    }

    override def merge(other: AccumulatorV2[Int, (Int, Int)]): Unit = {
      max = math.max(max, other.value._1)
      min = math.min(min, other.value._2)
    }

    override def value: (Int, Int) = {
      (max, min)
    }
  }

  def main(args: Array[String]): Unit = {
    // 1. create the spark configuration object
    val conf = new SparkConf()
    conf.setAppName("wcApp")
    conf.setMaster("local[4]")
    val sc = new SparkContext(conf)

    val acc = new MyAcc()
    sc.register(acc, "myacc")

    val rdd1 = sc.textFile("file:///d:/mr/temp.dat")
    val rdd2 = rdd1.map(line => {
      val arr = line.split(" ")
      arr(1).toInt
    })
    // foreach is an action, so this is a safe place to update the accumulator
    rdd2.foreach(temp => {
      acc.add(temp)
    })
    println(acc.value)
  }
}

Spark modules
------------------
1. Spark Core: RDD, jobs
2. Spark SQL
3. Spark Streaming
4. Spark ML
5. Spark GraphX

Spark SQL module
------------------
0. Introduction
   A DataFrame is roughly the equivalent of a table.
   Requires the spark-sql dependency.
   A DataFrame is a special Dataset: DataFrame = Dataset[Row].
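To make the DataFrame/Dataset[Row] relationship concrete, here is a minimal, self-contained sketch; the Cust case class and the sample rows are made up for illustration:

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// hypothetical record type, used only for this illustration
case class Cust(id: Int, name: String, age: Int)

object DfVsDs {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DfVsDs").master("local").getOrCreate()
    import spark.implicits._

    // a DataFrame is just an alias for Dataset[Row]
    val df: DataFrame = Seq((1, "tom", 12), (2, "tomas", 13)).toDF("id", "name", "age")

    // converting to a typed Dataset gives compile-time access to the fields
    val ds: Dataset[Cust] = df.as[Cust]
    ds.filter(_.age > 12).show()
  }
}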
0'. Common errors when Spark operates on Hive
1) Cannot instantiate the Hive metastore client
   a) Cause: Hive/Spark version mismatch.
   b) Downgrade Hive to 1.2, or
   c) copy the Hive jars into spark/jars, or
   d) disable the schema version check in hive-site.xml:
      <property>
          <name>hive.metastore.schema.verification</name>
          <value>false</value>
      </property>

1. Integrating with Hive
0. Notes
   Spark SQL operates on Hive tables with Spark as the execution engine: it only reads the Hive data
   from HDFS and runs the query on Spark.
1. Integration steps
   Copy Hive's hive-site.xml into spark/conf.
   Copy the MySQL driver into spark/jars/.
2. Start ZooKeeper and HDFS.
3. Start the Spark cluster
   $>spark/sbin/start-all.sh
4. Enter spark-shell
   $>spark-shell --master spark://s101:7077
   $scala>spark.sql("select * from mydb.custs").show()

2. Accessing Hive from Spark SQL programmatically
2.1) Scala version
a) Copy hive-site.xml + core-site.xml + hdfs-site.xml into the resources directory.
b) Add the Maven dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.it18zhang</groupId>
    <artifactId>my-spark</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.24</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.17</version>
        </dependency>
        <!--*************************************************-->
        <!-- Note: this dependency is required, otherwise    -->
        <!-- Hive support will not work.                     -->
        <!--*************************************************-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
    </dependencies>
</project>

c) Code

val conf = new SparkConf()
conf.setAppName("SparkSQLScala")
conf.setMaster("local")
conf.set("spark.sql.warehouse.dir", "hdfs://mycluster/user/hive/warehouse")
// enable hive support
val sess = SparkSession.builder()
  .config(conf)
  .enableHiveSupport()   // hive support must be enabled
  .getOrCreate()
import sess._
sess.sql("select * from mydb.custs").show()

2.2) Java version

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * Spark SQL over Hive, Java version
 */
public class MySparkSQLJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("MySparkSQLJava");
        conf.setMaster("local[*]");

        SparkSession sess = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate();
        Dataset<Row> df = sess.sql("select * from mydb.custs");
        df.show();
    }
}

Registering an RDD as a DataFrame
----------------------
import org.apache.spark.sql.SparkSession

/**
  * Register an RDD as a DataFrame and query it with SQL
  */
object SparkSQLRDDScala {
  def main(args: Array[String]): Unit = {
    // create the SparkSession
    val sess = SparkSession.builder().appName("sparksql").master("local").enableHiveSupport().getOrCreate()
    import sess.implicits._

    val rdd1 = sess.sparkContext.textFile("file:///d:/mr/temp.dat")
    val rdd2 = rdd1.map(line => {
      val arr = line.split(" ")
      (arr(0).toInt, arr(1).toInt)
    })
    val df1 = rdd2.toDF("year", "temp")
    df1.createOrReplaceTempView("temps")
    val sql = "select year , max(temp) max , min(temp) min from temps group by year order by year asc limit 200"
    sess.sql(sql).show(1000, false)
  }
}
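The same year/temperature aggregation can also be written with the DataFrame API instead of a SQL string. A minimal sketch, reusing df1 from the example above (functions.max/min replace the SQL aggregate calls):

import org.apache.spark.sql.functions.{max, min}

// equivalent to the SQL string above: max/min temperature per year
df1.groupBy("year")
  .agg(max("temp").as("max"), min("temp").as("min"))
  .orderBy("year")
  .limit(200)
  .show(1000, false)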
Converting between RDDs and DataFrames (Java version)
-----------------------------
package com.oldboy.spark.java;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

/**
 * Converting between RDD and DataFrame with the Java API
 */
public class SparkSQLRDDJava {
    public static void main(String[] args) {
        SparkSession sess = SparkSession.builder().appName("sparksql").master("local").enableHiveSupport().getOrCreate();
        JavaSparkContext sc = new JavaSparkContext(sess.sparkContext());
        JavaRDD<String> rdd1 = sc.textFile("file:///d:/mr/temp.dat");

        // Variant 1: build a Dataset<Row> from a JavaRDD<Row> plus an explicit schema
//        JavaRDD<Row> rdd2 = rdd1.map(new Function<String, Row>() {
//            public Row call(String v1) throws Exception {
//                String[] arr = v1.split(" ");
//                return RowFactory.create(Integer.parseInt(arr[0]), Integer.parseInt(arr[1]));
//            }
//        });
//
//        // define the schema
//        StructField[] fields = new StructField[2];
//        fields[0] = new StructField("year", DataTypes.IntegerType, false, Metadata.empty());
//        fields[1] = new StructField("temp", DataTypes.IntegerType, true, Metadata.empty());
//        StructType type = new StructType(fields);
//
//        Dataset<Row> df1 = sess.createDataFrame(rdd2, type);
//        df1.createOrReplaceTempView("temps");
//        sess.sql("select * from temps").show();

        // Variant 2: build a Dataset<Row> from a JavaRDD of JavaBeans
        // (TempData is a simple JavaBean with year and temp fields, not shown here)
        JavaRDD<TempData> rdd2 = rdd1.map(new Function<String, TempData>() {
            public TempData call(String v1) throws Exception {
                String[] arr = v1.split(" ");
                TempData data = new TempData();
                data.setYear(Integer.parseInt(arr[0]));
                data.setTemp(Integer.parseInt(arr[1]));
                return data;
            }
        });
        Dataset<Row> df1 = sess.createDataFrame(rdd2, TempData.class);
        df1.show();

        System.out.println("==================================");

        // DataFrame back to RDD
        Dataset<Row> df2 = sess.sql("select * from big10.emp2");
        JavaRDD<Row> rdd3 = df2.toJavaRDD();
        JavaPairRDD<Integer, Float> rdd4 = rdd3.mapToPair(new PairFunction<Row, Integer, Float>() {
            public Tuple2<Integer, Float> call(Row row) throws Exception {
                int deptno = row.getInt(row.fieldIndex("deptno"));
                float sal = row.getFloat(row.fieldIndex("salary"));
                return new Tuple2<Integer, Float>(deptno, sal);
            }
        });
        // max salary per department
        JavaPairRDD<Integer, Float> rdd5 = rdd4.reduceByKey(new Function2<Float, Float, Float>() {
            public Float call(Float v1, Float v2) throws Exception {
                return Math.max(v1, v2);
            }
        });
        rdd5.foreach(new VoidFunction<Tuple2<Integer, Float>>() {
            public void call(Tuple2<Integer, Float> t) throws Exception {
                System.out.println(t);
            }
        });
    }
}

Printing a DataFrame's schema
---------------------
df.printSchema()

Accessing JSON files with Spark SQL
----------------------------
1. Create a json file [d:/java/custs.json]
   {"id":1,"name":"tom","age":12}
   {"id":2,"name":"tomas","age":13}
   {"id":3,"name":"tomasLee","age":14}
   {"id":4,"name":"tomson","age":15}
   {"id":5,"name":"tom2","age":16}
2. Load the file [Scala]
   val conf = new SparkConf()
   conf.setAppName("SparkSQLScala")
   conf.setMaster("local")
   conf.set("spark.sql.warehouse.dir", "hdfs://mycluster/user/hive/warehouse")
   // enable hive support
   val sess = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
   val df = sess.read.json("file:///d:/java/custs.json")
   df.show(1000, false)
3. Java version
   Dataset<Row> df = spark.read().json("file:///d:/java/custs.json");
4. Save as json
   df1.write.json(path)
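By default the writer fails if the target path already exists. A minimal sketch of setting the save mode explicitly; the output path here is made up for illustration:

// overwrite (or "append" to) an existing output location instead of failing
df1.write.mode("overwrite").json("file:///d:/java/custs_out")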
Accessing Parquet files with Spark SQL
----------------------------
// save as parquet
df1.write.parquet(path)
// read
spark.read.parquet(path)

Accessing JDBC data sources with Spark SQL
----------------------------
// save to a JDBC table
val prop = new java.util.Properties()
prop.put("driver", "com.mysql.jdbc.Driver")
prop.put("user", "root")
prop.put("password", "root")
// the target table does not need to exist beforehand
df1.write.jdbc("jdbc:mysql://192.168.231.1:3306/big10", "emp", prop)
// read
spark.read.jdbc("jdbc:mysql://192.168.231.1:3306/big10", "emp", prop)

Spark SQL DataFrame API programming
----------------------------
DataFrame.select("id", "name")
DataFrame.select($"id", $"name")
DataFrame.where("id > 3")
DataFrame.groupBy("id").agg(max("age"), min("age"))
...

Spark temporary views
----------------------------
1. createOrReplaceTempView
   Lifetime is limited to the current session.
2. createGlobalTempView
   Global, shared across sessions (queried through the global_temp database).

Spark SQL as a distributed query engine
----------------------------
1. Description
   End users and applications can talk to Spark SQL directly, without writing any other code.
2. Start Spark's thrift server process
   spark/sbin/start-thriftserver.sh --master spark://s101:7077
3. Check that it is running
   a) web UI
   b) port: netstat -anop | grep 10000
4. Test with Spark's beeline client
   $>spark/bin/beeline
   $beeline>!connect jdbc:hive2://s101:10000/mydb
   $beeline>select * from customers;

Spark Streaming
----------------------------
Continuous computation that never stops.
Not true real-time computation; data is processed in small batches (micro-batching).

A first streaming program
-----------------
1. Write the streaming code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Streaming word count over a socket text stream
  */
object SparkStreamingDemo1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    // create the socket text stream
    val lines = ssc.socketTextStream("localhost", 9999)
    // flatten lines into words
    val words = lines.flatMap(_.split(" "))
    val ds2 = words.map((_, 1))
    val ds3 = ds2.reduceByKey(_ + _)
    ds3.print()

    // start the context
    ssc.start()
    ssc.awaitTermination()
  }
}

2. Start an nc server [win7]
   nc -l -L -p 9999

3. Start the Scala program. The project's /resources/log4j.properties keeps the console output quiet:

# Set everything to be logged to the console
log4j.rootCategory=warn, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

4. In the nc terminal, type:
   hello world
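The word count in step 1 aggregates each 1-second batch independently. As a variation, a minimal sketch of a sliding-window aggregation; it would replace the reduceByKey line before ssc.start(), and the window/slide durations here are arbitrary (both must be multiples of the batch interval):

// count words over the last 10 seconds, recomputed every 2 seconds
val windowed = ds2.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(10), Seconds(2))
windowed.print()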
Spark Streaming, Java version
----------------------------
package com.oldboy.spark.java;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * Spark Streaming, Java version
 */
public class SparkStreamingJava {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf();
        conf.setAppName("ssc");
        conf.setMaster("local[2]");

        // create the streaming context with a 2-second batch interval
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(2));

        // create the discretized stream from a socket
        JavaDStream<String> ds1 = ssc.socketTextStream("s101", 8888);

        // flatten lines into words
        JavaDStream<String> ds2 = ds1.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) throws Exception {
                String[] arr = s.split(" ");
                return Arrays.asList(arr).iterator();
            }
        });

        // map to (word, 1) pairs
        JavaPairDStream<String, Integer> ds3 = ds2.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // aggregate
        JavaPairDStream<String, Integer> ds4 = ds3.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // ds4.print();
        ds4.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
            public void call(JavaPairRDD<String, Integer> rdd) throws Exception {
                System.out.println("--------------------");
                List<Tuple2<String, Integer>> list = rdd.take(100);
                for (Tuple2<String, Integer> t : list) {
                    System.out.println(t);
                }
            }
        });

        ssc.start();
        ssc.awaitTermination();
    }
}

Spark Streaming caveats
-------------------------
1. Once the context has been started, no new computations can be added to it.
2. Once the context has been stopped, it cannot be restarted.
3. Only one StreamingContext can be active in a JVM at a time.
4. When stopping the StreamingContext, stopping the SparkContext as well is optional.
5. A SparkContext can be reused to create multiple StreamingContexts, as long as the previous one is stopped first (see the sketch below).
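A minimal sketch of caveats 4 and 5, assuming ssc is an existing StreamingContext and sc is its SparkContext:

// stop only the streaming side; the underlying SparkContext stays alive
ssc.stop(stopSparkContext = false)

// the same SparkContext can now back a new StreamingContext
val ssc2 = new StreamingContext(sc, Seconds(5))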
A DStream (discretized stream) is internally a continuous sequence of RDDs; operations on the DStream
are translated into operations on those RDDs.

DStreams and receivers
-------------------------
Socket text streams are backed by a Receiver: the receiver pulls data from the source and stores it in
Spark's memory for computation.

Source types:
1. Basic sources: built in.
2. Advanced sources: provided through third-party integrations.

Caveat:
When running a stream computation locally, do not use local or local[1]. With only one thread available
for local tasks, that thread is taken by the receiver and no thread is left to do the computation.
   Socket sock = new Socket("localhost", 8888);

Spark Streaming API
----------------------
1. StreamingContext
   The entry point for Spark stream computation; DStreams can be created from it in several ways.
   start() / stop() / awaitTermination()
2. SocketReceiver
   Creates the Socket object, spawns a worker thread, receives data and stores it in memory.
3. ReceiverTracker
   Tracks receivers and manages their execution.
4. JobScheduler
   Schedules jobs onto Spark; uses JobGenerator to generate the jobs and runs them with a thread pool.

Implementing taggen with Spark SQL (Scala version)
---------------------------------
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
  * Per-business tag counts (taggen) with Spark SQL
  */
object MySparkSQLScalaTaggen {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("SparkSQLScala")
    conf.setMaster("local")
    conf.set("spark.sql.warehouse.dir", "hdfs://mycluster/user/hive/warehouse")
    // enable hive support
    val sess = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()

    // 1. load the file into an RDD
    val rdd1 = sess.sparkContext.textFile("file:///d:/temptags.txt")
    // 2. split and drop invalid lines
    val rdd2 = rdd1.map(_.split("\t")).filter(_.length > 1)
    // 3. parse the JSON column
    val rdd3 = rdd2.map(arr => (arr(0), JSONUtil.parseTag(arr(1))))
    // 4. drop businesses with no tags
    val rdd4 = rdd3.filter(t => t._2.size() > 0)
    // convert the java.util.List of tags into a Scala array
    val rdd44 = rdd4.map(t => {
      val busid = t._1
      val list = t._2
      val arr = new Array[String](list.size())
      var i: Int = 0
      import scala.collection.JavaConversions._
      for (x <- list) {
        arr(i) = x
        i += 1
      }
      (busid, arr)
    })
    // 5. convert the RDD into an RDD[Row]
    val rdd5 = rdd44.map(t => {
      Row(t._1, t._2)
    })
    // 6. define the schema
    val mytype = StructType(List(
      StructField("busid", DataTypes.StringType, false),
      StructField("tags", DataTypes.createArrayType(DataTypes.StringType), false)
    ))
    // 7. create the DataFrame
    val df = sess.createDataFrame(rdd5, mytype)
    // 8. register a temporary view
    df.createOrReplaceTempView("_tags")
    // 9. explode the tags field
    //val df2 = sess.sql("select busid , explode(tags) tag from _tags")    // also works
    // use Hive's lateral view to pair each exploded tag with its busid
    val df2 = sess.sql("select busid , tag from _tags lateral view explode(tags) xx as tag")
    // 10. register another temporary view
    df2.createOrReplaceTempView("_tags2")
    // 11. count each tag per business
    val sql1 = "select busid, tag , count(*) cnt from _tags2 group by busid , tag order by busid , cnt desc"
    // 12. aggregate all tags per business into busid, List((tag, cnt), ...) using a subquery
    //val sql2 = "select t.busid , collect_list(struct(t.tag , t.cnt)) st from (" + sql1 + ") as t group by t.busid order by st[0].col2 desc "
    val sql2 = "select t.busid , collect_list(named_struct('tag' , t.tag , 'cnt' , t.cnt)) st from (" + sql1 + ") as t group by t.busid order by st[0].cnt desc "
    sess.sql(sql2).show(10000, false)

    // alternative: register the aggregation as another view first
    //val sql2 = "select t.busid , collect_list(named_struct('tag' , t.tag , 'cnt' , t.cnt)) st from (" + sql1 + ") as t group by t.busid "
    //sess.sql(sql2).createOrReplaceTempView("_tags3")
    // 13. sort all businesses by the count of their most frequent tag, descending
    //val sql3 = "select * from _tags3 order by st[0].cnt desc"
    //sess.sql(sql3).show(10000, false)
  }
}
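The code above relies on a JSONUtil.parseTag helper that is not shown in these notes. A minimal sketch of what such a helper might look like, using the fastjson dependency from the pom above; the top-level "tags" field name is an assumption and has to be adapted to the real layout of temptags.txt:

import java.util

import com.alibaba.fastjson.JSON

object JSONUtil {
  // Extract the tag list from one line's JSON column.
  // Assumption: the JSON object carries a top-level string array named "tags".
  def parseTag(json: String): util.List[String] = {
    val result = new util.ArrayList[String]()
    try {
      val obj = JSON.parseObject(json)
      val arr = obj.getJSONArray("tags")
      if (arr != null) {
        var i = 0
        while (i < arr.size()) {
          result.add(arr.getString(i))
          i += 1
        }
      }
    } catch {
      case _: Exception => // skip malformed lines
    }
    result
  }
}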