Multi-data-source queries with GBase8s and Calcite
Posted by 麒思妙想
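In this article we will run a federated query across GBase8s and MySQL. In this example, the MySQL data source stores item information and the GBase8s data source stores order information. The overall architecture is shown below.
Right, let's get started.
Environment setup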
GBase8s
Pull the image: docker pull liaosnet/gbase8s
Start the container: docker run -itd -p 19088:9088 liaosnet/gbase8s
Container details:
JDBC JAR: /home/gbase/gbasedbtjdbc_3.3.0_2.jar
Driver class: com.gbasedbt.jdbc.Driver
URL: jdbc:gbasedbt-sqli://IPADDR:19088/testdb:GBASEDBTSERVER=gbase01;DB_LOCALE=zh_CN.utf8;CLIENT_LOCALE=zh_CN.utf8;IFX_LOCK_MODE_WAIT=30;
User: gbasedbt
Password: GBase123
Here IPADDR is the IP address of the machine running Docker, and port 19088 must be reachable on that machine.
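The URL above is easy to mistype because of the colon after the database name and the trailing semicolon. As a minimal sketch, the helper below assembles it from its parts. The class name GBaseJdbcUrl and the method build are hypothetical and not part of the GBase driver, and 192.168.1.10 is only a placeholder for IPADDR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not part of the GBase driver) that assembles the JDBC URL listed above. */
final class GBaseJdbcUrl {

    static String build(String host, int port, String database, String server) {
        // Session properties copied verbatim from the container details above.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("GBASEDBTSERVER", server);
        props.put("DB_LOCALE", "zh_CN.utf8");
        props.put("CLIENT_LOCALE", "zh_CN.utf8");
        props.put("IFX_LOCK_MODE_WAIT", "30");

        // Layout: jdbc:gbasedbt-sqli://HOST:PORT/DATABASE:KEY=VALUE;KEY=VALUE;
        StringBuilder url = new StringBuilder("jdbc:gbasedbt-sqli://")
                .append(host).append(':').append(port)
                .append('/').append(database).append(':');
        for (Map.Entry<String, String> e : props.entrySet()) {
            url.append(e.getKey()).append('=').append(e.getValue()).append(';');
        }
        return url.toString();
    }

    public static void main(String[] args) {
        // 192.168.1.10 is a placeholder for IPADDR, the address of the Docker host.
        System.out.println(build("192.168.1.10", 19088, "testdb", "gbase01"));
    }
}
```

The port passed in is the host-side port (19088) from the docker run mapping, not the container's internal 9088.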
MySQL
Pull the image: docker pull mysql
Start the container: docker run -p 3306:3306 --name mysql -e MYSQL_ROOT_PASSWORD=dafei1288 -d mysql
Data preparation
GBase8s
CREATE TABLE order_table (
oid INTEGER NOT NULL,
iid INTEGER,
icount INTEGER,
PRIMARY KEY (oid) CONSTRAINT order_table_pk
);
INSERT INTO order_table (oid, iid, icount) VALUES(1, 1, 10);
INSERT INTO order_table (oid, iid, icount) VALUES(2, 3, 30);
MySQL
create table item
(
i_id int auto_increment
primary key,
catalog varchar(20) null,
pname varchar(20) null,
price float null,
constraint item_i_id_uindex
unique (i_id)
);
INSERT INTO test.item (i_id, catalog, pname, price) VALUES (1, '游戏', '大航海时代IV', 300);
INSERT INTO test.item (i_id, catalog, pname, price) VALUES (2, '游戏', '马车8', 300);
INSERT INTO test.item (i_id, catalog, pname, price) VALUES (3, '食品', '青椒豆腐乳西瓜', 20);
Project setup
Create a Maven project with the directory layout shown in the figure below:
Add the dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>calcite_multi_database</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.29.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.9</version>
</dependency>
<dependency>
<groupId>gbase</groupId>
<artifactId>gbasedbt</artifactId>
<version>330</version>
<scope>system</scope>
<systemPath>${project.basedir}/libs/gbasedbtjdbc_3.3.0_2_36477d.jar</systemPath>
</dependency>
</dependencies>
</project>
Add the data source configuration file multiSource.json
{
  "defaultSchema": "gbasedbt",
  "schemas": [
    {
      "factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
      "name": "mysql",
      "operand": {
        "jdbcDriver": "com.mysql.cj.jdbc.Driver",
        "jdbcUrl": "jdbc:mysql://localhost:3306/test",
        "jdbcUser": "root",
        "jdbcPassword": "dafei1288"
      },
      "type": "custom"
    },
    {
      "factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
      "name": "gbasedbt",
      "operand": {
        "jdbcDriver": "com.gbasedbt.jdbc.Driver",
        "jdbcUrl": "jdbc:gbasedbt-sqli://localhost:19088/testdb:GBASEDBTSERVER=gbase01;DB_LOCALE=zh_CN.utf8;CLIENT_LOCALE=zh_CN.utf8;IFX_LOCK_MODE_WAIT=30;",
        "jdbcUser": "gbasedbt",
        "jdbcPassword": "GBase123"
      },
      "type": "custom"
    }
  ],
  "version": "1.0"
}
Create the program MultiSource
Using the configuration file above, create the Calcite JDBC connection
String filepath = "E:\\working\\GBase\\writting\\calcite_multi_database_select\\calcite_multi_database\\src\\main\\resources\\multiSource.json";
Properties config = new Properties();
config.put("model",filepath);
config.put("lex", "MYSQL");
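The absolute Windows path ties the program to one machine. As a rough sketch, the model path could instead be resolved relative to the project directory. This assumes the program is launched from the Maven project root and that multiSource.json sits under src/main/resources; the class name ModelConfig and the method forModel are hypothetical.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

/** Hypothetical helper that builds the connection properties without a hard-coded absolute path. */
final class ModelConfig {

    static Properties forModel(Path projectDir) {
        // Assumed location of the model file inside a standard Maven layout.
        Path model = projectDir
                .resolve(Paths.get("src", "main", "resources", "multiSource.json"))
                .toAbsolutePath()
                .normalize();

        Properties config = new Properties();
        config.put("model", model.toString()); // path handed to Calcite
        config.put("lex", "MYSQL");            // same lexical policy as in the article
        return config;
    }

    public static void main(String[] args) {
        // user.dir is the directory the JVM was started from.
        Properties config = forModel(Paths.get(System.getProperty("user.dir")));
        System.out.println(config.getProperty("model"));
    }
}
```

The returned Properties object can be passed to DriverManager.getConnection("jdbc:calcite:", config) in the same way as the config built by hand above.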
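The setting config.put("lex", "MYSQL") selects the lexical policy Calcite uses to parse the outer SQL, that is, the statement sent to Calcite itself, so it must be kept. Under Calcite's MySQL policy, unquoted identifiers keep the case they were written in and are matched case-insensitively. We then query the data with the statement SELECT o.oid,o.iid,o.icount,i.catalog,i.pname,i.price FROM gbasedbt.order_table AS o join mysql.item AS i on o.iid = i.i_id.
Besides the query result, the logical plan is also of interest, so let's see how to print the logical plan for this SQL.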
public static void printLogicPlan(String modelPath, String sql) throws Exception {
    // Read the model file and pull out its "schemas" array.
    String modelJsonStr = Files.readAllLines(Paths.get(modelPath)).stream().collect(Collectors.joining("\n"));
    HashMap map = new Gson().fromJson(modelJsonStr, HashMap.class);
    List<Map> schemas = (List<Map>) map.get("schemas");

    // Register both JDBC schemas under a fresh root schema.
    // In multiSource.json, index 0 is mysql and index 1 is gbasedbt.
    SchemaPlus rootSchema = Frameworks.createRootSchema(true);
    Schema gbasedbt = JdbcSchema.create(rootSchema, "gbasedbt", (Map<String, Object>) schemas.get(1).get("operand"));
    Schema mysql = JdbcSchema.create(rootSchema, "mysql", (Map<String, Object>) schemas.get(0).get("operand"));
    rootSchema.add("gbasedbt", gbasedbt);
    rootSchema.add("mysql", mysql);

    // Match identifiers case-insensitively when parsing and validating.
    SqlParser.Config insensitiveParser = SqlParser.configBuilder()
            .setCaseSensitive(false)
            .build();
    FrameworkConfig config = Frameworks.newConfigBuilder()
            .parserConfig(insensitiveParser)
            .defaultSchema(rootSchema)
            .build();

    // Parse, validate, and convert the SQL to a relational expression.
    Planner planner = Frameworks.getPlanner(config);
    SqlNode sqlNode = planner.parse(sql);
    SqlNode sqlNodeValidated = planner.validate(sqlNode);
    RelRoot relRoot = planner.rel(sqlNodeValidated);
    RelNode relNode = relRoot.project();

    // Print the SQL in the MySQL dialect, then the logical plan.
    System.out.println(sqlNode.toSqlString(MysqlSqlDialect.DEFAULT));
    System.out.println();
    System.out.println(relNode.explain());
}
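printLogicPlan picks the operands with schemas.get(0) and schemas.get(1), which silently breaks if the order of the entries in multiSource.json changes. One possible alternative, sketched here with plain java.util collections, is to look the operand up by schema name. SchemaOperands, operandOf and schemaEntry are hypothetical names, and the demo data in main only mimics the shape of the parsed model.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: find a schema's "operand" map by name instead of by list index. */
final class SchemaOperands {

    /** Returns the "operand" map of the schema whose "name" equals the given name. */
    @SuppressWarnings("unchecked")
    static Map<String, Object> operandOf(List<Map<String, Object>> schemas, String name) {
        for (Map<String, Object> schema : schemas) {
            if (name.equals(schema.get("name"))) {
                return (Map<String, Object>) schema.get("operand");
            }
        }
        throw new IllegalArgumentException("No schema named '" + name + "' in the model");
    }

    /** Builds one entry shaped like an element of the "schemas" array (demo data only). */
    static Map<String, Object> schemaEntry(String name, String jdbcDriver) {
        Map<String, Object> operand = new HashMap<>();
        operand.put("jdbcDriver", jdbcDriver);
        Map<String, Object> schema = new HashMap<>();
        schema.put("name", name);
        schema.put("operand", operand);
        return schema;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> schemas = new ArrayList<>();
        schemas.add(schemaEntry("mysql", "com.mysql.cj.jdbc.Driver"));
        schemas.add(schemaEntry("gbasedbt", "com.gbasedbt.jdbc.Driver"));
        // The lookup no longer depends on the position of the entry.
        System.out.println(operandOf(schemas, "gbasedbt").get("jdbcDriver"));
    }
}
```

In printLogicPlan the list parsed by Gson would need an unchecked cast to List<Map<String, Object>> before being passed to such a helper.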
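Below is the printed logical plan. It shows two full table scans feeding a Join operator, followed by a Project operator. This plan is hardly efficient; in fact it is very slow. Optimizing it is a topic for a separate article.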
LogicalProject(OID=[$0], IID=[$1], ICOUNT=[$2], CATALOG=[$4], PNAME=[$5], PRICE=[$6])
  LogicalJoin(condition=[=($1, $3)], joinType=[inner])
    JdbcTableScan(table=[[gbasedbt, order_table]])
    JdbcTableScan(table=[[mysql, item]])
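To make the shape of this plan concrete, here is a plain-Java analogy of its three steps: scan both tables in full, join on iid = i_id, then project the six output columns. It is only an illustration and not how Calcite executes the query. The class name PlanSketch and the in-memory arrays are assumptions, with rows copied from the INSERT statements in the data preparation section.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative analogy of scan + join + project; not Calcite's actual execution. */
final class PlanSketch {

    // order_table(oid, iid, icount), as inserted into GBase8s.
    static final Object[][] ORDER_TABLE = {
        {1, 1, 10},
        {2, 3, 30}
    };

    // item(i_id, catalog, pname, price), as inserted into MySQL.
    static final Object[][] ITEM = {
        {1, "游戏", "大航海时代IV", 300.0f},
        {2, "游戏", "马车8", 300.0f},
        {3, "食品", "青椒豆腐乳西瓜", 20.0f}
    };

    static List<Object[]> run() {
        // Full "scan" of item: every row is read and indexed by i_id.
        Map<Object, Object[]> itemsById = new HashMap<>();
        for (Object[] item : ITEM) {
            itemsById.put(item[0], item);
        }

        List<Object[]> result = new ArrayList<>();
        // Full "scan" of order_table, inner join on iid = i_id.
        for (Object[] order : ORDER_TABLE) {
            Object[] item = itemsById.get(order[1]);
            if (item != null) {
                // Project: oid, iid, icount, catalog, pname, price.
                result.add(new Object[] {order[0], order[1], order[2], item[1], item[2], item[3]});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (Object[] row : run()) {
            StringBuilder line = new StringBuilder();
            for (Object value : row) {
                line.append(value).append('\t');
            }
            System.out.println(line);
        }
    }
}
```

Both inputs are read completely even though only two item rows are needed, which is the cost the two JdbcTableScan nodes in the plan stand for.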
The SQL query result is as follows:
oid iid icount catalog pname price
1 1 10 游戏 大航海时代IV 300.0
2 3 30 食品 青椒豆腐乳西瓜 20.0
Execution screenshot
Complete code listing:
package wang.datahub;

import com.google.gson.Gson;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.externalize.RelWriterImpl;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.dialect.MysqlSqlDialect;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;

import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

public class MultiSource {

    public static void main(String[] args) throws Exception {
        String filepath = "E:\\working\\GBase\\writting\\calcite_multi_database_select\\calcite_multi_database\\src\\main\\resources\\multiSource.json";
        Properties config = new Properties();
        config.put("model", filepath);
        config.put("lex", "MYSQL");
        String sql =
                "SELECT o.oid,o.iid,o.icount,i.catalog,i.pname,i.price FROM gbasedbt.order_table AS o join mysql.item AS i on o.iid = i.i_id";
        try (Connection con = DriverManager.getConnection("jdbc:calcite:", config)) {
            try (Statement stmt = con.createStatement()) {
                try (ResultSet rs = stmt.executeQuery(sql)) {
                    // Print the logical plan
                    printLogicPlan(filepath, sql);
                    // Print the query result
                    printRs(rs);
                }
            }
        }
    }

    public static void printRs(ResultSet rs) throws Exception {
        ResultSetMetaData rsmd = rs.getMetaData();
        int count = rsmd.getColumnCount();
        // Header row: column names separated by tabs
        for (int i = 1; i <= count; i++) {
            System.out.print(rsmd.getColumnName(i) + "\t");
        }
        System.out.println();
        // One line per result row
        while (rs.next()) {
            for (int i = 1; i <= count; i++) {
                System.out.print(rs.getString(i) + "\t");
            }
            System.out.println();
        }
    }

    public static void printLogicPlan(String modelPath, String sql) throws Exception {
        // Read the model file and pull out its "schemas" array.
        String modelJsonStr = Files.readAllLines(Paths.get(modelPath)).stream().collect(Collectors.joining("\n"));
        HashMap map = new Gson().fromJson(modelJsonStr, HashMap.class);
        List<Map> schemas = (List<Map>) map.get("schemas");

        // Register both JDBC schemas under a fresh root schema.
        // In multiSource.json, index 0 is mysql and index 1 is gbasedbt.
        SchemaPlus rootSchema = Frameworks.createRootSchema(true);
        Schema gbasedbt = JdbcSchema.create(rootSchema, "gbasedbt", (Map<String, Object>) schemas.get(1).get("operand"));
        Schema mysql = JdbcSchema.create(rootSchema, "mysql", (Map<String, Object>) schemas.get(0).get("operand"));
        rootSchema.add("gbasedbt", gbasedbt);
        rootSchema.add("mysql", mysql);

        // Match identifiers case-insensitively when parsing and validating.
        SqlParser.Config insensitiveParser = SqlParser.configBuilder()
                .setCaseSensitive(false)
                .build();
        FrameworkConfig config = Frameworks.newConfigBuilder()
                .parserConfig(insensitiveParser)
                .defaultSchema(rootSchema)
                .build();

        // Parse, validate, and convert the SQL to a relational expression.
        Planner planner = Frameworks.getPlanner(config);
        SqlNode sqlNode = planner.parse(sql);
        SqlNode sqlNodeValidated = planner.validate(sqlNode);
        RelRoot relRoot = planner.rel(sqlNodeValidated);
        RelNode relNode = relRoot.project();

        // Print the SQL in the MySQL dialect, then the logical plan.
        System.out.println(sqlNode.toSqlString(MysqlSqlDialect.DEFAULT));
        System.out.println();
        System.out.println(relNode.explain());
    }
}
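That completes a first cross-data-source query based on GBase8s. In the next article we will look at query optimization.
Wishing all readers a happy Spring Festival in advance, and a vigorous Year of the Tiger …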