SpringBoot系列之canal和kafka实现异步实时更新

Posted smileNicky

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot系列之canal和kafka实现异步实时更新相关的知识,希望对你有一定的参考价值。

SpringBoot系列之canal和kafka实现异步实时更新

实验开发环境

  • JDK 1.8
  • SpringBoot2.2.1
  • Maven 3.2+
  • 开发工具
    • IntelliJ IDEA
    • smartGit

1、什么是阿里canal?

canal是阿里开源的, 对数据库增量日志解析,提供增量数据订阅和消费的组件。引用官网的图片,canal的工作原理主要是模拟 mysql slave 的交互协议,伪装自己为 MySQL slave,向master发送dump 协议,获取到数据后,解析 binary log 对象数据。

2、canal环境搭建

本博客基于window系统的,linux系统的可以参考quickstart,比较详细。

使用canal需要确保数据库开启了binlog:

show variables like'log_%';


如果没开启,在mysql my.ini配置文件添加配置,注意文件内存为的时候,注意编码格式必须为ANSI,不然会编译报错

[mysqld]
# 开启 binlog
log-bin=mysql-bin 
# 选择 ROW 模式
binlog-format=ROW
# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
server_id=1

配置文件修改是否正确,使用命令,查看日志

mysqld --console

重启MySQL实例

net stop mysql
net start mysql

binlog开启后,创建一个canal用户并授权,官网配置是@%,表示所有服务器,因为本地测试的,所以改为localhost就可以

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost' identified by 'canal';
FLUSH PRIVILEGES;

下载canal服务端,到官网releases下载对应资料,canal.deployer-1.1.6-SNAPSHOT是服务端,在conf文件夹里找到\\example\\instance.properties,修改数据库配置信息,dbUsername,dbPassword数据库账号密码


到canal服务器安装目录D:\\canal\\canal.deployer-1.1.6-SNAPSHOT\\bin,找到startup.bat执行

3、kafka环境部署搭建

官网下载链接:https://kafka.apache.org/downloads,最开始用最新版的,发现在我的win10系统没部署成功,所以还是选择2.8.1版本的

在D:\\kafka_2.12-2.8.1\\bin\\windows,使用cmd命令启动zookeeper,window系统修改conf文件夹下面的zookeeper.properties里面的dataDir

zookeeper-server-start.bat ..\\..\\config\\zookeeper.properties


window系统修改conf文件夹下面的log.dirs路径

kafka-server-start.bat ..\\..\\config\\server.properties

4、电商业务场景

使用canal监听mysql数据库里的binlog,一旦修改了order订单表,也就是下单成功,就讲订单数据通过kafka做异步处理,将订单数据同步到仓库系统(kafka消费者)做业务处理,仓库商品的数据更新等业务。

5、创建一个Starter工程

创建一个工程,实现对kafka的api简单封装

jdk选择jdk8的

选择需要的依赖

基于kafka的EventPublisher

package com.example.ebus.publisher;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEvent;
import org.springframework.kafka.core.KafkaTemplate;

@Slf4j
public class MyEventPublisher 

    private KafkaTemplate<String, Object> kafkaTemplate;

    @Value("$app.ebus.topic:ebus")
    private String topic;

    public MyEventPublisher(KafkaTemplate<String, Object> kafkaTemplate) 
        this.kafkaTemplate = kafkaTemplate;
    

    public void publishEvent(Object event) 
        if (log.isInfoEnabled()) 
            log.info("topic发送:", event.getClass().getName());
        
        kafkaTemplate.send(topic, event);
    



自动配置类

package com.example.ebus.configuration;

import com.example.ebus.publisher.MyEventPublisher;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
public class EbusAutoConfiguration 

    @Bean
    public MyEventPublisher myEventPublisher(@Qualifier("kafkaTemplate") KafkaTemplate<String, Object> kafkaTemplate) 
        return new MyEventPublisher(kafkaTemplate);
    



META-INF/spring.factories,加上配置

org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.example.ebus.configuration.EbusAutoConfiguration

6、消息生产者实现

实现canal进行mysql binlog的监听,然后
新建SpringBoot工程,使用阿里的脚手架,网速比较快

jdk使用1.8的
加上一些其它的配置

在pom文件加上canal客户端的配置:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

加上starter配置:

<dependency>
    <groupId>com.example.ebus</groupId>
    <artifactId>springboot-ebus-starter</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

加上kafka配置:

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

canal进行binlog监听,订单表新增数据就发生信息

package com.example.ali.canal.client;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.example.ebus.event.ShopOrderEvent;
import com.example.ebus.publisher.MyEventPublisher;
import com.google.protobuf.InvalidProtocolBufferException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Optional;

@Component
public class AliCanalClient implements ApplicationRunner 

    @Autowired
    private MyEventPublisher eventPublisher;

    private static final int batchSize = 1;

    @Override
    public void run(ApplicationArguments args) throws Exception 
        // 创建canal连接器
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "canal", "canal");
        try 
            // 连接canal服务端
            connector.connect();
            // 只订阅*order的表,订阅所有表:".*\\\\..*"
            connector.subscribe(".*order.*");
            // 回滚到未进行ack确认的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();
            while (true) 
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(batchSize);
                // 获取批量ID
                long batchId = message.getId();
                // 获取批量的数量
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) 
                    try 
                        //如果没有数据,线程休眠2秒
                        Thread.sleep(2000);
                     catch (InterruptedException e) 
                    
                 else 
                    //如果有数据,处理数据
                    handle(message.getEntries());
                
                // ack确认batchId。小于等于这个batchId的消息都会被确认
                connector.ack(batchId);
            
         finally 
            // 释放连接
            connector.disconnect();
        
    

    private void handle(List<CanalEntry.Entry> entries) 
        entries.stream().forEach(entry ->
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) 
                // 开启/关闭事务的实体类型,跳过
                return;
            
            if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) 
                try 
                    // 获取rowChange
                    CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                    // 针对新增操作的监听
                    if (rowChange.getEventType() == CanalEntry.EventType.INSERT) 
                        // 遍历rowChange里的所有的行数据
                        rowChange.getRowDatasList().stream().forEach((row->
                            row.getAfterColumnsList().stream().forEach(column->
                                    publishEvent(row);
                            );
                        ));
                    
                catch (InvalidProtocolBufferException e) 
                    throw new RuntimeException("解释Binlog日志出现异常:" + entry, e);
                
            
        );
    

    private void publishEvent(CanalEntry.RowData rowData) 
        ShopOrderEvent orderEvent = new ShopOrderEvent();
        List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
        columns.forEach((column -> 
            String name = column.getName();
            String value = column.getValue();
            Optional.ofNullable(value).ifPresent((v)->
                if ("orderCode".equals(name)) 
                    orderEvent.setOrderCode(v);
                
                if ("productName".equals(name)) 
                    orderEvent.setProductName(v);
                
                if ("price".equals(name)) 
                    orderEvent.setPrice(Float.valueOf(v));
                
                if ("productDesc".equals(name)) 
                    orderEvent.setProductDesc(v);
                
                if ("isOk".equals(name)) 
                    orderEvent.setIsOk(Integer.valueOf(v));
                
            );
        ));
        eventPublisher.publishEvent(orderEvent);
    



7、kafka消费者端实现

同样创建一个消费者工程,配置:

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      enable-auto-commit: true
      group-id: consumer1
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      max-poll-records: 1
      properties:
        spring:
          json:
            trusted:
              packages: '*'


进行监听,使用kafka的KafkaListener

package com.example.consumer.handler;


import com.example.ebus.event.ShopOrderEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class OrderListenerHandler 

    @KafkaListener(topics = "$app.ebus.topic:ebus")
    public void obtainTopicData(ShopOrderEvent event) 
        log.info("下单成功,orderCode:" , event.getOrderCode());
        // 业务处理
    



ok,测试的可以,新增一些表数据,就可以看到日志打印,本博客代码例子可以在GitHub找到下载链接

8、相关参考资料

以上是关于SpringBoot系列之canal和kafka实现异步实时更新的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot系列之集成kafka实现事件发布

Flink 最佳实践之使用 Canal 同步 MySQL 数据至 TiDB

Flink 最佳实践之使用 Canal 同步 MySQL 数据至 TiDB

Flink 最佳实践之使用 Canal 同步 MySQL 数据至 TiDB

用canal同步binlog到kafka,spark streaming消费kafka topic乱码问题

Canal或maxwell实时采集MySQL数据到Kafka