基于Calcite的分布式多数据源查询

Posted 麒思妙想

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Calcite的分布式多数据源查询相关的知识,希望对你有一定的参考价值。

在本文中,我们将实践 GBase8smysql 的跨数据源联合查询,案例中 MySQL 数据源中存放商品信息,GBase8s 数据源中存放订单信息。整体架构如下

好了,我们开始吧。

创建 maven 工程,目录如下图所示:

链接用于解析外层 SQL ,所以必须保留。使用查询语句 SELECT o.oid,o.iid,o.icount,i.catalog,i.pname,i.price FROM gbasedbt.order_table AS o join mysql.item AS i on o.iid = i.i_id 进行数据查询。

除了执行结果,其实我们也会对执行的逻辑计划感兴趣,那么我们来看看如何将该 SQL 的逻辑计划打印出来

算子,然后进行 project算子的。其实这个执行不能说效率很高吧,只能说非常慢,如果想做优化,我们以后再开一篇文章。

查询结果如下

的跨数据源查询工作,下一篇文章我们来说说查询优化。提前祝广大读者朋友,新春快乐,虎虎生威 ...

历史文章导读

  • 如何用Flink整合hudi,构架沧湖一体化解决方案

  • 云原生初体验:在k8s上部署springboot应用

  • 手动实现一门图灵完备的编程语言——Brainfuck

  • 10分钟入门响应式:Springboot整合kafka实现reactive

  • 基于Calcite自定义SQL解析器

  • 基于JDBC实现VPD:SQL解析篇

  • 如何成为一个成功的首席数据官

  • 基于Win10单机部署kubernetes应用

  • 浅谈基于JDBC实现虚拟专用数据库(VPD)


  • 你好,我是 +7 ,一个大数据领域的硬核原创作者。

    做过后端架构、数据库中间件、数据平台&架构、产品。

    专注大数据领域,数据库领域实时动态&技术提升&个人成长&职场进阶,欢迎关注。

    如果文件对您有点帮助,请关注、分享,帮助更多人~非常感谢

    calcite mysql_calcite简单入门

    1 介绍

    Apache Calcite是一款开源的动态数据管理框架,它提供了标准的 SQL 语言、多种查询优化和连接各种数据源的能力,但不包括数据存储、处理数据的算法和存储元数据的存储库。

    Calcite 之前的名称叫做optiq,optiq 起初在 Hive 项目中,为 Hive 提供基于成本模型的优化,即CBO(Cost Based Optimizatio)。2014 年 5 月 optiq 独立出来,成为 Apache 社区的孵化项目,2014 年 9 月正式更名为 Calcite。

    Calcite 的目标是“one size fits all(一种方案适应所有需求场景)”,希望能为不同计算平台和数据源提供统一的查询引擎。

    2 架构与解析步骤

    一般来说Calcite解析SQL有以下几步:

    Parser. 此步中Calcite通过Java CC将SQL解析成未经校验的AST

    Validate. 该步骤主要作用是校证Parser步骤中的AST是否合法,如验证SQL scheme、字段、函数等是否存在; SQL语句是否合法等. 此步完成之后就生成了RelNode树(关于RelNode树, 请参考下文)

    Optimize. 该步骤主要的作用优化RelNode树, 并将其转化成物理执行计划。主要涉及SQL规则优化如:基于规则优化(RBO)及基于代价(CBO)优化; Optimze 这一步原则上来说是可选的, 通过Validate后的RelNode树已经可以直接转化物理执行计划,但现代的SQL解析器基本上都包括有这一步,目的是优化SQL执行计划。此步得到的结果为物理执行计划。

    Execute,即执行阶段。此阶段主要做的是:将物理执行计划转化成可在特定的平台执行的程序。如Hive与Flink都在在此阶段将物理执行计划CodeGen生成相应的可执行代码。

    2.1 查询优化

    INSERT INTO tmp_node

    SELECT s1.id1, s1.id2, s2.val1

    FROM source1 as s1 INNER JOIN source2 AS s2

    ON s1.id1 = s2.id1 and s1.id2 = s2.id2 where s1.val1 > 5 and s2.val2 = 3;

    2.2 Parser解析

    LogicalTableModify(table=[[TMP_NODE]], operation=[INSERT], flattened=[false])

    LogicalProject(ID1=[$0], ID2=[$1], VAL1=[$7])

    LogicalFilter(condition=[AND(>($2, 5), =($8, 3))])

    LogicalJoin(condition=[AND(=($0, $5), =($1, $6))], joinType=[INNER])

    LogicalTableScan(table=[[SOURCE1]])

    LogicalTableScan(table=[[SOURCE2]])

    2.3 Optimize优化

    谓词下推,投影下推,关系代数定律优化

    LogicalTableModify(table=[[TMP_NODE]], operation=[INSERT], flattened=[false])

    LogicalProject(ID1=[$0], ID2=[$1], VAL1=[$7])

    LogicalJoin(condition=[AND(=($0, $5), =($1, $6))], joinType=[inner])

    LogicalFilter(condition=[=($4, 3)])

    LogicalProject(ID1=[$0], ID2=[$1], ID3=[$2], VAL1=[$3], VAL2=[$4],VAL3=[$5])

    LogicalTableScan(table=[[SOURCE1]])

    LogicalFilter(condition=[>($3,5)])

    LogicalProject(ID1=[$0], ID2=[$1], ID3=[$2], VAL1=[$3], VAL2=[$4],VAL3=[$5])

    LogicalTableScan(table=[[SOURCE2]])

    3 LogicalTableScan查询

    如上,节点树中的最后节点均为LogicalTableScan,假设我们不参与(LogicalTableScan)Calcite的查询过程,即不做SQL解析,不做优化,只要把它接入进来,实际Calcite是可以工作的,无非就是可能会有扫全表、数据全部加载到内存里等问题,所以实际中我们可能会参与全部(Translatable)或部分工作(FilterableTable),覆盖Calcite的一些执行计划或过滤条件,让它能更高效的工作。

    值得一提的是,Calcite支持异构数据源查询,比如数据存在es和mysql,可以通过写sql join之类的操作,让calcite分别先从不同的数据源查询数据,然后再在内存里进行合并计算;另外,它本身提供了许多优化规则,也支持我们自定义优化规则,来优化整个查询。

    3.1 ScannableTable

    a simple implementation of Table, using the ScannableTable interface, that enumerates all rows directly

    这种方式基本不会用,原因是查询数据库的时候没有任何条件限制,默认会先把全部数据拉到内存,然后再根据filter条件在内存中过滤。

    使用方式:实现Enumerable scan(DataContext root);,该函数返回Enumerable对象,通过该对象可以一行行的获取这个Table的全部数据。

    3.2 FilterableTable

    a more advanced implementation that implements FilterableTable, and can filter out rows according to simple predicates

    初级用法,我们能拿到filter条件,即能再查询底层DB时进行一部分的数据过滤,一般开始介入calcite可以用这种方式(translatable方式学习成本较高)。

    使用方式:实现Enumerable scan(DataContext root, List filters )。

    如果当前类型的“表”能够支持我们自己写代码优化这个过滤器,那么执行完自定义优化器,可以把该过滤条件从集合中移除,否则,就让calcite来过滤,简言之就是,如果我们不处理List filters ,Calcite也会根据自己的规则在内存中过滤,无非就是对于查询引擎来说查的数据多了,但如果我们可以写查询引擎支持的过滤器(比如写一些hbase、es的filter),这样在查的时候引擎本身就能先过滤掉多余数据,更加优化。提示,即使走了我们的查询过滤条件,可以再让calcite帮我们过滤一次,比较灵活。

    3.3 TranslatableTable

    advanced implementation of Table, using TranslatableTable, that translates to relational operators using planner rules.

    高阶用法,有些查询用上面的方式都支持不了或支持的不好,比如join、聚合、或对于select的字段筛选等,需要用这种方式来支持,好处是可以支持更全的功能,代价是所有的解析都要自己写,“承上启下”,上面解析sql的各个部件,下面要根据不同的DB(es\\mysql\\drudi..)来写不同的语法查询。

    当使用ScannableTable的时候,我们只需要实现函数Enumerable scan(DataContext root);,该函数返回Enumerable对象,通过该对象可以一行行的获取这个Table的全部数据(也就意味着每次的查询都是扫描这个表的数据,我们干涉不了任何执行过程);当使用FilterableTable的时候,我们需要实现函数Enumerable scan(DataContext root, List filters );参数中多了filters数组,这个数据包含了针对这个表的过滤条件,这样我们根据过滤条件只返回过滤之后的行,减少上层进行其它运算的数据集;当使用TranslatableTable的时候,我们需要实现RelNode toRel( RelOptTable.ToRelContext context, RelOptTable relOptTable);,该函数可以让我们根据上下文自己定义表扫描的物理执行计划,至于为什么不在返回一个Enumerable对象了,因为上面两种其实使用的是默认的执行计划,转换成EnumerableTableAccessRel算子,通过TranslatableTable我们可以实现自定义的算子,以及执行一些其他的rule,Kylin就是使用这个类型的Table实现查询。

    4 自定义数据源表接入demo

    如果你的数据源不在官方的支持列表中,或者官方的支持不能满足你的需求,那么则需要自己实现源接入。

    4.1 准备工作

    4.1.1 maven引入

    org.apache.calcite

    calcite-core

    1.19.0

    com.alibaba

    fastjson

    1.2.54

    com.google.guava

    guava

    16.0.1

    4.1.2 开发流程

    calcite中,引入一个数据库通常是通过注册一个SchemaFactory接口实现类来实现。SchemaFactory中只有一个方法,就是生成Schema。Schema最重要的功能是获取所有Table。Table有两个功能,一个是获取所有字段的类型,另一个是得到Enumerable迭代器用来读取数据。

    4.1.3 配置信息

    如果将你的数据源引入calcite,一般情况下是使用一个配置文件,以下是配置文件的demo。

    "version": "1.0",

    "defaultSchema": "TEST",

    "schemas": [

    "name": "TEST",

    "type": "custom",

    "factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",

    "operand":

    "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8",

    "jdbcDriver":"com.mysql.cj.jdbc.Driver",

    "jdbcUser":"test",

    "jdbcPassword":"test"

    ]

    4.2 CSV表demo

    这里我们先生成一个CSV文件,后边的操作就是通过在calcite中调用SQL访问CSV中的数据。

    TEST01.csv

    ID:VARCHAR,NAME1:VARCHAR,NAME2:VARCHAR

    0,first,second

    1,hello,world

    CsvSchemaFactory类

    package com.calcite.csv;

    import org.apache.calcite.schema.Schema;

    import org.apache.calcite.schema.SchemaFactory;

    import org.apache.calcite.schema.SchemaPlus;

    import java.util.Map;

    public class CsvSchemaFactory implements SchemaFactory

    /**

    * parentSchema 他的父节点,一般为root

    * name 数据库的名字,它在model中定义的

    * operand 也是在mode中定义的,是Map类型,用于传入自定义参数。

    * */

    @Override

    public Schema create(SchemaPlus parentSchema, String name, Map operand)

    return new CsvSchema(String.valueOf(operand.get("dataFile")));

    CsvSchema类

    package com.calcite.csv;

    import com.google.common.collect.ImmutableMap;

    import com.google.common.io.Resources;

    import org.apache.calcite.schema.Table;

    import org.apache.calcite.schema.impl.AbstractSchema;

    import org.apache.calcite.util.Source;

    import org.apache.calcite.util.Sources;

    import java.net.URL;

    import java.util.Map;

    public class CsvSchema extends AbstractSchema

    private Map tableMap;

    private String dataFile;

    public CsvSchema(String dataFile)

    this.dataFile = dataFile;

    @Override

    protected Map getTableMap()

    URL url = Resources.getResource(dataFile);

    Source source = Sources.of(url);

    if (tableMap == null)

    final ImmutableMap.Builder builder = ImmutableMap.builder();

    builder.put(this.dataFile.split("\\\\.")[0],new CsvTable(source));

    // 一个数据库有多个表名,这里初始化,大小写要注意了,TEST01是表名。

    tableMap = builder.build();

    return tableMap;

    CsvTable类

    package com.calcite.csv;

    import com.google.common.collect.Lists;

    import org.apache.calcite.DataContext;

    import org.apache.calcite.adapter.java.JavaTypeFactory;

    import org.apache.calcite.linq4j.AbstractEnumerable;

    import org.apache.calcite.linq4j.Enumerable;

    import org.apache.calcite.linq4j.Enumerator;

    import org.apache.calcite.rel.type.RelDataType;

    import org.apache.calcite.rel.type.RelDataTypeFactory;

    import org.apache.calcite.schema.ScannableTable;

    import org.apache.calcite.schema.impl.AbstractTable;

    import org.apache.calcite.sql.type.SqlTypeName;

    import org.apache.calcite.util.Pair;

    import org.apache.calcite.util.Source;

    import java.io.*;

    import java.util.List;

    public class CsvTable extends AbstractTable implements ScannableTable

    private Source source;

    public CsvTable(Source source)

    this.source = source;

    /**

    * 获取字段类型

    */

    @Override

    public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory)

    JavaTypeFactory typeFactory = (JavaTypeFactory)relDataTypeFactory;

    List names = Lists.newLinkedList();

    List types = Lists.newLinkedList();

    try

    BufferedReader reader = new BufferedReader(new FileReader(source.file()));

    String line = reader.readLine();

    List lines = Lists.newArrayList(line.split(","));

    lines.forEach(column ->

    String name = column.split(":")[0];

    String type = column.split(":")[1];

    names.add(name);

    types.add(typeFactory.createSqlType(SqlTypeName.get(type)));

    );

    catch (FileNotFoundException e)

    e.printStackTrace();

    catch (IOException e)

    e.printStackTrace();

    return typeFactory.createStructType(Pair.zip(names, types));

    @Override

    public Enumerable scan(DataContext dataContext)

    return new AbstractEnumerable()

    @Override

    public Enumerator enumerator()

    return new CsvEnumerator<>(source);

    ;

    CsvEnumerator类

    package com.calcite.csv;

    import org.apache.calcite.linq4j.Enumerator;

    import org.apache.calcite.util.Source;

    import java.io.BufferedReader;

    import java.io.IOException;

    public class CsvEnumerator implements Enumerator

    private E current;

    private BufferedReader br;

    public CsvEnumerator(Source source)

    try

    this.br = new BufferedReader(source.reader());

    this.br.readLine();

    catch (IOException e)

    e.printStackTrace();

    @Override

    public E current()

    return current;

    @Override

    public boolean moveNext()

    try

    String line = br.readLine();

    if(line == null)

    return false;

    current = (E)line.split(","); // 如果是多列,这里要多个值

    catch (IOException e)

    e.printStackTrace();

    return false;

    return true;

    /**

    * 出现异常走这里

    * */

    @Override

    public void reset()

    System.out.println("报错了兄弟,不支持此操作");

    /**

    * InputStream流在这里关闭

    * */

    @Override

    public void close()

    model.json

    "version": "1.0",

    "defaultSchema": "TEST_CSV",

    "schemas": [

    "name": "TEST_CSV",

    "type": "custom",

    "factory": "com.calcite.csv.CsvSchemaFactory",

    "operand":

    "dataFile": "TEST01.csv"

    ]

    Main方法调用

    package com.calcite;

    import com.alibaba.fastjson.JSON;

    import com.calcite.util.ReourceUtil;

    import com.google.common.collect.Lists;

    import com.google.common.collect.Maps;

    import java.sql.*;

    import java.util.List;

    import java.util.Map;

    public class Client

    /**

    * 测试的时候用字符串 defaultSchema 默认数据库 name 数据库名称 type custom factory

    * 请求接收类,该类会实例化Schema也就是数据库类,Schema会实例化Table实现类,Table会实例化数据类。

    * operand 动态参数,ScheamFactory的create方法会接收到这里的数据

    */

    public static void main(String[] args)

    try

    // 用文件的方式

    //URL url = Client.class.getResource("/model.json");

    //String str = URLDecoder.decode(url.toString(), "UTF-8");

    //Properties info = new Properties();

    //info.put("model", str.replace("file:", ""));

    //Connection connection = DriverManager.getConnection("jdbc:calcite:", info);

    // 字符串方式

    String model = ReourceUtil.getResourceAsString("model.json");

    Connection connection = DriverManager.getConnection("jdbc:calcite:model=inline:" + model);

    Statement statement = connection.createStatement();

    test1(statement);

    catch (Exception e)

    e.printStackTrace();

    /**

    * CSV文件读取

    * @param statement

    * @throws Exception

    */

    public static void test1(Statement statement) throws Exception

    ResultSet resultSet = statement.executeQuery("select * from test_csv.TEST01");

    System.out.println(JSON.toJSONString(getData(resultSet)));

    public static List> getData(ResultSet resultSet)throws Exception

    List> list = Lists.newArrayList();

    ResultSetMetaData metaData = resultSet.getMetaData();

    int columnSize = metaData.getColumnCount();

    while (resultSet.next())

    Map map = Maps.newLinkedHashMap();

    for (int i = 1; i < columnSize + 1; i++)

    map.put(metaData.getColumnLabel(i), resultSet.getObject(i));

    list.add(map);

    return list;

    4.3 内存数据源与CSV数据源关联查询demo

    在4.2的演示中,我们能够使用SQL查询CSV文件中的数据。接下来,我们再定义一种内存数据源,主要作用是演示两种数据源间的关联查询。

    MemSchemaFactory类

    package com.calcite.memory;

    import org.apache.calcite.schema.Schema;

    import org.apache.calcite.schema.SchemaFactory;

    import org.apache.calcite.schema.SchemaPlus;

    import java.util.Map;

    public class MemSchemaFactory implements SchemaFactory

    @Override

    public Schema create(SchemaPlus schemaPlus, String s, Map map)

    return new MemSchema(map);

    MemSchema类

    package com.calcite.memory;

    import com.google.common.collect.ImmutableMap;

    import org.apache.calcite.schema.Table;

    import org.apache.calcite.schema.impl.AbstractSchema;

    import java.util.Map;

    public class MemSchema extends AbstractSchema

    private Map map;

    private Map tableMap;

    public MemSchema(Map map)

    this.map = map;

    @Override

    protected Map getTableMap()

    if (tableMap == null)

    final ImmutableMap.Builder builder = ImmutableMap.builder();

    map.forEach((key, value) ->

    builder.put(key, new MemTable(value));

    );

    tableMap = builder.build();

    return tableMap;

    MemTable类

    package com.calcite.memory;

    import com.alibaba.fastjson.JSON;

    import com.alibaba.fastjson.TypeReference;

    import com.alibaba.fastjson.parser.Feature;

    import com.google.common.collect.Lists;

    import org.apache.calcite.DataContext;

    import org.apache.calcite.adapter.java.JavaTypeFactory;

    import org.apache.calcite.linq4j.AbstractEnumerable;

    import org.apache.calcite.linq4j.Enumerable;

    import org.apache.calcite.linq4j.Enumerator;

    import org.apache.calcite.rel.type.RelDataType;

    import org.apache.calcite.rel.type.RelDataTypeFactory;

    import org.apache.calcite.schema.ScannableTable;

    import org.apache.calcite.schema.impl.AbstractTable;

    import org.apache.calcite.sql.type.SqlTypeName;

    import org.apache.calcite.util.Pair;

    import java.io.BufferedReader;

    import java.io.FileNotFoundException;

    import java.io.FileReader;

    import java.io.IOException;

    import java.util.List;

    import java.util.Map;

    public class MemTable extends AbstractTable implements ScannableTable

    private List> list = Lists.newLinkedList();

    public MemTable(Object list)

    if (list instanceof List)

    ((List)list).forEach(o ->

    this.list.add(

    JSON.parseObject(JSON.toJSONString(o),

    new TypeReference>() ,

    Feature.OrderedField));

    );

    @Override

    public Enumerable scan(DataContext dataContext)

    return new AbstractEnumerable()

    @Override

    public Enumerator enumerator()

    return new MemEnumerator(list);

    ;

    @Override

    public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory)

    JavaTypeFactory typeFactory = (JavaTypeFactory)relDataTypeFactory;

    List names = Lists.newLinkedList();

    List types = Lists.newLinkedList();

    if (list.size() != 0)

    list.get(0).forEach((key, value) ->

    names.add(key);

    types.add(typeFactory.createSqlType(SqlTypeName.get("VARCHAR")));

    );

    return typeFactory.createStructType(Pair.zip(names, types));

    MemEnumerator类

    package com.calcite.memory;

    import com.google.common.collect.Lists;

    import org.apache.calcite.linq4j.Enumerator;

    import java.util.List;

    import java.util.Map;

    public class MemEnumerator implements Enumerator

    private List> list = Lists.newLinkedList();

    private int index = -1;

    private E e;

    public MemEnumerator(List> list)

    this.list = list;

    @Override

    public E current()

    return e;

    @Override

    public boolean moveNext()

    if (index+1 >= list.size())

    return false;

    else

    e = (E)list.get(index+1).values().toArray();

    index++;

    return true;

    @Override

    public void reset()

    index = -1;

    e = null;

    @Override

    public void close()

    model.json

    "version": "1.0",

    "defaultSchema": "TEST_CSV",

    "schemas": [

    "name": "TEST_CSV",

    "type": "custom",

    "factory": "com.calcite.csv.CsvSchemaFactory",

    "operand":

    "dataFile": "TEST01.csv"

    ,

    "name": "TEST_MEM",

    "type": "custom",

    "factory": "com.calcite.memory.MemSchemaFactory",

    "operand":

    "MEM_TABLE_1": [

    "ID": 0,

    "MEM_STR": "str0"

    ,

    "ID": 1,

    "MEM_STR": "str1"

    ,

    "ID": 2,

    "MEM_STR": "str2"

    ]

    ]

    Main方法调用

    package com.calcite;

    import com.alibaba.fastjson.JSON;

    import com.calcite.util.ReourceUtil;

    import com.google.common.collect.Lists;

    import com.google.common.collect.Maps;

    import java.sql.*;

    import java.util.List;

    import java.util.Map;

    public class Client

    /**

    * 测试的时候用字符串 defaultSchema 默认数据库 name 数据库名称 type custom factory

    * 请求接收类,该类会实例化Schema也就是数据库类,Schema会实例化Table实现类,Table会实例化数据类。

    * operand 动态参数,ScheamFactory的create方法会接收到这里的数据

    */

    public static void main(String[] args)

    try

    // 用文件的方式

    //URL url = Client.class.getResource("/model.json");

    //String str = URLDecoder.decode(url.toString(), "UTF-8");

    //Properties info = new Properties();

    //info.put("model", str.replace("file:", ""));

    //Connection connection = DriverManager.getConnection("jdbc:calcite:", info);

    // 字符串方式

    String model = ReourceUtil.getResourceAsString("model.json");

    Connection connection = DriverManager.getConnection("jdbc:calcite:model=inline:" + model);

    Statement statement = connection.createStatement();

    test2(statement);

    catch (Exception e)

    e.printStackTrace();

    /**

    * CSV文件读取

    * @param statement

    * @throws Exception

    */

    public static void test1(Statement statement) throws Exception

    ResultSet resultSet = statement.executeQuery("select * from test_csv.TEST01");

    System.out.println(JSON.toJSONString(getData(resultSet)));

    /**

    * CSV文件与内存文件关联读取

    * @param statement

    * @throws Exception

    */

    public static void test2(Statement statement) throws Exception

    ResultSet resultSet1 = statement.executeQuery("select csv1.id as cid,csv1.name1 as cname ,mem1.id as mid,mem1.mem_str as mstr from test_csv.TEST01 as csv1 left join test_mem.mem_table_1 as mem1 on csv1.id = mem1.id");

    System.out.println(JSON.toJSONString(getData(resultSet1)));

    public static List> getData(ResultSet resultSet)throws Exception

    List> list = Lists.newArrayList();

    ResultSetMetaData metaData = resultSet.getMetaData();

    int columnSize = metaData.getColumnCount();

    while (resultSet.next())

    Map map = Maps.newLinkedHashMap();

    for (int i = 1; i < columnSize + 1; i++)

    map.put(metaData.getColumnLabel(i), resultSet.getObject(i));

    list.add(map);

    return list;

    小结

    calcite对于没有高并发、低延时的多数据源间数据有着天然的优势。但需要注意的是,如果一个表中数据量特别大,大到读取速度很慢或内存无法容纳,那么务必在操作该表数据时加入尽可能多的筛选条件,如果自定义实现LogicalTableScan,最好也是实现FilterableTable,从而减少calcite在内存中操作数据行的量。

    2022深度学习开发者峰会 5月20日13:00让我们相聚云端,共襄盛会!

    以上是关于基于Calcite的分布式多数据源查询的主要内容,如果未能解决你的问题,请参考以下文章

    基于GBase8s和Calcite的多数据源查询

    Apache Calcite 学习

    calcite mysql_calcite简单入门

    看这篇就够了丨基于Calcite框架的SQL语法扩展探索

    95-910-330-源码-FlinkSQL-Calcite-Flink结合Calcite

    SQL 数据库查询的优化工具及实用