创建spark_读取数据
Posted 彩色条纹小斑马
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了创建spark_读取数据相关的知识,希望对你有一定的参考价值。
在2.0版本之前,使用Spark必须先创建SparkConf和SparkContext,不过在Spark2.0中只要创建一个SparkSession就够了,SparkConf、SparkContext和SQLContext都已经被封装在SparkSession当中。
在与spark2.0交互之前必须先创建spark对象
val Spark = SparkSession .builder() .master(masterUrl) .appName(appName) .config("spark.some.config.option", "some-value") .getOrCreate()
当创建好了SparkSession,我们就可以配置Spark运行相关属性。比如下面代码片段我们修改了已经存在的运行配置选项。
spark.conf.set("spark.sql.shuffle.partitions", 6) spark.conf.set("spark.executor.memory", "2g")
绝大多数的属性控制应用程序的内部设置,并且默认值都是比较合理的。下面对这些属性进行说明:
spark.app.name
该属性没有默认值,它的含义是你的应用程序的名字,这个名字设定之后将会在WEB UI上和日志数据里面显示。如果这个属性没有设置的话,将会把你应用程序的main函数所在类的全名作为应用程序的名称。在Yarn环境下,还可以用--name或者SPARK_YARN_APP_NAME来设置应用程序的名称。为了能够方便地查看各个应用程序的含义,取一个好的名字是很重要的。
spark.master
该属性没有默认值。这是Spark程序需要连接的集群管理器所在的URL地址。当前的spark支持三种集群方式Standalone、Apache Mesos以及YARN模式。如果这个属性在提交应用程序的时候没设置,程序将会通过System.getenv("MASTER")来获取MASTER环境变量;但是如果MASTER环境变量没有设定,那么程序将会把master的值设定为local[*],之后程序将在本地启动。
spark.executor.memory
该属性的默认值是512m。每个executor处理器可以使用的内存大小之和,跟JVM的内存表示的字符串格式是一样的(比如: ‘512m‘,‘2g‘)。在早期版本的Spark,是通过-Xmx和-Xms来设置的。如果这个值没有设定,那么程序将会先获取SPARK_EXECUTOR_MEMORY环境变量;如果还没设置,那么获取SPARK_MEM环境变量的值;如果这个值也没设定,那么这个值将会别设定为512,。
几乎所有运行时性能相关的内容都或多或少间接和内存大小相关。这个参数最终会被设置到Executor的JVM的heap尺寸上,对应的就是Xmx和Xms的值。
spark.serializer
默认值是org.apache.spark.serializer.JavaSerializer。用于序列化网络传输或者以序列化形式缓存起来的各种对象的类。默认的Serializer可以对所有的Java对象进行序列化,但是它的速度十分慢!所以如果速度是影响程序运行的关键,你可以将该值设定为org.apache.spark.serializer.KryoSerializer。在一些情况下,KryoSerializer的性能可以达到JavaSerializer的10倍以上,但是相对于JavaSerializer而言,主要的问题是它不能支持所有的Java对象。当然,用户可以直接继承org.apache.spark.serializer来实现自己的Serializer。
spark.kryo.registrator
默认值为空。如果你使用了KryoSerializer,就要为Kryo设置这个类去注册你自定义的类,该类必须继承自KryoRegistrator,实现其中的registerClasses(kryo: Kryo)即可。
spark.local.dir
默认值为/tmp。用于设定Spark的缓存目录,包括了mapper输出的文件,缓存到磁盘的RDD数据。最好将这个属性设定为访问速度快的本地磁盘。同Hadoop一样,可以用逗号分割来设定多个不同磁盘的目录。需要注意,在Spark 1.0和之后的版本,这个属性将会被SPARK_LOCAL_DIRS (Standalone, Mesos) 或者 LOCAL_DIRS (YARN) 环境变量替代。
spark.logConf
默认值是false。当SparkContext启动的时候,以INFO日志级别记录下有效的SparkConf 。
以上参考文档:https://www.iteblog.com/archives/1143.html#sparkappname
当创建好sparksession后,就可以读取数据了
用一个map来存储读取文件的格式
val options = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs:")
再读取hdfs上的数据
val data all=spark.sqlContext.read.options(options).format("com.databricks.spark.csv").load() 或者 val data all=spark.sqlContext.read.options(options).csv(filepath)
存储数据在hdfs上
val saveOption = Map("header" -> "true", "delimiter" -> "\t", "path" -> path) data.repartition(1).write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).options(saveOption).save()
读取的数据创建临时表格
data.createOrReplaceTempView("groupData")
可用sparksession.sql对数据进行字段提取,处理
val result = spark.sql("select IP,sum(COUNT) COUNT from groupData group by IP")
以上是关于创建spark_读取数据的主要内容,如果未能解决你的问题,请参考以下文章
如何从 BigQuery 读取分区表到 Spark 数据帧(在 PySpark 中)
spark关于join后有重复列的问题(org.apache.spark.sql.AnalysisException: Reference '*' is ambiguous)(代码片段