Syncing MySQL data to ES 5.5.0 with canal

Posted by zhangdapao

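Environment:

  • canal-1.1.4
  • es 5.5.0
  • connecting to es via transport

All configuration can be taken straight from the official canal documentation. Because 1.1.4 only supports es 6.x and later, other versions require swapping the dependencies and recompiling the client-adapter.elasticsearch module. What follows is a compatibility approach for the older es 5.5.0, plus the pitfalls I ran into.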

Dependency changes:

Edit the pom.xml of the client-adapter module and change the es dependencies to 5.5.0, the version that matches the target cluster.

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>5.5.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>5.5.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>6.4.3</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.4.3</version>
</dependency>

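There is no 5.5.0 release of the high-level REST client, so only the transport-related versions are changed. Everything below was tested only with the TCP (transport) connection to es; REST compatibility is unverified.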

Code compatibility:

ESConnection.java:


transportClient.addTransportAddress(new TransportAddress(InetAddress.getByName(host.substring(0, i)),
                    Integer.parseInt(host.substring(i + 1))));


Change it to the following (InetSocketTransportAddress lives in org.elasticsearch.common.transport):
transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host.substring(0, i)),
                    Integer.parseInt(host.substring(i + 1))));
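
The host handling in the snippet above can be sketched without any es dependency. This is a minimal illustration, assuming `i` is the index of the colon in a `host:port` entry (the excerpt does not show how `i` is computed); `HostPortParser` is a hypothetical name and is not part of canal.

```java
import java.net.InetSocketAddress;

/** Illustrative only: HostPortParser is a hypothetical helper, not canal code. */
final class HostPortParser {

    /** Splits "host:port" at the last colon and validates both halves. */
    static InetSocketAddress parse(String hostPort) {
        int i = hostPort.lastIndexOf(':');
        if (i <= 0 || i == hostPort.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + hostPort);
        }
        String host = hostPort.substring(0, i);
        int port = Integer.parseInt(hostPort.substring(i + 1));
        // createUnresolved skips the DNS lookup, so the sketch runs offline.
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        InetSocketAddress address = parse("127.0.0.1:9300");
        System.out.println(address.getHostString() + " -> " + address.getPort());
    }
}
```

The real patch still resolves the host with InetAddress.getByName and wraps the result in the 5.x address class; the sketch stops at parsing.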

Build:

mvn clean install -Dmaven.test.skip -Denv=release

REST compatibility issues

Problem 1

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.7.0:compile (default-compile) on project client-adapter.elasticsearch: Compilation failure: Compilation failure:
[ERROR] canal/client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java:[502,75] 未报告的异常错误java.io.IOException; 必须对其进行捕获或声明以便抛出
[ERROR] canal/client-adapter/elasticsearch/src/main/java/org/elasticsearch/client/RestHighLevelClientExt.java:[24,13] 方法引用无效

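The 5.x TransportClient dependencies are not compatible with the 6.4.3 rest-client, so the REST code that triggers these errors is stubbed out. In English, the two javac messages read "unreported exception java.io.IOException; must be caught or declared to be thrown" and "invalid method reference".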

RestHighLevelClientExt::getMapping
    @Deprecated
    public static GetMappingsResponse getMapping(RestHighLevelClient restHighLevelClient,
                                                 GetMappingsRequest getMappingsRequest,
                                                 RequestOptions options) throws IOException,IllegalAccessException {
        throw new IllegalAccessException("es 5.x unsupport this method, use tcp mode");
    }


ESConnection::getMapping
        ...
        if (mode == ESClientMode.TRANSPORT) {
            ...
        } else {
            try {
                GetMappingsRequest request = new GetMappingsRequest();
                request.indices(index);
                GetMappingsResponse response;
                // try {
                // response = restHighLevelClient
                // .indices()
                // .getMapping(request, RequestOptions.DEFAULT);
                // // Calling this API directly fails on versions below 6.4
                // } catch (Exception e) {
                // logger.warn("Low ElasticSearch version for getMapping");
                response = RestHighLevelClientExt.getMapping(restHighLevelClient, request, RequestOptions.DEFAULT);
                // }


                mappings = response.mappings();
            } catch (NullPointerException e) {
                throw new IllegalArgumentException("Not found the mapping info of index: " + index);
            } catch (IOException | IllegalAccessException e) { // added: catch the extra exceptions
                logger.error(e.getMessage(), e);
                return null;
            }
            ...
        }

Problem 2

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.7.0:compile (default-compile) on project client-adapter.elasticsearch: Compilation failure
[ERROR] canal/client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java:[502,75] 未报告的异常错误java.io.IOException; 必须对其进行捕获或声明以便抛出

The cause: in 6.4.3, getSourceAsMap throws a RuntimeException (ElasticsearchParseException is a subclass), whereas in 5.5.0 it throws the checked IOException, which must be caught explicitly.

// 6.4.3: the declared exception is a RuntimeException
public Map<String, Object> getSourceAsMap() throws ElasticsearchParseException {
    return this.sourceAsMap();
}
// 5.5.0
public Map<String, Object> getSourceAsMap() throws IOException {
    return sourceAsMap();
}

Catching the exception in ESTemplate's getEsType method is enough:

ESTemplate::getEsType


Map<String, Object> sourceMap = null;
try{
    sourceMap = mappingMetaData.getSourceAsMap();
}catch (IOException e){
    logger.error(e.getMessage(), e);
    return null;
}
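
Why the compiler insists on that try/catch can be reproduced without es on the classpath. The sketch below is only an illustration: `MappingSource` is a hypothetical stand-in for the es mapping class, declaring the same checked IOException as the 5.5.0 signature quoted above, and `sourceOrNull` mirrors the log-and-return-null fallback of the patch.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.Map;

/** Illustrative only: MappingSource is a hypothetical stand-in, not an es class. */
final class CheckedSourceDemo {

    /** Mirrors the 5.5.0 signature, where getSourceAsMap declares a checked IOException. */
    interface MappingSource {
        Map<String, Object> getSourceAsMap() throws IOException;
    }

    /** Returns the source map, or null when reading it fails. */
    static Map<String, Object> sourceOrNull(MappingSource source) {
        try {
            return source.getSourceAsMap();
        } catch (IOException e) {
            // Without this catch (or a throws clause) javac rejects the call above.
            System.err.println("Failed to read mapping source: " + e.getMessage());
            return null;
        }
    }

    public static void main(String[] args) {
        MappingSource ok = () -> Collections.singletonMap("name", "text");
        MappingSource broken = () -> { throw new IOException("corrupt mapping"); };
        System.out.println(sourceOrNull(ok));
        System.out.println(sourceOrNull(broken));
    }
}
```

Returning null keeps the change small, but callers then have to tolerate a missing type; rethrowing as an unchecked exception would be the other obvious choice.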

After the build, replace the client-adapter.elasticsearch-1.1.4-jar-with-dependencies.jar file under canal.adapter-1.1.4\plugin.

Then run the deploy and adapter startup scripts.

Configuration problem

After startup, the adapter reports this error:

2020-07-07 14:36:08.223 [main] INFO  org.elasticsearch.plugins.PluginsService - loaded plugin [org.elasticsearch.transport.Netty4Plugin]
2020-07-07 14:36:08.473 [main] ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es failed
java.lang.RuntimeException: java.lang.IllegalArgumentException: unknown setting [mode] please check that any required plugins are installed, or check the breaking changes documentation for removed settings
        at com.alibaba.otter.canal.client.adapter.es.ESAdapter.init(ESAdapter.java:137)
        at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.loadAdapter(CanalAdapterLoader.java:172)

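I looked through the canal source and found no code that produces this message. Searching the dependency jars showed that the exception is thrown by es while it creates the TransportClient. My guess was that a mode parameter from the canal-adapter configuration was being passed into the settings used to build the TransportClient and making creation fail, so I commented it out and restarted.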

      - name: es
        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
        properties:
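          #mode: transport # transport # or rest   // commented out: a 1.1.4 pitfall. Every entry under properties is passed into the TransportClient settings (rest mode does not do this), so in transport mode any entry other than cluster.name makes the es connection fail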
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: elasticsearch
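
An alternative to commenting the line out would be to drop adapter-only keys before the properties reach the client settings. The sketch below is hypothetical and is not how canal 1.1.4 behaves: `TransportSettingsFilter` and the key set are assumptions, with the keys taken from the YAML above rather than from any official list.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Illustrative only: a hypothetical filter, not part of canal. */
final class TransportSettingsFilter {

    // Assumed adapter-only keys, copied from the YAML above; not an official list.
    private static final Set<String> ADAPTER_ONLY_KEYS =
            new HashSet<>(Arrays.asList("mode", "security.auth"));

    /** Copies every property except the adapter-only keys, preserving order. */
    static Map<String, String> forTransportClient(Map<String, String> properties) {
        Map<String, String> settings = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            if (!ADAPTER_ONLY_KEYS.contains(entry.getKey())) {
                settings.put(entry.getKey(), entry.getValue());
            }
        }
        return settings;
    }

    public static void main(String[] args) {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("mode", "transport");
        properties.put("cluster.name", "elasticsearch");
        System.out.println(forTransportClient(properties));
    }
}
```

Applying something like this would mean patching the adapter source again, so for a one-off deployment the commented-out line is the cheaper fix.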

Test:

After the restart, inserting a row into mysql makes the adapter print the following log:

[pool-2-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":21,"name":"测试用户","time":null}],"database":"canal","destination":"example_instance","es":1594347777000,"groupId":null,"isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"class","ts":1594347777991,"type":"INSERT"}
[pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.es.service.ESSyncService - Destination: example_instance, database:canal, table:class, type:INSERT, affected index count: 1
[pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.es.service.ESSyncService - Prepared to sync index: canal_test, destination: example_instance
[pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.es.service.ESSyncService - Single table insert to es index, destination:example_instance, table: class, index: canal_test, id: 21
[pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.es.service.ESSyncService - Sync elapsed time: 1 ms,destination: example_instance, es index: canal_test
[pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.es.service.ESSyncService - Sync completed: canal_test, destination: example_instance
[pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.es.service.ESSyncService - Sync elapsed time: 2 ms, affected indexes count:1, destination: example_instance
[pool-2-thread-1] DEBUG c.a.otter.canal.client.adapter.es.service.ESSyncService - DML: {"data":[{"id":21,"name":"测试用户","time":null}],"database":"canal","destination":"example_instance","es":1594347777000,"groupId":null,"isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"class","ts":1594347777993,"type":"INSERT"}
Affected indexes: canal_test

Check the data in es:

curl 127.0.0.1:9200/canal_test/canal/21


{
"_index": "canal_test",
"_type": "canal",
"_id": "21",
"_version": 1,
"found": true,
"_source": {
"name": "测试用户"
}
}

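Summary:

  • canal-adapter does not support switching index names (for example through an alias), so this approach is a poor fit if you need frequent full rebuilds
  • The query issued on update does not support non-numeric primary keys (a consequence of building the SQL by string concatenation)
  • Every table update is synced to es. With one-to-many joins, a single record change can trigger a bulk index update; if the index stores snapshot data, it is better to listen for changes and write an adapter that carries the business logic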
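
The primary-key limitation is the usual failure mode of concatenated SQL. The post does not show the adapter's query-building code, so the sketch below only illustrates the general problem; `PkConditionDemo` and both methods are hypothetical names.

```java
/** Illustrative only: shows why concatenated conditions break for string keys. */
final class PkConditionDemo {

    /** Naive concatenation: valid SQL only when the value is numeric. */
    static String naiveCondition(String column, Object value) {
        return column + "=" + value;
    }

    /** Quotes non-numeric values and doubles embedded single quotes. */
    static String quotedCondition(String column, Object value) {
        if (value instanceof Number) {
            return column + "=" + value;
        }
        String escaped = String.valueOf(value).replace("'", "''");
        return column + "='" + escaped + "'";
    }

    public static void main(String[] args) {
        System.out.println(naiveCondition("id", 21));      // numeric key: fine
        System.out.println(naiveCondition("id", "a-21"));  // string key: unquoted, invalid SQL
        System.out.println(quotedCondition("id", "a-21")); // string key: quoted
    }
}
```

In real code a java.sql.PreparedStatement with bound parameters is the safer way to avoid both the quoting problem and SQL injection.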

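Related documents:

Implementation options for real-time MySQL-to-Elasticsearch sync
Official canal documentation
Enabling the binlog in mysql
Incremental sync of mysql data to ES with canal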
