How to use an open-source plugin to build data interfaces quickly and well, connecting different application systems
Posted by java李杨勇
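Preface: This article was compiled by the editors of 小常识网 (cha138.com). It covers how to use an open-source plugin to build data interfaces quickly and well and to connect different application systems. We hope it serves as a useful reference.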
Introduction:
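It is no exaggeration to say that almost every developer has run into the wall of "application data that doesn't interoperate". Application systems struggle to communicate when they run on different platforms, use different technologies, and store and deploy data in different ways, especially when the necessary interfaces are missing. As business requirements keep expanding, applications are also becoming more diverse and customized. The conflict between future business needs and aging tech stacks is growing sharper, and the number of interfaces required keeps rising.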
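How to build these interfaces simply and quickly has therefore become a question we need to think about. I recently came across a very handy open-source plugin. After studying its technical documentation carefully, I decided to recommend it: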
Open-source plugin: Tapdata PDK
GitHub link: https://github.com/tapdata/idaas-pdk
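The project is published by Tapdata, a Chinese startup team focused on real-time data service platforms. According to the team, this newly open-sourced component is also a stepping stone toward open-sourcing its core product. It builds on the team's mature experience in real-time data synchronization.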
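PDK is an open-source plugin development framework abstracted from the team's data interface technology. By implementing the Source Plugin interface or the Target Plugin interface, you can quickly make a new database compatible with Tapdata as a source or a target. Through the Tapdata Cloud product and the soon-to-be open-sourced Tapdata, you then get free real-time data integration from a variety of heterogeneous data sources to target databases or platforms.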
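Following the PDK connector development specification when building sources and targets simplifies data pipeline development. With detailed development guidelines and built-in TDD tests, new sources and targets can be developed simply and quickly.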
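Supported types include:
- Databases: MySQL, Oracle, PostgreSQL, etc.
- SaaS products: Salesforce, vika (维格表), Jinshuju (金数据) forms, Zoho CRM, etc.
- Custom data sources: data sources that use private protocols can be connected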
Quick start: integrating a target database
The PDK team has published its technical documentation. See GitHub (https://github.com/tapdata/idaas-pdk) for details.
Prepare the environment
- Java 8
- Maven
- Git
- IntelliJ IDEA
Download the source code and build it
git clone https://github.com/tapdata/idaas-pdk.git
cd idaas-pdk
mvn clean install
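Create the connector project for the target database
For example, with group io.tapdata, database name XDB, and version 0.0.1, create the connector project with one of the following commands, depending on whether the target database requires tables to be created: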
- If the target database does not require tables to be created:
./bin/tap template --type target --group io.tapdata --name XDB --version 0.0.1 --output ./connectors
Open idaas-pdk in IntelliJ IDEA, and you will see the xdb-connector project under idaas-pdk/connectors.
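- Fill in configOptions in spec.json
Once the connector is integrated into the Tapdata site, configOptions define the input fields users fill in when they use the connector, such as the database host, username, and password.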
...
"configOptions": {
  "connection": {
    "type": "object",
    "properties": {
      "host": {
        "type": "string",
        "title": "Host",
        "x-decorator": "FormItem",
        "x-component": "Input"
      },
      "port": {
        "type": "number",
        "title": "Port",
        "x-decorator": "FormItem",
        "x-component": "Input"
      }
    }
  }
}
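The host and port collected through configOptions are what a connection test typically uses. The sketch below is a minimal, hypothetical illustration. The class name ConnectionConfigSketch is not part of the PDK API, and the sketch assumes the values users enter arrive as a `Map<String, Object>` keyed by the property names "host" and "port". It validates the two values, then checks TCP reachability with a timeout.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Map;

// Hypothetical helper, not part of the PDK API. Assumption: the configOptions values
// arrive as a Map keyed by the property names "host" and "port".
final class ConnectionConfigSketch {
    final String host;
    final int port;

    private ConnectionConfigSketch(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Validate and convert the raw form values; "port" may arrive as any Number or as a String.
    static ConnectionConfigSketch fromMap(Map<String, Object> config) {
        Object host = config.get("host");
        Object port = config.get("port");
        if (!(host instanceof String) || ((String) host).trim().isEmpty()) {
            throw new IllegalArgumentException("host is required");
        }
        int portNumber;
        if (port instanceof Number) {
            portNumber = ((Number) port).intValue();
        } else if (port instanceof String) {
            // NumberFormatException is a subclass of IllegalArgumentException
            portNumber = Integer.parseInt(((String) port).trim());
        } else {
            throw new IllegalArgumentException("port is required");
        }
        if (portNumber < 1 || portNumber > 65535) {
            throw new IllegalArgumentException("port out of range: " + portNumber);
        }
        return new ConnectionConfigSketch(((String) host).trim(), portNumber);
    }

    // Basic reachability check: try to open a TCP connection within the given timeout.
    boolean isReachable(int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

In a real connector, this logic would sit behind the connection-test TODO in connectionTest and report its outcome through the TestItem consumer. The port accepts any Number because JSON parsers may return numbers as Integer, Long, or Double.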
- Write the code that connects to the target database
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
//The PDK types used below (ConnectorBase, TapConnector, TapTable, TestItem, TapRecordEvent, etc.)
//are imported from the idaas-pdk API in the generated template.

@TapConnectorClass("spec.json")
public class XDBConnector extends ConnectorBase implements TapConnector {
    @Override
    public void discoverSchema(TapConnectionContext connectionContext, Consumer<List<TapTable>> consumer) {
        //TODO Load tables from the database; connection information is in connectionContext#getConnectionConfig
        //Sample code showing how to define tables.
        consumer.accept(list(
                //Define the first table
                table("empty-table1"),
                //Define the second table
                table("empty-table2")
        ));
    }

    @Override
    public void connectionTest(TapConnectionContext connectionContext, Consumer<TestItem> consumer) {
        //Assume the tests below succeed. These tests are recommended but not required.
        //Connection test
        //TODO execute connection test here
        consumer.accept(testItem(TestItem.ITEM_CONNECTION, TestItem.RESULT_SUCCESSFULLY));
        //Login test
        //TODO execute login test here
        consumer.accept(testItem(TestItem.ITEM_LOGIN, TestItem.RESULT_SUCCESSFULLY));
        //Read test
        //TODO execute read test by checking role permission
        consumer.accept(testItem(TestItem.ITEM_READ, TestItem.RESULT_SUCCESSFULLY));
        //Write test
        //TODO execute write test by checking role permission
        consumer.accept(testItem(TestItem.ITEM_WRITE, TestItem.RESULT_SUCCESSFULLY));
    }

    @Override
    public void registerCapabilities(ConnectorFunctions connectorFunctions, TapCodecRegistry codecRegistry) {
        //Register the methods below so the engine can call them
        connectorFunctions.supportWriteRecord(this::writeRecord);
        connectorFunctions.supportQueryByFilter(this::queryByFilter);
    }

    private void writeRecord(TapConnectorContext connectorContext, List<TapRecordEvent> tapRecordEvents, Consumer<WriteListResult<TapRecordEvent>> writeListResultConsumer) {
        //TODO write records into the database
        //Below is sample code that counts the received events that are supposed to be written to the database.
        AtomicLong inserted = new AtomicLong(0); //insert count
        AtomicLong updated = new AtomicLong(0); //update count
        AtomicLong deleted = new AtomicLong(0); //delete count
        for (TapRecordEvent recordEvent : tapRecordEvents) {
            if (recordEvent instanceof TapInsertRecordEvent) {
                //TODO insert record
                inserted.incrementAndGet();
            } else if (recordEvent instanceof TapUpdateRecordEvent) {
                //TODO update record
                updated.incrementAndGet();
            } else if (recordEvent instanceof TapDeleteRecordEvent) {
                //TODO delete record
                deleted.incrementAndGet();
            }
        }
        //Report the write result to the incremental engine
        writeListResultConsumer.accept(writeListResult()
                .insertedCount(inserted.get())
                .modifiedCount(updated.get())
                .removedCount(deleted.get()));
    }

    private void queryByFilter(TapConnectorContext connectorContext, List<TapFilter> filters, Consumer<List<FilterResult>> listConsumer) {
        //Filters are exact matches.
        //If no value in the database matches a filter, still create a FilterResult with a null value in it,
        //so that the flow engine knows the filter has no value.
        //TODO implement query by filter
    }
}
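The comments in queryByFilter describe a contract. Each filter is an exact match, and a filter with no match must still produce a result that carries a null value. The minimal in-memory sketch below illustrates that contract. `FilterQuerySketch` and its nested `FilterMatch` are hypothetical stand-ins for a real database query and for PDK's `FilterResult`, whose actual fields may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical illustration of the queryByFilter contract, not PDK code:
// every filter is an exact match on all of its fields, and a filter that matches
// nothing still yields a result whose value is null.
final class FilterQuerySketch {
    // Stand-in for PDK's FilterResult: the filter that was asked and the row found (or null).
    static final class FilterMatch {
        final Map<String, Object> filter;
        final Map<String, Object> value; // null when no row matched

        FilterMatch(Map<String, Object> filter, Map<String, Object> value) {
            this.filter = filter;
            this.value = value;
        }
    }

    static List<FilterMatch> query(List<Map<String, Object>> rows, List<Map<String, Object>> filters) {
        List<FilterMatch> results = new ArrayList<>();
        for (Map<String, Object> filter : filters) {
            Map<String, Object> found = null;
            for (Map<String, Object> row : rows) {
                if (matchesExactly(row, filter)) {
                    found = row;
                    break;
                }
            }
            // One result per filter, in order, even when nothing matched.
            results.add(new FilterMatch(filter, found));
        }
        return results;
    }

    // A row matches when it has every filter key with an equal value.
    private static boolean matchesExactly(Map<String, Object> row, Map<String, Object> filter) {
        for (Map.Entry<String, Object> condition : filter.entrySet()) {
            if (!row.containsKey(condition.getKey())
                    || !Objects.equals(row.get(condition.getKey()), condition.getValue())) {
                return false;
            }
        }
        return true;
    }
}
```

Keeping exactly one result per filter, in the same order, makes it unambiguous which filter found nothing.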
- If the target database requires tables to be created:
./bin/tap template --type targetNeedTable --group io.tapdata --name XDB --version 0.0.1 --output ./connectors
Open idaas-pdk in IntelliJ IDEA, and you will see the xdb-connector project under idaas-pdk/connectors.
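- Fill in configOptions in spec.json
Once the connector is integrated into the Tapdata site, configOptions define the input fields users fill in when they use the connector, such as the database host, username, and password.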
...
"configOptions": {
  "connection": {
    "type": "object",
    "properties": {
      "host": {
        "type": "string",
        "title": "Host",
        "x-decorator": "FormItem",
        "x-component": "Input"
      },
      "port": {
        "type": "number",
        "title": "Port",
        "x-decorator": "FormItem",
        "x-component": "Input"
      }
    }
  }
}
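- Fill in dataTypes (type expressions) in spec.json
dataTypes describe the range of every field type supported by the database this connector connects to, and which TapType each one converts to. Source databases provide the same kind of dataTypes description. When data from a source flows into Tapdata, Tapdata combines the field descriptions in the source's dataTypes with the field information of the source tables, and the data enters Tapdata's data flow as Tapdata's neutral data structure. Before the data flows into the target database, Tapdata uses this information to find the best storage type in the target's dataTypes. It passes that type to the PDK developer through TapField's originType, which the developer uses to create tables.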
...
"dataTypes": {
  "boolean": {"bit": 8, "unsigned": "", "to": "TapNumber"},
  "tinyint": {"bit": 8, "to": "TapNumber"},
  "smallint": {"bit": 16, "to": "TapNumber"},
  "int": {"bit": 32, "to": "TapNumber"},
  "bigint": {"bit": 64, "to": "TapNumber"},
  "largeint": {"bit": 128, "to": "TapNumber"},
  "float": {"bit": 32, "to": "TapNumber"},
  "double": {"bit": 64, "to": "TapNumber"},
  "decimal[($precision,$scale)]": {"bit": 128, "precision": [1, 27], "defaultPrecision": 10, "scale": [0, 9], "defaultScale": 0, "to": "TapNumber"},
  "date": {"byte": 3, "range": ["0000-01-01", "9999-12-31"], "to": "TapDate"},
  "datetime": {"byte": 8, "range": ["0000-01-01 00:00:00", "9999-12-31 23:59:59"], "to": "TapDateTime"},
  "char[($byte)]": {"byte": 255, "to": "TapString", "defaultByte": 1},
  "varchar[($byte)]": {"byte": "65535", "to": "TapString"},
  "string": {"byte": "2147483643", "to": "TapString"},
  "HLL": {"byte": "16385", "to": "TapNumber", "queryOnly": true}
}
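The passage above says Tapdata picks the best storage type from the target's dataTypes, but it does not document the matching rules. The hypothetical sketch below gives a rough illustration and is not Tapdata's actual algorithm. It uses only the three string types from the sample, with the byte capacities listed there, and returns the smallest type that can hold a value of a given byte length, formatted as an originType string.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only: NOT Tapdata's actual type-matching algorithm.
// Picks the smallest string-like type from the sample dataTypes that can hold the required bytes.
final class StringTypePickerSketch {
    // Capacities taken from the sample spec.json, in ascending order:
    // char 255, varchar 65535, string 2147483643.
    private static final Map<String, Long> STRING_TYPES = new LinkedHashMap<>();
    static {
        STRING_TYPES.put("char", 255L);
        STRING_TYPES.put("varchar", 65535L);
        STRING_TYPES.put("string", 2147483643L);
    }

    // Returns an originType such as "varchar(1000)", or throws if no type is large enough.
    static String pick(long requiredBytes) {
        if (requiredBytes < 1) {
            throw new IllegalArgumentException("requiredBytes must be positive");
        }
        for (Map.Entry<String, Long> type : STRING_TYPES.entrySet()) {
            if (requiredBytes <= type.getValue()) {
                // char and varchar take a ($byte) parameter in the sample; string does not.
                return type.getKey().equals("string")
                        ? "string"
                        : type.getKey() + "(" + requiredBytes + ")";
            }
        }
        throw new IllegalArgumentException("no string type can hold " + requiredBytes + " bytes");
    }
}
```

For example, pick(1000) yields varchar(1000), and pick(100000) falls through to string. A real matcher would weigh more than size. For instance, picking char for short values ignores the difference between fixed and variable length, so treat this only as an illustration of the idea.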
- Write the code that connects to the target database
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
//The PDK types used below (ConnectorBase, TapConnector, TapTable, TapField, PDKLogger, etc.)
//are imported from the idaas-pdk API in the generated template.

@TapConnectorClass("spec.json")
public class XDBConnector extends ConnectorBase implements TapConnector {
    //Log tag for PDKLogger; the excerpt used TAG without showing its definition, so this one is assumed
    private static final String TAG = XDBConnector.class.getSimpleName();

    @Override
    public void discoverSchema(TapConnectionContext connectionContext, Consumer<List<TapTable>> consumer) {
        //TODO Load the schema from the database; connection information is in connectionContext#getConnectionConfig
        //Sample code showing how to define tables with specified fields
        consumer.accept(list(
                //Define the first table
                table("empty-table1")
                        //Define a field named "id" with its origin field type, whether it is a primary key, and the primary key position
                        .add(field("id", "varchar").isPrimaryKey(true).partitionKeyPos(1))
                        .add(field("description", "string"))
                        .add(field("name", "varchar"))
                        .add(field("age", "int"))
        ));
    }

    @Override
    public void connectionTest(TapConnectionContext connectionContext, Consumer<TestItem> consumer) {
        //Assume the tests below succeed. These tests are recommended but not required.
        //Connection test
        //TODO execute connection test here
        consumer.accept(testItem(TestItem.ITEM_CONNECTION, TestItem.RESULT_SUCCESSFULLY));
        //Login test
        //TODO execute login test here
        consumer.accept(testItem(TestItem.ITEM_LOGIN, TestItem.RESULT_SUCCESSFULLY));
        //Read test
        //TODO execute read test by checking role permission
        consumer.accept(testItem(TestItem.ITEM_READ, TestItem.RESULT_SUCCESSFULLY));
        //Write test
        //TODO execute write test by checking role permission
        consumer.accept(testItem(TestItem.ITEM_WRITE, TestItem.RESULT_SUCCESSFULLY));
    }

    @Override
    public void registerCapabilities(ConnectorFunctions connectorFunctions, TapCodecRegistry codecRegistry) {
        connectorFunctions.supportWriteRecord(this::writeRecord);
        connectorFunctions.supportQueryByFilter(this::queryByFilter);

        //If the database requires a table to be created before records are inserted, implement the two methods below.
        connectorFunctions.supportCreateTable(this::createTable);
        connectorFunctions.supportDropTable(this::dropTable);

        //In that case, also register custom codecs for the TapValue types that the dataTypes in spec.json don't cover:
        //TapTimeValue, TapMapValue, TapDateValue, TapArrayValue, TapYearValue, TapNumberValue, TapBooleanValue, TapDateTimeValue, TapBinaryValue, TapRawValue, TapStringValue
        codecRegistry.registerFromTapValue(TapRawValue.class, "text", tapRawValue -> {
            //Store raw values as JSON text; write the string "null" when there is no value
            if (tapRawValue != null && tapRawValue.getValue() != null)
                return toJson(tapRawValue.getValue());
            return "null";
        });
    }

    private void writeRecord(TapConnectorContext connectorContext, List<TapRecordEvent> tapRecordEvents, Consumer<WriteListResult<TapRecordEvent>> writeListResultConsumer) {
        //TODO write records into the database
        //Below is sample code that logs the received events that are supposed to be written to the database.
        AtomicLong inserted = new AtomicLong(0); //insert count
        AtomicLong updated = new AtomicLong(0); //update count
        AtomicLong deleted = new AtomicLong(0); //delete count
        for (TapRecordEvent recordEvent : tapRecordEvents) {
            if (recordEvent instanceof TapInsertRecordEvent) {
                //TODO insert record
                inserted.incrementAndGet();
                PDKLogger.info(TAG, "Record Write TapInsertRecordEvent {}", toJson(recordEvent));
            } else if (recordEvent instanceof TapUpdateRecordEvent) {
                //TODO update record
                updated.incrementAndGet();
                PDKLogger.info(TAG, "Record Write TapUpdateRecordEvent {}", toJson(recordEvent));
            } else if (recordEvent instanceof TapDeleteRecordEvent) {
                //TODO delete record
                deleted.incrementAndGet();
                PDKLogger.info(TAG, "Record Write TapDeleteRecordEvent {}", toJson(recordEvent));
            }
        }
        //Report the write result to the incremental engine
        writeListResultConsumer.accept(writeListResult()
                .insertedCount(inserted.get())
                .modifiedCount(updated.get())
                .removedCount(deleted.get()));
    }

    private void queryByFilter(TapConnectorContext connectorContext, List<TapFilter> filters, Consumer<List<FilterResult>> listConsumer) {
        //Filters are exact matches.
        //If no value in the database matches a filter, still create a FilterResult with a null value in it,
        //so that the flow engine knows the filter has no value.
        //TODO implement query by filter
    }

    private void dropTable(TapConnectorContext connectorContext, TapDropTableEvent dropTableEvent) {
        TapTable table = connectorContext.getTable();
        //TODO implement drop table
    }

    private void createTable(TapConnectorContext connectorContext, TapCreateTableEvent createTableEvent) {
        //TODO implement create table.
        TapTable table = connectorContext.getTable();
        LinkedHashMap<String, TapField> nameFieldMap = table.getNameFieldMap();
        for (Map.Entry<String, TapField> entry : nameFieldMap.entrySet()) {
            TapField field = entry.getValue();
            String originType = field.getOriginType();
            //originType is one of the data types defined in the dataTypes of spec.json
            //TODO use the generated originType to create the table.
        }
    }
}
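The createTable TODO loops over the TapField entries and reads each originType. The sketch below shows one hypothetical way to turn those name/originType pairs into DDL. `CreateTableSqlSketch` is illustrative only and takes plain strings instead of PDK objects. Its MySQL-style backtick quoting and PRIMARY KEY clause are assumptions that you would adapt to the target database's dialect.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not PDK code: builds a CREATE TABLE statement from field names
// mapped to their originType (type strings taken from spec.json's dataTypes).
// Quoting style and PRIMARY KEY syntax are assumptions; adapt them to the target dialect.
final class CreateTableSqlSketch {
    static String build(String tableName, LinkedHashMap<String, String> originTypes, List<String> primaryKeys) {
        if (originTypes.isEmpty()) {
            throw new IllegalArgumentException("a table needs at least one field");
        }
        List<String> columns = new ArrayList<>();
        // Column order follows the LinkedHashMap, like TapTable#getNameFieldMap in the template.
        for (Map.Entry<String, String> field : originTypes.entrySet()) {
            columns.add(quote(field.getKey()) + " " + field.getValue());
        }
        if (!primaryKeys.isEmpty()) {
            List<String> quotedKeys = new ArrayList<>();
            for (String key : primaryKeys) {
                quotedKeys.add(quote(key));
            }
            columns.add("PRIMARY KEY (" + String.join(", ", quotedKeys) + ")");
        }
        return "CREATE TABLE " + quote(tableName) + " (" + String.join(", ", columns) + ")";
    }

    // MySQL-style identifier quoting; embedded backticks are doubled.
    private static String quote(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }
}
```

For example, a table "empty-table1" with id varchar(64) as the primary key and age int produces CREATE TABLE `empty-table1` (`id` varchar(64), `age` int, PRIMARY KEY (`id`)). Inside a connector, the map would be filled from table.getNameFieldMap() and field.getOriginType(), and the key list from the fields declared as primary keys.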
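Verify the connector with TDD tests after development
Provide a JSON file containing the values users must fill in for configOptions. For example, the configOptions above ask users for the database Host and Port, so the TDD file xdb_tdd.json looks like this: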
{
  "connection": {
    "host": "192.168.153.132",
    "port": 9030
  }
}
Run the TDD test command:
./bin/tap tdd --testConfig xdb_tdd.json ./connectors/xdb-connector
If the TDD tests fail, fix the errors reported in the messages and rerun until the tests pass.
Once the TDD tests pass, the PDK connector is ready for a pull request.
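How to contribute to the PDK open-source project
① Fork idaas-pdk and create a local branch based on the remote main branch.
② Create a module under idaas-pdk/connectors named after the database you are integrating. The naming convention is the lowercase database name followed by -connector. For example, for a database named XDB, the module is named xdb-connector.
③ Implement the database integration according to the official API documentation.
④ After the TDD tests pass, submit a PR to idaas-pdk.
⑤ The official team reviews the PR and merges the code.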
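Step ② fixes the module naming rule. The tiny, hypothetical helper below encodes that rule, so XDB maps to xdb-connector as in the example. It is not part of the PDK tooling.

```java
import java.util.Locale;

// Hypothetical helper encoding the naming rule from step ②:
// lowercase database name + "-connector".
final class ConnectorModuleNameSketch {
    static String moduleName(String databaseName) {
        String trimmed = databaseName.trim();
        if (trimmed.isEmpty()) {
            throw new IllegalArgumentException("database name is required");
        }
        // Locale.ROOT avoids locale-specific lowercasing (e.g. the Turkish dotless i).
        return trimmed.toLowerCase(Locale.ROOT) + "-connector";
    }
}
```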
Bonus
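If you are interested, don't rush into development just yet. Reportedly, the free edition offered by the team already supports real-time data integration between 30 common sources and targets. If the database you want to connect is among them, you can simply use Tapdata Cloud (Tapdata Cloud | Free real-time synchronization cloud platform for heterogeneous databases - Tapdata) for free real-time data synchronization. If your requirement isn't supported yet, you can build a connector yourself with Tapdata PDK and integrate it quickly.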
Data connection types currently supported by Tapdata Cloud
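Tapdata has also opened a plugin ecosystem group for developers, which offers technical discussion and support during development. If you are interested, scan the QR code to follow and be added to the group: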