Dask核心功能介绍及与Spark的比较

Posted 咬定青松

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dask核心功能介绍及与Spark的比较相关的知识,希望对你有一定的参考价值。

从谷爱凌身上,我们看到了支撑她走上神坛的4个因素:

优良的基因

衣食无忧的生活

兴趣自由

智力向导

其中基因是基础,也可以说是最重要的。不要小看基因的一点儿优化,哪怕一丁点的改良就可能超越当前地球在世人类,更别说石头缝中蹦出来的孙悟空几天就能学会翻筋斗,再看看比他多学了多少年的师兄的水平如何?再看看日本的羽生结弦,因为受制于身体极限,一直受困于4A,终究无法突破,也许他缺的就是那么一点儿基因改良。衣食无忧意味着财务自由,可以随心所欲选择自己感兴趣的事情,而无心理包袱,就像羽生结弦已经有了几次卫冕冠军的成就,再多一次,再说一次都无所谓,可以放纵去挑战4A。谷爱凌的生活环境加上精英父母及其生活圈子,相信她可以接触到全世界最优秀的教练、教育和培训,这可以让她避免很多弯路。

一、Dask组成部分

Dask是一款用Python开发的轻量级并行计算库,Dask由两部分组成:

  • 并行数据集合:比如array、dataframes和lists等。它们扩展了NumPy, Pandas 或者 Python iterators接口,能够支持超过内存的数据集处理。

  • 动态任务调度:同Airflow、Luigi和Celery类似,但是专门为计算优化过了,用于交互式计算场景。上述并行数据集合就运行在调度系统之上。

就Dask本身,并行数据集合和动态任务调度构成了其高层和低层视角。Dask 的高层数据集合可以看做 NumPy和Pandas在大数据集下的替代品。Dask低层调度系统提供基于Task Graph的并行动态任务调度,可以看做在复杂计算场景下Python线程或多进程库( threading or multiprocessing )的替代品。总体上,Dask具备以下优点:

  • 熟悉(Familiar):Dask高层提供了并行化版本的NumPy array 和 Pandas DataFrame,但使用方式跟NumPy和Pandas几乎保持不变,开发者使用起来更容易接受

  • 灵活(Flexible):Dask地层提供了任务调度系统接口,方便自定义类型的计算负载和跟其他项目集成

  • 原生(Native):分布式计算使用纯粹的Python实现,能够容易地访问Python数据对象

  • 快速(Fast):数值计算操作低开销、低延迟、低序列化

  • 伸缩(Scalable):支持小到单机单核运算模式,多到上千核的集群运算模式

  • 响应(Responsive):为交互式计算而设计,提供快速反馈和诊断

Task Graph可以看做是高层数据集合跟底层调度的衔接部分,高层数据集合对象 Array、DataFrame和Bag以及delay和future自动将大任务拆分成更小的子任务,并提交到调度系统去执行。不同于Array、DataFrame和Bag,利用delay和future能够实现更复杂的算法。

二、Dask 并行数据集合

1. Array  

Array是一个使用分块算法实现了NumPy ndarray的接口子集。

import dask.array as da
x = da.random.random((10000, 10000), chunks=(1000, 1000))

2. Bag

Bag是允许重复的无序集合的数学名称,是multiset的友好代名词。bag或者multiset是set 集合概念的泛化,不像集合set,bag允许重复,作为对比,看几个例子:

  • list: ordered collection with repeats, [1, 2, 3, 2]

  • set: unordered collection without repeats, 1, 2, 3

  • bag: unordered collection with repeats, 1, 2, 2, 3

可见,bag跟list很像,但不保证顺序。举个创建bag的简单例子:

import dask.bag as db
>>> b = db.from_sequence([1, 2, 3, 4, 5, 6], npartitions=2)
>>> b
dask.bag<from_sequence, npartitions=2>

Dask Bag在普通Python 对象集合上实现类似map, filter, fold 和groupby这样的操作,就像PySpark RDD那样,使用Bag能够在少量内存上实现并行操作。Dask Bags 常被用于日志文件、JSON格式的记录或其他用户定义的Python对象上的简单预处理,比如下例中处理JSON文件:

import dask.bag as db
import json


b0 = db.read_text('data/*.json').map(json.loads)
b0.count().compute()  # Count total number of records


b=b0.filter(lambda record: record['age'] > 30).take(2)  # Select only people over 30
b=b.map(json.dumps)  # Convert Python objects to text
b.to_textfiles('data/processed.*.json')) # Write to local disk

3. DataFrame

Dask DataFrame是Pandas DataFrame的分布式版本,它将沿着index索引分区的多个Pandas DataFrame整合起来,如下图示例所示:

import dask
import dask.dataframe as dd
df = dask.datasets.timeseries()

timeseries创建了一个2000年1月份、每秒1条的时序数据,每条记录4个字段(id,name,x,y):

f = df[df.y > 0].groupby('name').x.std().compute()

Array、DataFrame和Bag都能够支持在超过内存的数据集上并行执行,为此,它们被实现为延后执行模式,像上面示例中那样,只有当明确调用compute方法才真正触发计算。

三、任务依赖图

通常人类写完程序,然后再用编译器或解释器(如javac、python等)编译或解释执行,但有时人类不希望按照机器自动编排的执行方式,而希望按照自己的方式来分析、优化和执行程序,这样,原来属于编译器或解释器的职责就转移到人类开发者身上。进而,程序的结构也被明确地表示为程序本身的一部分。在用户空间进行并行化的一种常见方式就是借助任务调度(task scheduling)。在任务调度中,我们把计算任务分解为大小适中的子任务或计算单元(一般是函数调用),这些子任务往往存在一种依赖关系,表现为有向无环图(DAG),Dask在内部通过一种编码规范将这个图用普通的字典(dict)、元组(tuple)和函数(function)来表示。图中的每个节点就是子任务或计算单元,节点的边表示节点的数据产出和消费关系。然后通过任务调度在满足节点依赖关系的前提下,尽可能提升独立节点的并发性来最有效率地执行整个DAG。举个例子:

def inc(i):
    return i + 1


def add(a, b):
    return a + b


x = 1
y = inc(x)
z = add(y, 10)

上述计算逻辑的任务依赖图在 Dask 内部表示为这样一个字典结构:

d = 'x': 1,
     'y': (inc, 'x'),
     'z': (add, 'y', 10)

其中key为任意可哈希的非Task类型的标识值,value为Task类型的元组,如:

(add, 'x', 'y')

其中元组第一个元素是可调用的函数,后续元素是函数参数,参数既可以是key、常量值、Task类型的实例或以上的参数列表。

用可视化图表示为:

Dask的所有调度的入口函数是get,通过get函数能够直接跟调度器交互,获取任意节点的计算结果值,比如通过依赖图的key,调用get方法可以触发结果计算:

虽然可以将任务依赖图格式编码为普通的Python dict,tuple,function基本对象,能够将其跟其他集合隔离开来,但是在开发值很少直接操作依赖图,除非想实现自定义Module。实际上,使用dask.delay是更好的选择:

import dask


@dask.delayed
def inc(x):
    return x + 1


@dask.delayed
def add(x, y):
    return x + y


x=1
y = inc(x)
z=add(y,10)
z.visualize()

通过可视化可以清楚地看到跟上面的依赖图类似,不同于上例中同步调用,这里使用了delay方法注解,将任务执行变为延后执行:当明确调用z.compute()才会触发DAG调度,这带来的好处是并行化。跟delay相对应的是future,它扩展了 Python concurrent.futures 接口,立即返回一个指向远程结果的引用,但是当请求该结果时,远程结果可能还没有完成,但最终任务会结束,执行结果存放在远程的thread/process/worker 之中,可以通过result方法获取:

from dask.distributed import Client


client = Client()  # start local workers as processes
# or
client = Client(processes=False)  # start local workers as threads


def inc(x):
    return x + 1


def add(x, y):
    return x + y


a = client.submit(inc, 10)  # calls inc(10) in background thread or process
a.result()  # blocks until task completes and data arrives

四、 动态任务调度

上面介绍的高层数据集合:Dask Array、Dask DataFrame、Dask Bag以及delay和future接口都会自动创建任务依赖图,任务依赖图最终被提交到任务调度执行。

有很多种任务调度方法,比如embarrassingly parallel、MapReduce和full task scheduling。full task scheduling就完全依赖任务调度了,虽然有很多方式来实现调度,比如Spark、Airflow、Storm等,但是这些调度逻辑内嵌到大的计算框架内部,因此,如果单独去用这些调度功能,还是需要重新开发。Dask提供了多种调度器可供选择,差别是性能不一样。Dask 有两个任务调度家族:单机调度和分布式调度,它们都可以通过全局配置。

  • 单机调度:这类调度器在本机上利用单进程或者线程池,提供基本调度功能。作为调度器的默认选项,简单易用,不支持扩展。

#Local Threads(using ThreadPoolExecutor)
import dask
dask.config.set(scheduler='threads')  # overwrite default with threaded scheduler


#Local Processes( using ProcessPoolExecutor)
import dask
dask.config.set(scheduler='processes')  # overwrite default with multiprocessing scheduler


#Single Thread
import dask
dask.config.set(scheduler='synchronous')
  • 分布式调度:分布式利用多节点,实现更复杂的功能,支持扩展,但分布式模式也可以基于单机实现。分布式集群有很多种,比如:

#Dask Distributed (local)
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()  # Launches a scheduler and workers locally
client = Client(cluster)  # Connect to distributed cluster and override default


#Dask Distributed (Cluster)
from dask.distributed import Client
from dask_yarn import YarnCluster


cluster = YarnCluster(**cluster_specific_kwargs)
client = Client(cluster)


from dask.distributed import SSHCluster


cluster = SSHCluster(["MachineA", "MachineB", "MachineC"])
client = Client(cluster)


from dask_kubernetes import KubeCluster


cluster = KubeCluster(**cluster_specific_kwargs)
client = Client(cluster)

分布式集群除了上述类型外,还有其他很多,比如可以基于科学计算或者工业研究使用的高性能超算,使用Job调度系统如SLURM, SGE, TORQUE, LSF, DRMAA, PBS等来部署集群:

from dask_jobqueue import PBSCluster


cluster = PBSCluster(cores=36,
                     memory="100GB",
                     project='P48500028',
                     queue='premium',
                     interface='ib0',
                     walltime='02:00:00')


cluster.scale(100)  # Start 100 workers in 100 jobs that match the description above


from dask.distributed import Client
client = Client(cluster)    # Connect to that cluster
还可以通过MPI来部署:
mpirun --np 4 dask-mpi --scheduler-file /home/$USER/scheduler.json


from dask.distributed import Client
client = Client(scheduler_file='/path/to/scheduler.json')

还可以部署到其他公有云AWS、Azure和 GCP。当然,也可以自己独立部署:

dask-scheduler
dask-worker tcp://host1:8786
dask-worker tcp://host2:8786
。。。

所有部署方式都是基于几条简单的命令就可以完成,相当简单,这种方式也提供了一令人激动的集群运行模式:临时集群-一旦计算完成,集群自动停止。

分布式调度采用集中式架构:dask-scheduler作为核心调度器协调分布在多台机器上的多个dask-worker,同时处理来自多个client的并发请求。scheduler基于异步事件机制实现,这意味着能够并发接受客户端请求,多个worker之间通过TCP传输数据,scheduler内部通过依赖图来跟踪任务,每个任务就是一个Python函数,一个任务可能是另一个任务的输出结果,任务依赖图随着用户交互动态变更。从前文的介绍可知,任务提交调度主要有两种途径:调用数据集合的compute方法和client.submit方法。下面通过描述一个Task从被提交到调度到执行结果返回的完整路径,来演示说明调度工作逻辑。示例代码如下:

client = Client('host:port')
x = client.submit(...)
y = client.submit(...)


z = client.submit(add, x, y)  # we follow z


print(z.result())

假设client已经提交了两个计算任务,分别用变量x和y引用标识。现在client又提交了一个计算x和y的add任务,最后再获取计算结果,用变量z来引用标识。由计算逻辑可知,z依赖x和y。下面分步骤来说明客户端与调度系统的交互过程:

Step 1. 客户端调用submit提交add计算任务,该函数将向scheduler发送如下消息:

'op': 'update-graph',
 'tasks': 'z': (add, x, y),
 'keys': ['z']

该消息是个dict结构,op字段标识操作类型,tasks字段标识任务,keys标识任务的key。submit立即返回(可能在scheduler接收到消息之前)一个future对象,记录当前任务的状态为pending。

Step 2. scheduler接收到消息,并且更新相应状态:

scheduler.update_graph(tasks=msg['tasks'], keys=msg['keys'])

此时scheduler注意到x和y本身也是变量,就会去查找x和y,然后更新它们的状态。

Step 3. 选择worker

一旦依赖x和y都已经执行完成,x和y的相应信息就被保留到worker的本地内存中,此时需要选择合适的worker来执行z。如何选择worker,Dask大致按照如下几个标准:

1. 首先,快速选择那些内存中有x或者y的worker;

2. 然后,选择为计算z而传输x和y,消耗更少传输数据的worker。比如,如果有两个不同的worker持有x和y,但是y本身比x占用更多的数据,就选择持有y的的机器,来减少网络传输数据量。

3. 如果有多个worker的数据传输量都比较少,那就选择负载最小的机器。

Step 4.  scheder向woker发送调度信息

当被选择的worker因为有计算任务退出,有了空余的计算资源,而且计算任务z也满足了调度条件,调度器就会将其从相应的任务队列中取出,将执行的函数信息(包括函数名称、key和参数)、worker位置打包成消息,通过TCP发送给worker。

'op': 'compute',
 'function': execute_task,
 'args': ((add, 'x', 'y'),),
 'who_has': 'x': (worker_host, port),
             'y': (worker_host, port), (worker_host, port),
 'key': 'z'

Step 5. worker执行任务

worker解压收到的消息,并且注意到计算z依赖x和y,如果本机没有x和y的数据,然后就从who_has指定的机器列表中随机选择一个,当获取到依赖的数据,就通过ThreadPoolExecutor 启动计算(add, 'x', 'y')。同时worker仍然可以计算其他任务,并不阻塞在当前计算任务上。最终,任务计算完成,worker将计算结果存在本地内存:

data['z'] = ...

然后向scheduler传回计算成功和结果数据大小的消息,形如下面的消息:

Worker: Hey Scheduler, 'z' worked great.
        I'm holding onto it.
        It takes up 64 bytes.

Step 6. 响应客户端

scheduler接收到worker关于z就绪的消息后,再向所有持有z的future对象的客户端发消息,唤醒阻塞在z.result上的方法调用。同时检查哪些worker持有z的结果,并将其结果转发给client。

Step 7. 垃圾回收

当x和y不再被使用的时候,scheduler向相关worker发消息,通知x和y可以从其本地内存中删除了。同样,假如局部变量z失去作用范围,并且引用计数为0,也将被Python垃圾回收器清理掉,此时scheduler也将收到释放z的消息,在scheduler将z从本地调度状态中删除之后,也周期性地通知相关worker释放相应的key。

上述步骤走完,通常经历几个毫秒的时间。通过上述任务提交、执行和获得响应的过程,我们大致了解了Dask 调度的工作机制,但是里面比较复杂的地方大概在于worker的选取和task的选取,这就是任务的调度策略问题。示例中,介绍了几个选取worker的标准,没有过多介绍task的选取,实际使用中,大量符合调度条件的的并发任务还需要遵从一系列启发式规则,比如先到先服务、后到先服务、处于关键路径且被依赖的任务数量越多的先执行、优先级高的优先等。

五、Dask与Spark的比较

Apache Spark 是一个流行的,用于表格式数据集(tabular datasets)上的分布式计算工具,在今天的大数据分析时代,Spark正成为该领域一个响亮的名字,而Dask作为扩展了现存的Python生态的并行计算库,同样能够在超过内存大小的数据集上提供并行分布式计算能力。那如何比较Spark和Dask,以及如何进行技术选型呢?以一种客观、公正的角度来回答这个问题确实困难,尤其当两者的差异表现在技术实现上。尽管如此,本文仍尝试来这样做,但也欢迎有效的批评指正。

总体来讲,相对Spark,Dask 小巧,轻量。这意味着,Dask有较少的特性,如果提供高级功能,就需要跟其他库协作,尤其涉及到Python生态中的数值计算,比如Pandas、Scikit-Learn等。下文从以下几个方面来详细说明。

1. 语言

  • Spark用Scala语言编写,同时也支持Python、R、Java。它跟其他JVM代码互操作较好。

  • Dask用Python语言编写,不支持其他语言。它跟C/C++/Fortran/LLVM或其他通过Python链接的本地编译代码的互操作性结合较好。

2. 生态

  • Spark是一个all-in-one项目,整合了SQL分析、流式处理、机器学习和图处理等,并且跟其他很多Apache项目的集成度较好。

  • Dask 是Python生态的一个组成部分,它耦合,同时也加强了其他库的能力,比如NumPy, Pandas, and Scikit-Learn等。

3. 年龄和信誉度

  • Spark于2010年问世,存在时间较长,在企业大数据领域信誉度较高。

  • Dask于2014年问世,稍显年轻,它是广受信任的NumPy/Pandas/Scikit-learn/Jupyter等技术栈的扩展。

4. 应用领域

  • Spark主要集中在像SQL处理的商业智能和轻量的机器学习方面。

  • Dask主要应用在商业智能和一些科学计算以及自定义应用场景。

5. 内部设计

  • Spark的内部模型是高层的( higher level),在计算上,从高层提供一致的优化,但是在复杂算法和临时应用系统上缺乏灵活性,从根本上说,Spark是Map-Shuffle-Reduce 范式的扩展。

  • Dask的内部模型是低层的(lower level),缺乏从高层的优化,但是能够实现复杂算法和定制应用系统,从根本上说,Dask是建立在通用任务调度上的。

总之,如果用户习惯Scala或者SQL 开发、经常进行ETL和商业分析应用、以及倾向于用更可靠、单一技术栈的计算,那Spark是优先选择;如果用户习惯Python开发、在ETL+SQL能力之外,还希望添加额外的并发控制、经常进行的复杂计算明显不适合Spark的内部模型、希望轻量地从本地开发模式转移到集群开发模式、愿意集成三方技术且不介意安装工具包,那Dask是其优先选择。当然两者也可以同时使用,特别是Dask支持的多种集群部署方式,支持用户在现有基于YARN、Kuberbetes的资源管理框架下使用Dask,而且二者都支持 CSV, JSON, ORC和 Parque文件格式的读写。但是如果只是处理在T级别大小的CSV或JSON格式的数据,那Postgres或者MongoDB是其首选。

参考链接

https://docs.dask.org/en/stable/

https://docs.dask.org/en/stable/graphs.html

https://docs.dask.org/en/stable/custom-graphs.html

https://blog.dask.org/2020/07/30/beginners-config

https://blog.dask.org/2020/07/23/current-state-of-distributed-dask-clusters

https://docs.dask.org/en/stable/deploying.html

http://distributed.dask.org/en/stable/journey.html

https://docs.dask.org/en/latest/spark.html?comparison-to-spark

以上是关于Dask核心功能介绍及与Spark的比较的主要内容,如果未能解决你的问题,请参考以下文章

Storm介绍及与Spark Streaming对比

Flink与Spark Streaming在与kafka结合的区别!

每日一题介绍Spark核心组件及功能?

MQTT介绍及与其他协议的比较

MQTT介绍及与其他协议的比较

我将如何在 Dask 中进行 Spark 爆炸?