Spark+TDengine 在中国电信电力测功系统监控平台上的应用实践

Posted TDengine

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark+TDengine 在中国电信电力测功系统监控平台上的应用实践相关的知识,希望对你有一定的参考价值。


小 T 导读:电力测功系统监控平台,是基于中国电信上海理想信息产业(集团)有限公司的,设备数据采集和设备银行V2.0应用进行设计和开发的。采集到的设备的实时数据,都是存储在TDengine中的。


应用背景

电力测功系统监控平台,是基于中国电信上海理想信息产业(集团)有限公司的,设备数据采集和设备银行V2.0应用,进行开发和设计的,软件是电力测功系统监控平台的核心,是实现数据汇聚、数据处理、可视化展示以及测功试验的重要部分,根据电力测功系统集控界面功能需求,可将整个电力测功系统监控平台架构分为四部分:设备层、网络层、平台层以及展示层。整体架构如下图:


实时数据是存储在TDengine数据库之中的,和设备银行之间进行通信,数据流程如下:

Spark+TDengine 在中国电信电力测功系统监控平台上的应用实践


设备银行主要负责模版、设备和报警信息的管理,其可视化界面如下:


Spark+TDengine使用过程

1. TDengine的安装  

请参考官方文档:

https://www.taosdata.com/cn/documentation/


2. 在TDengine中建立测试库和测试表

taos> create database test;taos>use test;#这里我们创建一个和tdengine自带库log中log表结构一致的表,后提直接从log.log读数据存储到test.log_cptaos> create table log_cp( -> ts TIMESTAMP, -> level TINYINT, -> content BINARY(80), -> ipaddr BINARY(15) -> )


3. Spark读取TDengine  

TDengine虽未提供Spark调用的DataSource,但TDengine本身也支持JDBC,因此,这里使用spark-jdbc来读取TDengine,最新版本可以到官网下载,我这里用的是如下版本: 

<dependency> <groupId>com.taosdata.jdbc</groupId> <artifactId>taos-jdbcdriver</artifactId> <version>1.0.3</version> </dependency>


关于JDBC的使用,官网有如下提示: 

由于TDengine是使用C语言开发的,使用taos-jdbcdriver驱动包时需要依赖系统对应的本地函数库。 

1. libtaos.so在Linux系统中成功安装TDengine后,依赖的本地函数库libtaos.so文件会被自动拷贝至/usr/lib/libtaos.so,该目录包含在Linux自动扫描路径上,无需单独指定。  

2.taos.dll在windows系统中安装完客户端之后,驱动包依赖的taos.dll文件会自动拷贝到系统默认搜索路径C:/Windows/System32下,同样无需要单独指定。 


第一次使用时,为了保证机器上有libtaos.so或taos.dll,需要在本地安装TDengine客户端(客户端请至TDengine官网下载) ,Spark的读取代码如下:

 val jdbccdf = spark .read .format("jdbc") .option("url", "jdbc:TAOS://192.168.1.151:6030/log") .option("driver", "com.taosdata.jdbc.TSDBDriver") .option("dbtable", "log") .option("user", "root") .option("password", "taosdata") .option("fetchsize", "1000") .load()

 

4. Spark存TDengine

因为在读TDengine的时候,第一个字段ts会被转换为decimal,但是存储时直接存decimal tdengine是不认的,所以需要将ts进行类型转换

jdbccdf.select(($"ts" / 1000000).cast(TimestampType).as("ts"), $"level", $"content", $"ipaddr") .write.format("jdbc") .option("url", "jdbc:TAOS://192.168.1.151:6030/test?charset=UTF-8&locale=en_US.UTF-8") .option("driver", "com.taosdata.jdbc.TSDBDriver") .option("dbtable", "log2") .option("user", "root") .option("password", "taosdata") .mode(SaveMode.Append) .save()


5. Spark yarn模式运行TDengine

上面的测试都是基于maser为local测试的,如果以yarn模式运行,则在每个节点上都安装TDengine客户端是不现实的,查看taos-jdbcdriver的代码,发现,driver会执行System.load("taos"),也就是说只要java.library.path中存在 libtaos.so,程序就可正常运行,不必安装TDengine的客户端,因为java.library.path是在jvm启动时就设置好的,要更改它的值,可以采用动态加载,采用如下方法解决了加载libtaos.so的问题:


(1) 将driver端libtaos.so发送到各个executor

spark.sparkContext.addFile("/path/to/libtaos.so")

(2) 重写Spark中JdbcUtils类中的createConnectionFactory方法,添加

loadLibrary(new File(SparkFiles.get("libtaos.so")).getParent)

进行java.library.path的动态加载

 def createConnectionFactory(options: JDBCOptions): () => Connection = { val driverClass: String = options.driverClass () => { loadLibrary(new File(SparkFiles.get("libtaos.so")).getParent) DriverRegistry.register(driverClass) val driver: Driver = DriverManager.getDrivers.asScala.collectFirst { case d: DriverWrapper if d.wrapped.getClass.getCanonicalName == driverClass => d case d if d.getClass.getCanonicalName == driverClass => d }.getOrElse { throw new IllegalStateException( s"Did not find registered driver with class $driverClass") } driver.connect(options.url, options.asConnectionProperties) } }


(3) loadLibrary方法如下

 def loadLibrary(libPath: String): Unit = { var lib = System.getProperty("java.library.path") val dirs = lib.split(":") if (!dirs.contains(libPath)) { lib = lib + s":${libPath}" System.setProperty("java.library.path", lib) val fieldSysPath = classOf[ClassLoader].getDeclaredField("sys_paths") fieldSysPath.setAccessible(true) fieldSysPath.set(null, null) } }


在yarn模式下一定要给url设置charset和locale,如

charset=UTF-8&locale=en_US.UTF-8

否则container可能会异常退出。


6. libtaos.so其他加载方式  

本来还尝试了jna加载libtaos.so的方式,此方式只需将libtaos.so放入项目resources中,程序变回自动搜索到so文件,奈何不会改tdengine中c的代码。



作者介绍


董鸿飞,大数据开发工程师,2015年加入上海理想大数据实施部,工作至今。目前主要负责公司数据总线产品设计和开发。



公司介绍


中国电信上海理想信息产业(集团)有限公司,成立于1999年,注册资本7000万元,是上海市投资规模较大的信息技术企业之一。
通过整合公司内各事业部多年大型项目实施的整体实力,公司着力锻造大型信息化项目咨询规划和顶层设计能力,构建“智慧社区”、“智慧园区”及“智慧政务”、“智慧医疗”、“智慧物流”等各类智慧行业应用等整体解决方案,可提供IT外包服务和网络监控运维管理一站式安全解决方案,逐步形成“智慧城市”专业领域产品研发积累和项目交付与平台运营经验,锻造了整体科研队伍和项目实施团队的综合实力。



点击阅读原文,体验TDengine!

以上是关于Spark+TDengine 在中国电信电力测功系统监控平台上的应用实践的主要内容,如果未能解决你的问题,请参考以下文章

spark+tdengine的使用

时序数据库TDengine基本概念和建模思路

“天工云中控“助力传统工业转型,Tdengine帮了我们大忙

布局电力行业,百度智能云与中国电科院签署战略合作协议

打破中国企业类软件成长的怪圈,我找到了方法!

PL/SQL存储过程-中国电力负荷数据存储功能-仅供参考