Apache Spark 简介:统一分析引擎
Posted Sonhhxg_柒
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Spark 简介:统一分析引擎相关的知识,希望对你有一定的参考价值。
🔎大家好,我是Sonhhxg_柒,希望你看完之后,能对你有所帮助,不足请指正!共同学习交流🔎
📝个人主页-Sonhhxg_柒的博客_CSDN博客 📃
🎁欢迎各位→点赞👍 + 收藏⭐️ + 留言📝
📣系列专栏 - 机器学习【ML】 自然语言处理【NLP】 深度学习【DL】
🖍foreword
✔说明⇢本人讲解主要包括Python、机器学习(ML)、深度学习(DL)、自然语言处理(NLP)等内容。
如果你对这个系列感兴趣的话,可以关注订阅哟👋
文章目录
本章列出了 Apache Spark 的起源及其基本理念。它还调查了项目的主要组件及其分布式架构。如果您熟悉 Spark 的历史和高级概念,可以跳过本章。
Spark的起源
在本节中,我们将绘制路线图Apache Spark 的简短演变:它的起源、灵感和在社区中作为事实上的大数据统一处理引擎的采用。
还剩 9 小时 38 米第 1 章Apache Spark 简介:统一分析引擎
本章列出了 Apache Spark 的起源及其基本理念。它还调查了项目的主要组件及其分布式架构。如果您熟悉 Spark 的历史和高级概念,可以跳过本章。
火花的起源
在本节中,我们将绘制路线图Apache Spark 的简短演变:它的起源、灵感和在社区中作为事实上的大数据统一处理引擎的采用。
Google 的大数据和分布式计算
当我们想到规模时,我们不禁想到谷歌搜索引擎以闪电般的速度索引和搜索互联网上的世界数据的能力。谷歌这个名字是规模的代名词。事实上,Google 是对数学术语googol的故意拼写错误:即 1 加 100 个零!
无论是传统的存储系统,如关系数据库管理系统 (RDBMS) 和命令式编程方式都无法处理 Google 想要构建和搜索互联网索引文档的规模。由此产生的对新方法的需求导致创建了 Google 文件系统(GFS)、 MapReduce (MR)和Bigtable。
虽然 GFS 为集群场中的许多商品硬件服务器提供了容错和分布式文件系统,但 Bigtable 提供了跨 GFS的结构化数据的可扩展存储。MR 引入了一种基于函数式编程的新并行编程范式,用于大规模处理分布在 GFS 和 Bigtable 上的数据。
本质上,您的 MR 应用程序与MapReduce 系统交互,该系统将计算代码(映射和归约函数)发送到数据所在的位置,有利于数据局部性和集群机架亲和力,而不是将数据带到您的应用程序。
集群中的工作人员聚合并减少中间计算,并从 reduce 函数生成最终附加输出,然后将其写入分布式存储,您的应用程序可以访问它。这种方法显着减少了网络流量,并将大部分输入/输出 (I/O) 保留在磁盘本地,而不是通过网络分发。
谷歌所做的大部分工作都是专有的,但上述三篇论文中表达的想法激发了开源社区其他地方的创新想法——尤其是在雅虎,它正在为其搜索引擎处理类似的大规模大数据挑战。
雅虎的 Hadoop!
Google 的 GFS 论文中表达的计算挑战和解决方案为Hadoop 文件系统 (HDFS)提供了蓝图,包括作为分布式计算框架的 MapReduce 实现。它于 2006 年 4 月捐赠给供应商中立的非营利组织Apache 软件基金会 (ASF),成为Apache Hadoop框架相关模块的一部分:Hadoop Common、MapReduce、HDFS 和 Apache Hadoop YARN。
尽管 Apache Hadoop 在 Yahoo! 之外获得了广泛采用,激发了一个大型开源社区的贡献者和两家基于开源的商业公司(Cloudera 和 Hortonworks,现已合并),但 HDFS 上的 MapReduce 框架仍有一些缺点。
一是管理难度大,操作复杂,操作繁琐。其次,它的通用批处理 MapReduce API 很冗长,需要大量的样板设置代码,而且很脆弱容错性。第三,大批量的数据作业,多对MR任务,每对的中间计算结果写入本地磁盘,用于后续阶段的操作(见图1-1)。磁盘 I/O 的这种重复性能造成了损失:大型 MR 作业可能会连续运行数小时甚至数天。
图 1-1。map 和 reduce计算之间的读写间歇迭代
最后,尽管 Hadoop MR 有利于用于一般批处理的大规模作业,但它在结合其他工作负载(例如机器学习、流式传输或交互式 SQL 类查询)方面却存在不足。
为了处理这些新的工作负载,工程师开发了定制系统(Apache Hive、Apache Storm、Apache Impala、Apache Giraph、Apache Drill、Apache Mahout 等),每个系统都有自己的 API 和集群配置,进一步增加了 Hadoop 的操作复杂性以及开发人员陡峭的学习曲线。
然后问题就变成了(记住 Alan Kay 的格言,“简单的事情应该简单,复杂的事情应该成为可能”),有没有办法让 Hadoop 和 MR 更简单、更快?
Spark 在 AMPLab 的早期经历
加州大学伯克利分校的研究人员之前曾研究过 Hadoop MapReduce,他们通过一个名为Spark的项目来应对这一挑战。他们承认 MR 对于交互式或迭代计算工作和复杂的学习框架效率低下(或难以处理),因此从一开始他们就接受了让 Spark 更简单、更快、更容易的想法。这项工作于 2009 年在 RAD 实验室开始,该实验室后来成为 AMPLab(现在被称为 RISELab)。
在 Spark 上发表的早期论文表明,对于某些工作,它比 Hadoop MapReduce 快 10 到 20 倍。今天,它快了许多数量级。Spark 项目的核心目标是引入从 Hadoop MapReduce 借鉴的想法,但要增强系统:使其具有高度容错性和令人尴尬的并行性,支持在内存中存储迭代和交互式映射之间的中间结果并减少计算,提供多种语言的简单和可组合的 API 作为编程模型,并以统一的方式支持其他工作负载。我们很快就会回到统一的想法,因为它是 Spark 中的一个重要主题。
到 2013 年,Spark 得到了广泛的使用,它的一些原始创建者和研究人员——Matei Zaharia、Ali Ghodsi、Reynold Xin、Patrick Wendell、Ion Stoica 和 Andy Konwinski——将 Spark 项目捐赠给了 ASF,并成立了一家名为 Databricks 的公司。
在 ASF 的治理下, Databricks 和开源开发者社区于 2014 年 5 月共同发布了Apache Spark 1.0。第一个主要版本为未来频繁发布以及 Databricks 和 100 多家商业供应商为 Apache Spark 贡献显着特性奠定了基础。
什么是 Apache Spark?
Apache Spark 是为大规模分布式数据处理而设计的统一引擎,可以在数据中心或云端进行。
Spark 为中间计算提供内存存储,使其比 Hadoop MapReduce 快得多。它包含用于机器学习 (MLlib)、用于交互式查询的 SQL (Spark SQL)、用于与实时数据交互的流处理 (Structured Streaming) 和图形处理 (GraphX) 的库。
-
速度
-
使用方便
-
模块化
-
可扩展性
让我们看看这对框架意味着什么。
速度
Spark 以多种方式追求速度的目标。首先,它的内部实现极大地受益于硬件行业最近在提高 CPU 和内存的价格和性能方面取得的巨大进步。今天的商品服务器价格便宜,具有数百 GB 的内存、多核以及利用高效多线程和并行处理的基于 Unix 的底层操作系统。该框架经过优化以利用所有这些因素。
其次,Spark 将其查询计算构建为有向无环图(DAG);它的 DAG 调度器和查询优化器构建了一个高效的计算图,通常可以将其分解为在集群上的工作人员之间并行执行的任务。第三,它的物理执行引擎 Tungsten 使用全阶段代码生成来生成紧凑的执行代码(我们将在第 3 章介绍 SQL 优化和全阶段代码生成)。
由于所有中间结果都保留在内存中,并且磁盘 I/O 有限,这给它带来了巨大的性能提升。
使用方便
Spark 通过提供称为弹性分布式数据集 (RDD) 的简单逻辑数据结构的基本抽象来实现简单性,在此基础上构建所有其他更高级别的结构化数据抽象,例如 DataFrames 和 Datasets。通过提供一组转换和操作作为操作,Spark 提供了一个简单的编程模型,您可以使用它以熟悉的语言构建大数据应用程序。
模块化
Spark 操作可以应用于多种类型的工作负载,并以任何受支持的编程语言表示:Scala、Java、Python、SQL 和 R。Spark 提供了具有良好记录 API 的统一库,其中包括以下模块作为核心组件:Spark SQL、Spark Structured Streaming、Spark MLlib 和 GraphX,将在一个引擎下运行的所有工作负载结合在一起。我们将在下一节中仔细研究所有这些。
您可以编写一个可以完成所有工作的 Spark 应用程序——无需针对不同工作负载使用不同的引擎,无需学习单独的 API。使用 Spark,您可以获得适用于工作负载的统一处理引擎。
可扩展性
Spark 专注于其快速的并行计算引擎,而不是存储。与同时包含存储和计算的 Apache Hadoop 不同,Spark 将两者分离。这意味着您可以使用 Spark 读取存储在无数来源中的数据——Apache Hadoop、Apache Cassandra、Apache HBase、MongoDB、Apache Hive、RDBMS 等,并在内存中进行处理。Spark 的DataFrameReader
s 和DataFrameWriter
s 还可以扩展以将来自其他来源的数据(例如 Apache Kafka、Kinesis、Azure Storage 和 Amazon S3)读取到其可以操作的逻辑数据抽象中。
Spark 开发者社区维护着一份第三方 Spark 包列表,作为不断发展的生态系统的一部分(见图 1-2)。这个丰富的软件包生态系统包括用于各种外部数据源、性能监视器等的 Spark 连接器。
图 1-2。Apache Spark 的连接器生态系统
统一分析
虽然统一的概念并不是 Spark 独有的,但它是其设计理念和演变的核心组成部分。2016 年 11 月,计算机协会 (ACM) 认可了 Apache Spark,并授予其原始创建者著名的 ACM 奖,因为他们将Apache Spark 描述为“大数据处理的统一引擎”。获奖论文指出,Spark 用统一的组件堆栈取代了所有单独的批处理、图形、流和查询引擎,如 Storm、Impala、Dremel、Pregel 等,这些组件在单个分布式快速引擎下处理各种工作负载。
Apache Spark 组件作为统一堆栈
如图 1-3所示,Spark 提供了四个不同的组件作为不同工作负载的库:Spark SQL、Spark MLlib、Spark Structured Streaming 和 GraphX。这些组件中的每一个都与 Spark 的核心容错引擎分开,因为您使用 API 编写 Spark 应用程序,Spark 将其转换为由核心引擎执行的 DAG。因此,无论您使用Java、R、Scala、SQL 或 Python提供的结构化 API(我们将在第 3 章中介绍)编写 Spark 代码,底层代码都被分解为高度紧凑的在整个集群的工作人员 JVM 中执行的字节码。
图 1-3。Apache Spark 组件和 API 堆栈
让我们更详细地看看这些组件中的每一个。
Spark SQL
该模块适用于结构化数据。您可以读取存储在 RDBMS 表中的数据或从具有结构化数据(CSV、文本、JSON、Avro、ORC、Parquet 等)的文件格式中读取的数据,然后在 Spark 中构建永久或临时表。此外,在 Java、Python、Scala 或 R 中使用 Spark 的结构化 API 时,您可以组合类似 SQL 的查询来查询刚刚读入 Spark DataFrame 的数据。迄今为止,Spark SQL 是符合 ANSI SQL:2003 标准的,它还可以用作纯 SQL 引擎。
例如,在这个 Scala 代码片段中,您可以读取存储在 Amazon S3 上的 JSON 文件,创建一个临时表,并对作为 Spark DataFrame 读入内存的结果发出类似 SQL 的查询:
// In Scala
// Read data off Amazon S3 bucket into a Spark DataFrame
spark.read.json("s3://apache_spark/data/committers.json")
.createOrReplaceTempView("committers")
// Issue a SQL query and return the result as a Spark DataFrame
val results = spark.sql("""SELECT name, org, module, release, num_commits
FROM committers WHERE module = 'mllib' AND num_commits > 10
ORDER BY num_commits DESC""")
您可以在 Python、R 或 Java 中编写类似的代码片段,生成的字节码将是相同的,从而产生相同的性能。
Spark MLlib
Spark 附带一个包含通用机器学习 (ML) 算法的库,称为 MLlib。自 Spark 首次发布以来,由于 Spark 2.x 的底层引擎增强,此库组件的性能得到了显着提升。MLlib 提供了许多流行的机器学习算法,这些算法构建在基于 DataFrame 的高级 API 之上以构建模型。
笔记
从 Apache Spark 1.6 开始,MLlib 项目分为两个包:
spark.mllib
和spark.ml
. 基于 DataFrame 的 API 是后者,而前者包含基于 RDD 的 API,现在处于维护模式。所有新功能都进入spark.ml
. 本书将“MLlib”称为 Apache Spark 中机器学习的总括库。
这些 API 允许您在部署期间提取或转换特征、构建管道(用于训练和评估)以及持久化模型(用于保存和重新加载它们)。其他实用程序包括使用常见的线性代数运算和统计。MLlib 包括其他低级 ML 原语,包括通用梯度下降优化。以下 Python 代码片段封装了数据科学家在构建模型时可能执行的基本操作(更广泛的示例将在第10章和第11章中讨论)
# In Python
from pyspark.ml.classification import LogisticRegression
...
training = spark.read.csv("s3://...")
test = spark.read.csv("s3://...")
# Load training data
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Fit the model
lrModel = lr.fit(training)
# Predict
lrModel.transform(test)
...
Spark 结构化流
Apache Spark 2.0 引入了一个实验性的连续流模型和结构化流 API,构建在 Spark SQL 引擎和基于 DataFrame 的 API 之上。通过 Spark 2.2,Structured Streaming 普遍可用,这意味着开发人员可以在他们的生产环境中使用它。
大数据开发人员需要对来自 Apache Kafka 和其他流式源等引擎的静态数据和流式数据进行实时组合和反应,新模型将流视为一个不断增长的表,并在末尾附加新的数据行. 开发人员可以仅将其视为结构化表,并像处理静态表一样对其发出查询。
在结构化流模型下,Spark SQL 核心引擎处理容错和后期数据语义的所有方面,使开发人员能够相对轻松地专注于编写流应用程序。这个新模型摒弃了 Spark 1.x 系列中的旧 DStreams 模型,我们将在第 8 章中详细讨论。此外,Spark 2.x 和 Spark 3.0 扩展了流数据源的范围,包括 Apache Kafka、Kinesis 和基于 HDFS 的存储或云存储。
以下代码片段显示了结构化流应用程序的典型结构。它从 localhost 套接字读取并将字数统计结果写入 Apache Kafka 主题:
# In Python
# Read a stream from a local host
from pyspark.sql.functions import explode, split
lines = (spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load())
# Perform transformation
# Split the lines into words
words = lines.select(explode(split(lines.value, " ")).alias("word"))
# Generate running word count
word_counts = words.groupBy("word").count()
# Write out to the stream to Kafka
query = (word_counts
.writeStream
.format("kafka")
.option("topic", "output"))
GraphX
顾名思义,GraphX 是一个用于操作图(例如,社交网络图、路线和连接点,或网络拓扑图)和执行图并行计算的库。它提供了由社区用户贡献的用于分析、连接和遍历的标准图算法:可用的算法包括 PageRank、Connected Components 和 Triangle Counting。1
此代码片段显示了如何使用 GraphX API 连接两个图的简单示例:
// In Scala
val graph = Graph(vertices, edges)
messages = spark.textFile("hdfs://...")
val graph2 = graph.joinVertices(messages)
(id, vertex, msg) => ...
Apache Spark 的分布式执行
如果您已经阅读了本文,您已经知道 Spark 是一个分布式数据处理引擎,其组件在机器集群上协同工作。在本书后续章节中探讨 Spark 编程之前,您需要了解 Spark 分布式架构的所有组件如何协同工作和通信,以及可用的部署模式。
让我们先来看看图 1-4中所示的每个单独的组件,以及它们是如何融入架构的。在 Spark 架构的较高层次上,Spark 应用程序由一个驱动程序组成,该驱动程序负责在 Spark 集群上编排并行操作。驱动程序通过一个SparkSession
.
图 1-4。Apache Spark 组件和架构
Spark driver
作为 Spark 应用程序中负责实例化 a 的部分SparkSession
,Spark 驱动程序具有多个角色:它与集群管理器通信;它从集群管理器为 Spark 的执行器(JVM)请求资源(CPU、内存等);它将所有 Spark 操作转换为 DAG 计算,调度它们,并将它们的执行作为任务分配给 Spark 执行器。一旦分配了资源,它就直接与执行者进行通信。
SparkSession
在 Spark 2.0 中,它SparkSession
成为所有 Spark 操作和数据的统一管道。它不仅包含了以前的 Spark 入口点,如SparkContext
、SQLContext
、HiveContext
、SparkConf
和StreamingContext
,而且还使使用 Spark 变得更简单、更容易。
笔记
尽管在 Spark 2.x 中
SparkSession
包含所有其他上下文,但您仍然可以访问各个上下文及其各自的方法。通过这种方式,社区保持了向后兼容性。也就是说,您的旧 1.x 代码与SparkContext
orSQLContext
仍然可以工作。
通过这一管道,您可以创建 JVM 运行时参数、定义数据帧和数据集、从数据源读取数据、访问目录元数据以及发出 Spark SQL 查询。SparkSession
为 Spark 的所有功能提供一个统一的入口点。
在独立的 Spark 应用程序中,您可以SparkSession
使用您选择的编程语言中的一种高级 API 创建一个。在 Spark shell(下一章会详细介绍)中SparkSession
为您创建了它,您可以通过一个名为spark
or的全局变量来访问它sc
。
而在 Spark 1.x 中,您必须创建单独的上下文(用于流、SQL 等),引入额外的样板代码,在 Spark 2.x 应用程序中,您可以创建SparkSession
每个 JVM 并使用它来执行许多火花操作。
我们来看一个例子:
// In Scala
import org.apache.spark.sql.SparkSession
// Build SparkSession
val spark = SparkSession
.builder
.appName("LearnSpark")
.config("spark.sql.shuffle.partitions", 6)
.getOrCreate()
...
// Use the session to read JSON
val people = spark.read.json("...")
...
// Use the session to issue a SQL query
val resultsDF = spark.sql("SELECT city, pop, state, zip FROM table_name")
集群管理器
集群管理器负责为运行 Spark 应用程序的节点集群管理和分配资源。目前,Spark 支持四种集群管理器:内置的独立集群管理器、Apache Hadoop YARN、Apache Mesos 和 Kubernetes。
Spark执行器
Spark 执行器在集群中的每个工作节点上运行。执行器与驱动程序通信并负责在工作人员上执行任务。在大多数部署模式中,每个节点只运行一个执行器。
部署模式
Spark 的一个吸引人的特性是它支持多种部署模式,使 Spark 能够在不同的配置和环境中运行。因为集群管理器不知道它的运行位置(只要它可以管理 Spark 的执行器并满足资源请求), Spark 可以部署在一些最流行的环境中——例如 Apache Hadoop YARN 和 Kubernetes——并且可以在不同的环境中运行模式. 表 1-1总结了可用的部署模式.
模式 | Spark driver | Spark执行器 | 集群管理器 |
---|---|---|---|
当地的 | 在单个 JVM 上运行,例如笔记本电脑或单个节点 | 与驱动程序在同一 JVM 上运行 | 在同一主机上运行 |
独立 | 可以在集群中的任何节点上运行 | 集群中的每个节点都会启动自己的 executor JVM | 可以任意分配给集群中的任意主机 |
纱线(客户端) | 在客户端上运行,而不是集群的一部分 | YARN 的 NodeManager 的容器 | YARN 的资源管理器与 YARN 的 Application Master 一起为执行器分配 NodeManagers 上的容器 |
纱线(簇) | 与 YARN Application Master 一起运行 | 与 YARN 客户端模式相同 | 与 YARN 客户端模式相同 |
Kubernetes | 在 Kubernetes pod 中运行 | Spark底层执行原理详细解析(深度好文,建议收藏) |