PySpark与GraphFrames的安装与使用

Posted 小小明-代码实体

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了PySpark与GraphFrames的安装与使用相关的知识,希望对你有一定的参考价值。

PySpark环境搭建

配置hadoop

spark访问本地文件并执行运算时,可能会遇到权限问题或是dll错误。这是因为spark需要使用到Hadoop的winutils和hadoop.dll,首先我们必须配置好Hadoop相关的环境。可以到github下载:https://github.com/4ttty/winutils

gitcode提供了镜像加速:https://gitcode.net/mirrors/4ttty/winutils

我选择了使用这个仓库提供的最高的Hadoop版本3.0.0将其解压到D:\\deploy\\hadoop-3.0.0目录下,然后配置环境变量:

我们还需要将对应的hadoop.dll复制到系统中,用命令表达就是:

copy D:\\deploy\\hadoop-3.0.0\\bin\\hadoop.dll C:\\Windows\\System32

不过这步需要拥有管理员权限才可以操作。

为了能够在任何地方使用winutils命令工具,将%HADOOP_HOME%\\bin目录加入环境变量中:

安装pyspark与Java

首先,我们安装spark当前(2022-2-17)的最新版本:

pip install pyspark==3.2.1

需要注意pyspark的版本决定了jdk的最高版本,例如假如安装2.4.5版本的pyspark就只能安装1.8版本的jdk,否则会报出java.lang.IllegalArgumentException: Unsupported class file major version 55的错误。

这是因为pyspark内置了Scala,而Scala是基于jvm的编程语言,Scala与jdk的版本存在兼容性问题,JDK与scala的版本兼容性表:

JDK versionMinimum Scala versionsRecommended Scala versions
172.13.6, 2.12.15 (forthcoming)2.13.6, 2.12.15 (forthcoming)
162.13.5, 2.12.142.13.6, 2.12.14
13, 14, 152.13.2, 2.12.112.13.6, 2.12.14
122.13.1, 2.12.92.13.6, 2.12.14
112.13.0, 2.12.4, 2.11.122.13.6, 2.12.14, 2.11.12
82.13.0, 2.12.0, 2.11.0, 2.10.22.13.6, 2.12.14, 2.11.12, 2.10.7
6, 72.11.0, 2.10.02.11.12, 2.10.7

当前3.2.1版本的pyspark内置的Scala版本为2.12.15,意味着jdk17与其以下的所有版本都支持。

这里我依然选择安装jdk8的版本:

测试一下:

>java -version
java version "1.8.0_201"
Java(TM) SE Runtime Environment (build 1.8.0_201-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.201-b09, mixed mode)

jdk11的详细安装教程(jdk1.8在官网只有安装包,无zip绿化压缩包):

绿化版Java11的环境配置与Python调用Java
https://xxmdmst.blog.csdn.net/article/details/118366166

graphframes安装

pip安装当前最新的graphframes:

pip install graphframes==0.6

然后在官网下载graphframes的jar包。

下载地址:https://spark-packages.org/package/graphframes/graphframes

由于安装的pyspark版本是3.2,所以这里我选择了这个jar包:

然后将该jar包放入pyspark安装目录的jars目录下:

pyspark安装位置可以通过pip查看:

C:\\Users\\ASUS>pip show pyspark
Name: pyspark
Version: 3.2.1
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: d:\\miniconda3\\lib\\site-packages
Requires: py4j
Required-by:

使用方法

学习pyspark的最佳路径是官网:https://spark.apache.org/docs/latest/quick-start.html

在下面的网页,官方提供了在线jupyter:

https://spark.apache.org/docs/latest/api/python/getting_started/index.html

启动spark并读取数据

本地模式启动spark:

from pyspark.sql import SparkSession, Row

spark = SparkSession \\
    .builder \\
    .appName("Python Spark") \\
    .master("local[*]") \\
    .getOrCreate()
sc = spark.sparkContext
spark

SparkSession输出的内容中包含了spark的web页面,新标签页打开页面后大致效果如上。

点击Environment选项卡可以查看当前环境中的变量:

启动hive支持

找到pyspark的安装位置,例如我的电脑在D:\\Miniconda3\\Lib\\site-packages\\pyspark

手动创建conf目录并将hive-site.xml配置文件复制到其中。如果hive使用了mysql作为原数据库,则还需要将MySQL对应的驱动jar包放入spark的jars目录下。

创建spark会话对象时可通过enableHiveSupport()开启hive支持:

from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession \\
    .builder \\
    .appName("Python Spark SQL Hive integration example") \\
    .enableHiveSupport() \\
    .getOrCreate()
sc = spark.sparkContext
spark

spark访问hive自己创建的表有可能会出现如下的权限报错:

Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS s
hould be writable. Current permissions are: rwx------

是因为当前用户不具备对\\tmp\\hive的操作权限:

>winutils ls \\tmp\\hive
drwx------ 1 BUILTIN\\Administrators XIAOXIAOMING\\None 0 May  4 2020 \\tmp\\hive

把\\tmp\\hive目录的权限改为777即可顺利访问:

>winutils chmod 777 \\tmp\\hive

>winutils ls \\tmp\\hive
drwxrwxrwx 1 BUILTIN\\Administrators XIAOXIAOMING\\None 0 May  4 2020 \\tmp\\hive

Spark的DataFrame与RDD

从spark2.x开始将RDD和DataFrame的API统一抽象成dataset,DataFrame就是Dataset[Row],RDD则是Dataset.rdd。可以将DataFrame理解为包含结构化信息的RDD。

将含row的RDD转换为DataFrame只需要调用toDF方法或SparkSession的createDataFrame方法即可,也可以传入schema覆盖类型或名称设置。

DataFrame的基础api

DataFrame默认支持DSL风格语法,例如:

//查看DataFrame中的内容
df.show()

//查看DataFrame部分列中的内容
df.select(df['name'], df['age'] + 1).show()
df.select("name").show()

//打印DataFrame的Schema信息
df.printSchema()

//过滤age大于等于 21 的
df.filter(df['age'] > 21).show()

//按年龄进行分组并统计相同年龄的人数
personDF.groupBy("age").count().show()

将DataFrame注册成表或视图之后即可进行纯SQL操作:

df.createOrReplaceTempView("people")
//df.createTempView("t_person")

//查询年龄最大的前两名
spark.sql("select * from t_person order by age desc limit 2").show()
//显示表的Schema信息
spark.sql("desc t_person").show()

Pyspark可以直接很方便的注册udf并直接使用:

strlen = spark.udf.register("len", lambda x: len(x))
print(spark.sql("SELECT len('test') length").collect())
print(spark.sql("SELECT 'foo' AS text").select(strlen("text").alias('length')).collect())

执行结果:

[Row(length='4')]
[Row(length='3')]

RDD的简介

DataFrame的本质是对RDD的包装,可以理解为DataFrame=RDD[Row]+schema。

RDD(A Resilient Distributed Dataset)叫做弹性可伸缩分布式数据集,是Spark中最基本的数据抽象。它代表一个不可变、自动容错、可伸缩性、可分区、里面的元素可并行计算的集合。

在每一个RDD内部具有五大属性:

  1. 具有一系列的分区
  2. 一个计算函数操作于每一个切片
  3. 具有一个对其他RDD的依赖列表
  4. 对于 key-value RDDs具有一个Partitioner分区器
  5. 存储每一个切片最佳计算位置

一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。

**一个计算每个分区的函数。**Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。

**RDD之间的依赖关系。**RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。

**一个Partitioner,即RDD的分片函数。**当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

**一个列表,存储存取每个Partition的优先位置(preferred location)。**对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

RDD的API概览

RDD包含Transformation APIAction API,Transformation API都是延迟加载的只是记住这些应用到基础数据集上的转换动作,只有当执行Action API时这些转换才会真正运行。

Transformation API产生的两类RDD最重要,分别是MapPartitionsRDDShuffledRDD

产生MapPartitionsRDD的算子有map、keyBy、keys、values、flatMap、mapValues 、flatMapValues、mapPartitions、mapPartitionsWithIndex、glom、filter和filterByRange 。其中用的最多的是mapflatMap,但任何产生MapPartitionsRDD的算子都可以直接使用mapPartitionsmapPartitionsWithIndex实现。

产生ShuffledRDD的算子有combineByKeyWithClassTag、combineByKey、aggregateByKey、foldByKey 、reduceByKey 、distinct、groupByKey、groupBy、partitionBy、sortByKey 和 repartitionAndSortWithinPartitions。

combineByKey到groupByKey 底层均是调用combineByKeyWithClassTag方法:

@Experimental
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope 
  combineByKeyWithClassTag(createCombiner,mergeValue,mergeCombiners
       ,defaultPartitioner(self))


def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope 
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)


def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope 
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]

三个重要参数的含义:

  • createCombiner:根据每个分区的第一个元素操作产生一个初始值
  • mergeValue:对每个分区内部的元素进行迭代合并
  • mergeCombiners:对所有分区的合并结果进行合并

groupByKey的partitioner未指定时会传入默认的defaultPartitioner。例如:

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2).keyBy(_.length)
a.groupByKey.collect

res9: Array[(Int, Iterable[String])] = Array((4,CompactBuffer(lion)), (6,CompactBuffer(spider)), (3,CompactBuffer(dog, cat)), (5,CompactBuffer(tiger, eagle)))

aggregateByKey:每个分区使用zeroValue作为初始值,迭代每一个元素用seqOp进行合并,对所有分区的结果用combOp进行合并。例如:

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res6: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
res7: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))

reduceByKey :每个分区迭代每一个元素用func进行合并,对所有分区的结果用func再进行合并,例如:

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect

Action API有:

动作含义
reduce(func)通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的
collect()在驱动程序中,以数组的形式返回数据集的所有元素
count()返回RDD的元素个数
first()返回RDD的第一个元素(类似于take(1))
take(n)返回一个由数据集的前n个元素组成的数组
takeSample(withReplacement*,*num, [seed])返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
takeOrdered(n, [ordering])排序并取前N个元素
saveAsTextFile(path)将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path)将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
saveAsObjectFile(path)将RDD中的元素用NullWritable作为key,实际元素作为value保存为sequencefile格式
countByKey()针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
foreach(func)在数据集的每一个元素上,运行函数func进行更新。

spark模拟实现mapreduce版wordcount:

object MapreduceWordcount 
  def main(args: Array[String]): Unit = 
    import org.apache.spark._
    val sc: SparkContext = new SparkContext(new SparkConf().setAppName("wordcount").setMaster("local[*]"))
    sc.setLogLevel("WARN")
    
    import org.apache.hadoop.io.LongWritable, Text
    import org.apache.hadoop.mapred.TextInputFormat
    import org.apache.spark.rdd.HadoopRDD

    import scala.collection.mutable.ArrayBuffer
    
    def map(k: LongWritable, v: Text, collect: ArrayBuffer[(String, Int)]) = 
      for (word <- v.toString.split("\\\\s+"))
        collect += ((word, 1))
    
    def reduce(key: String, value: Iterator[Int], collect: ArrayBuffer[(String, Int)]) = 
      collect += ((key, value.sum))
    
    val rdd = sc.hadoopFile("/hdfs/wordcount/in1/*", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)
      .asInstanceOf[HadoopRDD[LongWritable, Text]]
      .mapPartitionsWithInputSplit((split, it) =>
        val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()
        it.foreach(kv => map(kv._1, kv._2, collect))
        collect.toIterator
      )
      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
      .mapPartitions(it => 
        val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()
        var lastKey: String = ""
        var values: ArrayBuffer[Int] = ArrayBuffer[Int]()
        for ((currKey, value) <- it) 
          if (!currKey.equals(lastKey)) 
            if (values.length != 0)
              reduce(lastKey, values.toIterator, collect)
            values.clear()
          
          values += value
          lastKey = currKey
        
        if (values.length != 0) reduce(lastKey, values.toIterator, collect)
        collect.toIterator
      )
    rdd.foreach(println)
  

各类RDD

  • ShuffledRDD :表示需要走Shuffle过程的网络传输

  • CoalescedRDD :用于将一台机器的多个分区合并成一个分区

  • CartesianRDD :对两个RDD的所有元素产生笛卡尔积

  • MapPartitionsRDD :用于对每个分区的数据进行特定的处理

  • CoGroupedRDD :用于将2~4个rdd,按照key进行连接聚合

  • SubtractedRDD :用于对2个RDD求差集

  • UnionRDDPartitionerAwareUnionRDD :用于对2个RDD求并集

  • ZippedPartitionsRDD2:zip拉链操作产生的RDD

  • ZippedWithIndexRDD:给每一个元素标记一个自增编号

  • PartitionwiseSampledRDD:用于对rdd的元素按照指定的百分比进行随机采样

当我们需要给Datafream添加自增列时,可以使用zipWithUniqueId方法:

from pyspark.sql.types import StructType, LongType

schema = data.schema.add(StructField("id", LongType()))
rowRDD = data.rdd.zipWithUniqueId().map(lambda t: t[0]+Row(t[1]))
data = rowRDD.toDF(schema)
data.show()

API用法详情可参考:https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD

cache&checkpoint

RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

rdd.persist()

checkpoint的源码注释可以看到:

  • 标记该RDD作为检查点。
  • 它将被保存在通过SparkContext#setCheckpointDir方法设置的检查点目录中
  • 它所引用的所有父RDD引用将全部被移除
  • 这个方法在这个RDD上必须在所有job执行前运行。
  • 强烈建议将这个RDD缓存在内存中,否则这个保存文件的计算任务将重新计算。

从中我们得知,在执行checkpoint方法时,最好同时,将该RDD缓存起来,否则,checkpoint也会产生一个计算任务。

sc.setCheckpointDir("checkpoint")
rdd.cache()
rdd.checkpoint()

graphframes 的用法

GraphFrame是将Spark中的Graph算法统一到DataFrame接口的Graph操作接口,为Scala、Java和Python提供了统一的图处理API。

Graphframes是开源项目,源码工程如下:https://github.com/graphframes/graphframes

可以参考:

在GraphFrames中图的顶点(Vertex)和边(edge)都是以DataFrame形式存储的:

  • 顶点DataFrame:必须包含列名为“id”的列,用于作为顶点的唯一标识
  • 边DataFrame:必须包含列名为“src”和“dst”的列,根据唯一标识id标识关系

创建图的示例:

from graphframes import GraphFrame

vertices = spark.createDataFrame([
    ("a", "Alice", 34),
    ("b", "Bob", 36),
    ("c", "Charlie", 30),
    ("d", "David", 29),
    ("e", "Esther", 32),
    ("f", "Fanny", 36),
    ("g", "Gabby", 60)], ["id", "name", "age"])

edges = spark.createDataFrame([
    ("a", "b", "friend"),
    ("b", "c", "follow"),
    ("c", "b", "follow"),
    ("f", "c", "follow"),
    ("e", "f", "follow"),
    ("e", "d", "friend"),
    ("d", "a", "friend"),
    ("a", "e", "friend")
], ["src", "dst", "relationship"])

# 生成图
g = GraphFrame(vertices, edges)

GraphFrame提供三种视图:

print("顶点表视图:")
graph.vertices.show() # graph.vertices 就是原始的vertices
print("边表视图:")
graph.以上是关于PySpark与GraphFrames的安装与使用的主要内容,如果未能解决你的问题,请参考以下文章

PySpark求解连通图问题

GraphFrames 的 PySpark 异常

如何使用 Pyspark 中的 Graphframes 和 Spark Dataframe 中的原始数据获取连接的组件?

找不到模块'graphframes'——Jupyter

GraphFrames的PageRank错误

如何将数据从 PySpark 持久化到 Hive - 避免重复