大数据技术之DataX DataX之opentsdbwriter插件开发
Posted 脚丫先生
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据技术之DataX DataX之opentsdbwriter插件开发相关的知识,希望对你有一定的参考价值。
大家好,我是脚丫先生 (o^^o)
大数据项目之数据集成模块,按照项目需求需要集成时序数据库OpenTSDB。于是着手进行调研,https://github.com/alibaba/DataX
发现关于该时序数据库的插件只有单一的读插件,而阿里自研的TSDB读写插件都齐全。为了彻底的分离,同时为了完全适配OpenTSDB数据库,于是进行了OpenTSDB的写插件开发。
文章目录
一、OpenTSDB时序数据库
官方描述:OpenTSDB is a distributed, scalable Time Series Database (TSDB) written on top of HBase;翻译过来就是,基于Hbase的分布式的,可伸缩的时间序列数据库。
主要用途:常常用于集成到监控系统,譬如收集大规模集群(包括网络设备、操作系统、应用程序)的监控数据并进行存储,查询。
详细: OpenTSDB数据库,数据都是以metric为单位的进行存储。可以这样对metric进行理解,metric就是一个需要监控的指标或者项,譬如服务器的话,会有CPU使用率、内存使用率,磁盘使用率等。OpenTSDB是基于HBase作为存储底层,因此对数据存储支持到秒级别,并且支持数据永久存储,不会主动删除。
OpenTSDB存储的一些核心概念: 我们以一个实际的场景进行理解:例如,我们采集1个服务器的CPU使用率,发现该服务器在21:00的时候,CPU使用率达到99%。
-
Metric:监控项。譬如上面的CPU使用率。
-
Tags:就是一些标签,在OpenTSDB里面,Tags由tagk和tagv组成,即tagk:takv。标签是用来描述Metric的。
-
Value:一个Value表示一个metric的实际数值,譬如上面的99%
-
Timestamp:即时间戳,用来描述Value是什么时候的;譬如上面的21:00
-
Data Point:即某个Metric在某个时间点的数值。
Data Point包括以下部分:Metric、Tags、Value、Timestamp 上面描述的服务器在21:00时候的cpu使用率,就是1个DataPoint
保存到OpenTSDB的数据,就是无数个DataPoint。
例如一个DataPoint
"metric":"temperature",
"timestamp":1567675709879,
"value":20.5,
"tags":
"host":"192.168.239.128"
指标名称:temperature
指标标签:host=192.168.239.128
指标值:20.5
时间戳:1567675709879
二、API接口
官方地址:https://www.w3cschool.cn/doc_opentsdb/opentsdb-api_http-put.html?lang=en
1、保存
2、查询
3、删除
三、opentsdbwriter初步开发
1、根据前文进行add插件模块
2、opentsdb数据库的java工具类
如今,卷又卷的时代,我们必须要站在前人的肩膀上探索,不需要重复的去造轮子。这里利用github上开源的opentsdb工具类进行插件的开发。地址如下所示:
https://github.com/fangpanpan/opentsdb-java-sdk
3、插件的开发
对于阿里datax的opentsdbwriter插件的开发,主要逻辑OpenTSDBWriter去实现。代码如下所示:
package com.alibaba.datax.plugin.writer.opentsdbwriter;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSONArray;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.nio.reactor.IOReactorException;
import org.opentsdb.client.OpenTSDBClient;
import org.opentsdb.client.OpenTSDBClientFactory;
import org.opentsdb.client.OpenTSDBConfig;
import org.opentsdb.client.bean.request.Point;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
/**
* @date : 12:11 2022/1/15
*/
public class OpenTSDBWriter extends Writer
public static class Job extends Writer.Job
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
private Configuration originalConfig;
@Override
public List<Configuration> split(int mandatoryNumber)
//按照reader 配置文件的格式 来 组织相同个数的writer配置文件
ArrayList<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
for (int i = 0; i < mandatoryNumber; i++)
configurations.add(this.originalConfig.clone());
return configurations;
@Override
public void prepare()
@Override
public void init()
//获取配置文件信息parameter 里面的参数
this.originalConfig = super.getPluginJobConf();
String address = originalConfig.getString(Key.ENDPOINT);
if (StringUtils.isBlank(address))
throw DataXException.asDataXException(
OpenTSDBWriterErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.ENDPOINT + "] is not set.");
List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
if (columns == null || columns.isEmpty())
throw DataXException.asDataXException(
OpenTSDBWriterErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.COLUMN + "] is not set.");
List<String> columnTypes = originalConfig.getList(Key.COLUMNTYPE, String.class);
if (columnTypes == null || columnTypes.isEmpty())
throw DataXException.asDataXException(
OpenTSDBWriterErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.COLUMNTYPE + "] is not set.");
String metric = originalConfig.getString(Key.METRIC);
if (StringUtils.isBlank(metric))
throw DataXException.asDataXException(
OpenTSDBWriterErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.METRIC + "] is not set.");
@Override
public void destroy()
public static class Task extends Writer.Task
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private OpenTSDBConfig config;
private List<String> columnTypes;
private List<String> columns;
private String metric;
@Override
public void init()
Configuration writerSliceConfig = getPluginJobConf();
String address = writerSliceConfig.getString(Key.ENDPOINT);
this.columnTypes = writerSliceConfig.getList(Key.COLUMNTYPE, String.class);
this.columns = writerSliceConfig.getList(Key.COLUMN, String.class);
this.metric = writerSliceConfig.getString(Key.METRIC);
this.config = OpenTSDBConfig.address(address, 4242)
.config();
@Override
public void startWrite(RecordReceiver recordReceiver)
try
Record record;
List<Number> values = new ArrayList<>();
List<Map<String,String>> tags = new ArrayList<>();
while ((record = recordReceiver.getFromReader()) != null)
final int recordLength = record.getColumnNumber();
Map<String,String> tag = new HashMap<>();
for (int i = 0; i < recordLength; i++)
if (columnTypes.get(i).equals("tag"))
tag.put(columns.get(i),record.getColumn(i).asString());
if (columnTypes.get(i).equals("value"))
values.add(record.getColumn(i).asBigInteger());
tags.add(tag);
OpenTSDBClient client = OpenTSDBClientFactory.connect(config);
for (int i = 0;i < values.size();i++)
Point point = Point.metric(metric)
.tag(tags.get(i))
.value(System.currentTimeMillis(), values.get(i))
.build();
client.put(point);
client.gracefulClose();
catch (Exception e)
e.printStackTrace();
@Override
public void post()
@Override
public void destroy()
四、OpenTSDBWriter 插件的job
我这里想去实现一个场景:把mysql的数据通过datax导入到opentsdb数据库。
1、在mysql数据库新建源表
需要在mysql的数据库里,新建自己想同步数据的表。
CREATE TABLE `mysql_to_opentsdb` (
`id` int(11) DEFAULT NULL,
`name` varchar(255) DEFAULT NULL,
`sex` varchar(255) DEFAULT NULL,
`price` decimal(10,2) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
2、编写job
"job":
"content": [
"reader":
"parameter":
"password": "cetc@2021",
"connection": [
"querySql": [
" SELECT id,name,sex,price FROM mysql_to_opentsdb ; "
],
"jdbcUrl": [
"jdbc:mysql://192.168.239.128:3306/test"
]
],
"splitPk": "",
"username": "root"
,
"name": "mysqlreader"
,
"writer":
"parameter":
"columnType": [
"tag",
"tag",
"tag",
"value"
],
"endpoint": "http://172.10.10.51",
"metric": "dog",
"column": [
"id",
"name",
"sex",
"price"
]
,
"name": "opentsdbwriter"
],
"setting":
"errorLimit":
"record": 0,
"percentage": 0.02
,
"speed":
"channel": 3
关于datax的job需要自己去进行编写,因此我们需要根据项目业务进行设计。
endpoint: opentsdb的地址,4242端口已经固定。
metric:指标。
最后,进行maven命令行的打包操作,生成我们需要的插件,然后进行数据同步操作,
以上是关于大数据技术之DataX DataX之opentsdbwriter插件开发的主要内容,如果未能解决你的问题,请参考以下文章
数据同步工具DataX和DataWeb知识手册,DataX优化