6 - 教你如何使用Spark分布式执行Python脚本计算数据

Posted MLSQL之道

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了6 - 教你如何使用Spark分布式执行Python脚本计算数据相关的知识,希望对你有一定的参考价值。

Python拥有非常丰富的库,尤其是在科学计算领域,因此很多从事数据分析和科学计算的人偏爱Python。大数据有一个特点是存储在分布式系统,因此如何获取这些数据传给Python计算,并把计算结果存储到分布式系统,是一个不可避免的问题。

下面介绍一个类库pyjava(https://github.com/allwefantasy/pyjava),这个类库解决了Java/Scala与Python数据交换问题,采用Socket通信,数据交换采用Apache Arrow格式,避免了Java/Scala和Python数据交换的序列化和反序列化。

pyjava要求Python版本大于等于3.6,建议使用Conda的Python环境。

下面来创建conda的环境,pyjava依赖如下4个包:

conda.yamlname: respectdependencies: - python=3.6 - pip - pip: - numpy==1.14.3 - pandas==0.25.3 - pyjava==0.2.8.2 - pyarrow==0.14.1
创建conda的python环境,命名为respect:
conda env create -f conda.yaml
今天笔者用Mlsql Python的方式模拟Python Hadoop Streaming(笔者2014年用过,估计很多人都不知道)的计算,原理就是处理分组排序后的数据流,核心是分布式分组和组内排序。下面举个例子:
用户的访问日志,便于分析,只列出用户id和url,比如要看那个网页活跃,因此要计算url的pv(url访问次数)和uv(url访问人数):
uid url1 http://docs.mlsql.tech3 https://github.com/allwefantasy/pyjava2 https://github.com/allwefantasy/pyjava2 http://docs.mlsql.tech1 http://docs.mlsql.tech3 https://github.com/latincross/mlsqlwechat
根据url分组,然后组内根据uid排序,假设分为两组,每组如下:
   url uid组一:1. http://docs.mlsql.tech 12. http://docs.mlsql.tech 13. http://docs.mlsql.tech 24. https://github.com/allwefantasy/pyjava 25. https://github.com/allwefantasy/pyjava 3
组二:1. https://github.com/latincross/mlsqlwechat 3
先计算组一:
第1条处理后,http://docs.mlsql.tech,pv=1 uv=1第2条处理后,http://docs.mlsql.tech,url和第一条一样,pv=1+1=2 ,uid与第一条一样,uv=1第3条处理后,http://docs.mlsql.tech,url和第二条一样,pv=2+1=3 ,uid与第二条不一样,uv=1+1=2第4条处理后,https://github.com/allwefantasy/pyjava,url和第三条不一样,重置pv和uv,pv=1 uv=1第5条处理后,https://github.com/allwefantasy/pyjava,url和第四条一样,pv=1+1=2,uid与第三条不一样,uv=1+1=2
计算组二,只有一条数据:
https://github.com/latincross/mlsqlwechat,pv=1 uv=1
合并分组,最终的结果是:
http://docs.mlsql.tech pv=3 uv=2https://github.com/allwefantasy/pyjava pv=2 uv=2https://github.com/latincross/mlsqlwechat pv=1 uv=1
可以看出计算uv的时候没有用set,而是通过排序计算的,下面看看代码实现:
 val spark = SparkSession .builder() .master("local[*]") .getOrCreate() import spark.implicits._
val data = Seq(("http://docs.mlsql.tech" ,1), ("https://github.com/allwefantasy/pyjava" ,3), ("https://github.com/allwefantasy/pyjava" ,2), ("http://docs.mlsql.tech" ,2), ("http://docs.mlsql.tech" ,1), ("https://github.com/latincross/mlsqlwechat" ,3) ).toDF("url" ,"uid")
data.createOrReplaceTempView("test")
    //根据url分组,组内根据uid排序 val df = spark.sql("select * from test distribute by url sort by uid")
    //测试分组输出 df.rdd.foreachPartition(it => { it.foreach(r=>println("partition id: " + TaskContext.get.partitionId + " ,row: " + r.toString())) })
val struct = df.schema val timezoneid = spark.sessionState.conf.sessionLocalTimeZone
val rest = df.rdd.mapPartitions { iter => val enconder = RowEncoder.apply(struct).resolveAndBind() val envs = new util.HashMap[String, String]()      //设置conda python环境 envs.put(str(PythonConf.PYTHON_ENV) , "source activate respect")    //使用本地python环境    //envs.put(str(PythonConf.PYTHON_ENV), ":")
val batch = new ArrowPythonRunner( Seq(ChainedPythonFunctions(Seq(PythonFunction(          """ |#测试的时候可以打开,输出更多调试信息 |#import os |#os.environ["MLSQL_DEV"]="1" |def process(_data_manager): | pv = 0 | uv = 0 | lastUrl = None | lastUid = None | output = False | | for item in _data_manager.fetch_once_as_rows(): | urlCol = item["url"] | uidCol = item["uid"] | | if lastUrl is None: | pv = 1 | uv = 1 | lastUrl = urlCol | lastUid = uidCol | elif lastUrl == urlCol: | pv = pv + 1 | if lastUid != uidCol: | uv = uv + 1 | lastUid = uidCol | else: | #lastUrl变了,则输出 | output = True | yield {"url":lastUrl,"pv":pv,"uv":uv} | | #重置pv,uv,output | pv = 1 | uv = 1 | lastUrl = urlCol | lastUid = uidCol | output = False | #当只有一个url情况下,lastUrl为空表示_data_manager没有数据,不需要输出 | if not output and lastUrl is not None: | yield {"url":lastUrl,"pv":pv ,"uv":uv} |data_manager.log_client.log_to_driver("come from worker") |items=process(data_manager) |data_manager.build_result(items, 1024) """.stripMargin, envs, "python", "3.6")))), struct, timezoneid, Map() ) val newIter = iter.map { irow => enconder.toRow(irow) } val commonTaskContext = new SparkContextImp(TaskContext.get(), batch) val columnarBatchIter = batch.compute(Iterator(newIter), TaskContext.getPartitionId(), commonTaskContext) columnarBatchIter.flatMap { batch => batch.rowIterator.asScala.map(_.copy()) }    }
    //指定输出的shema,转为spark df val wow = SparkUtils.internalCreateDataFrame(spark, rest, StructType(Seq(StructField("url", StringType),StructField("pv", LongType),StructField("uv",LongType))), false)
wow.collect().foreach(println(_))
可以看出,真实的分组是这样的:
partition id: 0 ,row: [https://github.com/latincross/mlsqlwechat,3]partition id: 31 ,row: [https://github.com/allwefantasy/pyjava,2]partition id: 31 ,row: [https://github.com/allwefantasy/pyjava,3]partition id: 108 ,row: [http://docs.mlsql.tech,1]20/05/14 16:54:23 INFO Executor: Finished task 31.0 in partition id: 108 ,row: [http://docs.mlsql.tech,1]partition id: 108 ,row: [http://docs.mlsql.tech,2]
虽然分组跟假设的不一样,但是不影响计算结果,跟预期一致:
[https://github.com/latincross/mlsqlwechat,1,1][https://github.com/allwefantasy/pyjava,2,2][http://docs.mlsql.tech,3,2]
为了更有利于调试,可以用pycharm进行开发:

6 - 教你如何使用Spark分布式执行Python脚本计算数据

跟开发普通python程序一样:

调试后,把上述代码粘到spark代码中,就可以了。是不是很方便本地调试啊?

通过分组排序,还可以做很多复杂的计算,比如根据访问日志切割Session,还比如计费系统,相同请求在一分钟之内重复访问不计费,计算有效的计费请求等等,当然也可通过Spark Api开发。在Python的世界中有很多的类库,通过上述方式都是可以使用的。上述例子只是一个简单的迭代的方式,如果partition数据不是很大的情况下,还可以把数据全部加载到Python内存,比如转成pandas的dataframe,然后就可以尽情发挥了。

Spark支持多计算引擎要解决的问题主要有(拿Python做示例):
  1. 控制Python程序的状态,比如启动、停止、中断等

  2. Spark与Python数据传输,比如通过File、Socket等

  3. 收集Python程序的日志,用户打印日志与系统打印日志

  4. 查看Python程序的执行进度


Pyjava目前实现了前三种,原理是在Spark Executor端启动Socket Server,然后启动Python Worker,并建立Sockect连接。Sockect中进行控制操作编码,数据编码采用Arrow格式,编码后进行传输,在解码端使用同样的规则解码,从而实现控制程序状态与数据的传输。日志也是采用Socket方式传到SparkDriver端。源码比较清晰,感兴趣可以自行翻阅。

注意:目前pyjava master分支有几bug需要修复
  1. 请参照:

    https://github.com/allwefantasy/pyjava/pull/2

    https://github.com/allwefantasy/pyjava/pull/3

    #当数据量小的情况下很容易出现空的partition,导致报错,需要修复serializers.py中的:#rb = pa.RecordBatch.from_arrays([[]], schema=pa.schema([('value', pa.string())]))rb = pa.RecordBatch.from_arrays([pa.array([""])], pa.schema([('value', pa.string())]))
    //scala代码也要做相应的修改,需要过滤掉空的partition:val newRest = rest.filter(_.toString != "[]")val wow = SparkUtils.internalCreateDataFrame(spark, newRest, StructType(Seq(StructField("url", StringType),StructField("pv", LongType),StructField("uv",LongType))), false)

        笔者认为这样修复是合理的,因为空的partition也是由计算得来,也是有意义的,在代码端自行过滤掉也是正常的操作。虽然对于一个工具来说,看着不那么优雅pyjava还支持Python项目模式,用于处理整个数据集,而不是分区方式,原理比较简单,感兴趣的读者请自行翻阅。






喜欢就点击最上方的[ MLSQL之道 ]关注下吧,后面精彩不断!

更多介绍请访问: http://docs.mlsql.tech/zh/


以上是关于6 - 教你如何使用Spark分布式执行Python脚本计算数据的主要内容,如果未能解决你的问题,请参考以下文章

手把手教你大数据离线综合实战 ETL+Hive+Mysql+Spark

教你如何成为Spark大数据高手?

如何在 Spark .NET 中执行分布式组合(N 选择 K)?

spark 能执行udf 不能执行udaf,啥原因

pytho 玩转Mysql

教你如何在Spark Scala/Java应用中调用Python脚本