YMatrix + PLPython替代Spark实现车联网算法

Posted db_shidb

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了YMatrix + PLPython替代Spark实现车联网算法相关的知识,希望对你有一定的参考价值。

PySpark算法开发实战

一、PySpark介绍

Spark是一种快速、通用、可扩展的大数据分析引擎,PySpark是Spark为Python开发者提供的API。在有非常多可视化和机器学习算法需求的应用场景,使用PySpark比Spark-Scala可以更好地和python中丰富的库配合使用。

使用Python开发Spark需要使用到pyspark,pyspark是Spark为Python开发者提供的API。pyspark使用Py4J库,使得Python可以使用JVM对象。

二、运行环境搭建

操作系统 CentOS Linux release 7.8.2003 (Core)
Java 1.8.0_151
Python 3.6.13
Spark 2.4.0
Miniconda 4.5.4
pyspark 3.2.1   
pyarrow 6.0.1

Miniconda

  1. 安装Miniconda
    conda和virtualenv是Python的包管理与环境管理工具。conda的安装程序中包含conda软件包管理器和Python,不需要再单独安装Python,使用起来较为方便。Miniconda为conda精简版,大小约为50M。
    由于我们使用的Spark版本与Python版本为历史版本,需要用4.5.4版本的Miniconda(对应Python 3.6)进行安装。当前官网下载页的miniconda支持到最低3.7版本Python,需要在https://repo.anaconda.com/miniconda/上下载。根据机型选择Miniconda2-4.5.4-Linux-x86_64.sh下载。
    下载完成之后运行脚本Miniconda2-4.5.4-Linux-x86_64.sh进行安装。完成之后可以使用conda -V检查安装结果。
    conda -V conda 4.5.4
  2. 设置用于Spark的虚拟环境
    首先建立一个pyspark_env的环境
    conda create --name pyspark_env python=3.6
    新建完成之后可以从过conda activate进入虚拟环境
    conda activate pyspark_env
    进入环境之后命令行会有环境名的标识用于区分

创建好并进入pyspark_env的虚拟环境之后,我们需要安装两个Spark相关的库,pyspark和pyarrow。可以使用conda install安装或者也可使用pip,这里以使用pip安装为例:
pip3 install pyarrow pyspark
安装完毕之后可以使用conda list查看安装好的库

此,环境搭建中的conda部分已经完成。详细的操作可以参考Spark的最新文档,pyspark conda部署的部分是多版本通用的:Installation - PySpark 3.2.1 documentation

Spark

  1. 下载Spark
    我们下载Spark已经编译好的压缩包,所有的历史版本可以在这个链接中找到:https://archive.apache.org/dist/spark/,
    本文下载spark-2.4.0-bin-hadoop2.7.tgz
    下载完成之后解压文件。完成之后可以进入目录运行bin/spark-shell进行测试

  2. Standalone模式启动集群
    Spark的集群模式总共分为四种

  • Standalone
  • Apache Mesos
  • Hadoop YARN
  • Kubernetes
    2、3、4都比较好理解,Standalone模式是Spark自身实现的资源调度框架。
    复制spark根目录下的conf/spark-env.sh.template -> conf/spark-env.sh
    在其中添加
    SPARK_MASTER_HOST = [hostname] # master的主机名
    SPARK_MASTER_PORT= 7077
    在master节点上运行
    ./sbin/start-master.sh
    启动之后可以登录webui查看,地址为IP:8080

同样,在slave节点设置好环境变量之后运行
./sbin/start-slave.sh

三、Spark分布式运行算法

下面的代码是Spark 运行Pandas UDF的例子。

def scalar_pandas_udf_example(spark):
    import pandas as pd
    from pyspark.sql.functions import col, pandas_udf
    from pyspark.sql.types import LongType
    def multiply_func(a, b):
        return a * b
    multiply = pandas_udf(multiply_func, returnType=LongType())
    x = pd.Series([1, 2, 3])
    print(multiply_func(x, x))
    df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
    df.select(multiply(col("x"), col("x"))).show()
if __name__ == "__main__":
    spark = SparkSession \\
        .builder \\
        .master('local')\\
        .appName("UDFTest") \\
        .getOrCreate()
    print("Running pandas_udf scalar example")
    scalar_pandas_udf_example(spark)
    spark.stop()

首先生成一个SparkSession对象,参数master->'local’指的是local模式运行,如果是集群的话这里local换成spark:\\masterip:7077,appName->'UDFTest’定义了任务名称

spark = SparkSession \\
        .builder \\
        .master('local')\\
        .appName("UDFTest") \\
        .getOrCreate()

定义一个简单的函数

def multiply_func(a, b):
        return a * b

生成UDF对象

multiply = pandas_udf(multiply_func, returnType=LongType())

生成一个pandas的数据

x = pd.Series([1, 2, 3])

创建一个Spark dataframe对象并让spark执行UDF

df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
df.select(multiply(col("x"), col("x"))).show()

该代码运行的结果为:

四、Spark数据处理的缺点

  1. 一般生产环境下的数据想要使用Spark做计算,数据需要从存储的数据库->落盘文件/消息队列->Spark集群数据链路需要建设和维护
  2. PySpark + Pandas UDF处理数据,尽管利用了Apache Arrow,核心还是需要JVM与Python之间数据传输,开销大,不适用于性能敏感的场景。
    YMatrix+ PLPython处理方式
    上文描述了Spark在车联网信号分析的实际使用案例。Spark的优点很明显,作为分布式的内存计算引擎,社区活跃、支持多语言开发、易于融合其他如Hadoop等框架。但使用上的缺点在上文中也有描述:需要搭建并维护一整条新的数据链路;并且除去Scala,使用Python等其他语言研发不适合在性能要求高的场景下使用。
    那么回到我们实现车联网信号分析这个案例上,实际上最终的需求是从一部分数据中取出想要的数据、并且经过一定的要求与计算,筛选出最终的数据结果——实际上就是简单的一条SQL+代码实现算法处理数据。上述例子中,“一条SQL”变成了整条数据链路,从数据库取出数据处理完丢进消息队列,“代码实现算法”的代码加载到了Spark分布式的消费数据和运行。如果能够把“代码实现算法”这部分与下沉到数据库这层,那么我们不仅减少了维护一整套数据链路的开销,还能利用数据库的算力提供性能,加速数据流转。想要利用数据库帮我们进行数据分析与计算,我们需要MPP架构的数据库。

一、YMatrix与PLPython介绍

YMatrix
YMatrix是超融合数据库,将交易型数据库(OLTP)、分析型数据库(OLAP)和时序数据库能力融为一体的超融合型分布式数据库产品,具备严格分布式事务一致性、水平在线扩容、安全可靠、成熟稳定、兼容PostgreSQL/Greenplum协议和生态等重要特性。为万物互联的智能时代提供坚实、简洁的智能数据核心基础设施,为物联网应用、工业互联网、智能运维、智慧城市、实时数仓、智能家居、车联网等场景提供一站式高效解决方案,YMatrix为公司自主研发的国产数据库,公司拥有该产品全部知识产权。产品的架构如下。

YMatrix不但对经典的Greenplum数据仓库场景进行了大幅增强,而且可以极佳的支持大规模时序数据处理、支持时空数据、结构化数据和半结构化数据,一套数据库解决各种数据类型,避免为了处理不同类型数据引入不同类型的产品。实现提高开发运维效率、提升系统性能、降低整体成本的目标。

PLPython
PL/Python过程语言允许用Python编写 PostgreSQL函数。Python有非常多成熟的库能够提供给我们做数据分析,如numpy、pandas等。
使用PLPython方便数据分析的算法实现,可以充分利用YMatrix分布式储存和算力。

二、PLPython调用外部Python代码

上文中描述了如何使用Python开发Spark应用,让我们的算法能够使用Spark的分布式计算能力,整体的数据流程是从csv的数据文件中读取数据->Spark Arrow内存的数据类型中->Spark分布式计算输出结果。
相同的算法也可以通过PLPython,将数据转存到YMatrix查询计算实现
将数据导入YMatrix
使用Mxgate将csv中的数据导入数据库,在此之前需要新建表

        vin text,
        daq_time DATE,
        status INT,
        c_stat INT,
        mode INT,
        speed INT,
        mileage INT,
        t_volt INT,
        t_current INT,
        soc INT,
        dcdc_stat INT,
        isulate_r INT,
        lng BIGINT,
        lat BIGINT,
        max_volt_bat_id INT,
        max_volt_cell_id INT,
        max_cell_volt INT,
        min_volt_bat_id INT,
        min_cell_volt_id INT,
        min_cell_volt INT,
        max_temp_sys_id INT,
        max_temp_probe_id INT,
        max_temp INT,
        min_temp_sys_id INT,
        min_temp_probe_id INT,
        min_temp INT,
        max_alarm_lvl INT,
        genral_alarm INT,
        cell_volt_list text,
        cell_temp_list TEXT,
        pdate date) 
distributed BY (vin)

表格建好了之后导入数据

tail -n +1 data.csv | mxgate --source stdin --db-database test --db-master-host localhost.localdomain --db-master-port 5432 --db-user mxadmin --time-format raw --target suanfa_data  --delimiter ','

编写PLPython函数调用算法
首先我们需要把算法代码上传到服务器上,在本例中路径为/home/mxadmin/plpython/
我们需要查询suanfa_data表中的所有数据,并将结果转化成pandas的Dataframe格式,传递给算法函数去做处理

sql = "SELECT * FROM suanfa_data;"
df = psql.frame_query(sql, cnxn)


create function suanfa_detector() returns void as $$

import sys
sys.path.append('/home/mxadmin/plpython/')
from src.analyzer import analyzer

import pyodbc
import pandas.io.sql as psql
sql_result = plpy.execute("SELECT * FROM suanfa_data;")
df = pd.DataFrame.from_records(sql_result)

result = analyzer(df)
plpy.notice(result)

$$ language plpython3u;

下面是算法函数,输入是我们suanfa_detector()中sql的查询结果转换成的dataframe对象,经过数据处理,最后输出结果的dataframe

Def  analyzer(data: pandas.Dataframe)-> pandas.Dataframe:
    data_wash(data)
    sign_data_veh_state(data)
    detect_two_alarm_tuples = detect_two_analyze(data)
    return detect_two_alarm_tuples

以上是关于YMatrix + PLPython替代Spark实现车联网算法的主要内容,如果未能解决你的问题,请参考以下文章

在plpython函数greenplum数据库中插入语句

如何在 Postgres 中使用 Plpython3 返回结果集

在 plpython 中处理反斜杠

plpython 程序中的 Greenplum pandas 数据框集成(来自数据库内部)

PostgreSQL,plpython3u函数。 os.getenv找不到环境变量

Spark 广播替代方案