ChunJun JDBC Polling and Incremental Update: Source Code Analysis

Posted by 蒋含竹

  • Version: ChunJun 1.12

The checkpoint-resume column and the incremental update column

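  • In “ChunJun Job Submission: Source Code Analysis” we saw that:
    1. When the main method of the Flink application runs, it first builds the Flink data source, a DataStream<RowData>.
    2. For a JDBC connector, that source is built by calling createSource() in JdbcSourceFactory.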
package com.dtstack.chunjun.connector.jdbc.source;

// import ...

public abstract class JdbcSourceFactory extends SourceFactory {

    // code...

    @Override
    public DataStream<RowData> createSource() {
        initColumnInfo();
        initRestoreConfig();
        initPollingConfig();
        initSplitConfig();
        initIncrementConfig();
        JdbcInputFormatBuilder builder = getBuilder();

        // code ...
    }

    // code...
}

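  • Start with the initialization of the checkpoint-resume configuration, done in initRestoreConfig(). If a restore column is configured, its name, index, and type are stored in jdbcConf. If the configured name matches no entry in column, an IllegalArgumentException is thrown.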
package com.dtstack.chunjun.connector.jdbc.source;

// import ...

public abstract class JdbcSourceFactory extends SourceFactory {

    // code...

    protected void initRestoreConfig() {
        String name = syncConf.getRestore().getRestoreColumnName();
        if (StringUtils.isNotBlank(name)) {
            FieldConf fieldConf = FieldConf.getSameNameMetaColumn(jdbcConf.getColumn(), name);
            if (fieldConf != null) {
                jdbcConf.setRestoreColumn(name);
                jdbcConf.setRestoreColumnIndex(fieldConf.getIndex());
                jdbcConf.setRestoreColumnType(fieldConf.getType());
                restoreKeyUtil = jdbcDialect.initKeyUtil(fieldConf.getName(), fieldConf.getType());
            } else {
                throw new IllegalArgumentException("unknown restore column name: " + name);
            }
        }
    }

    // code...
}

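  • Next comes the initialization of the incremental update configuration, done in initIncrementConfig():
    1. If an increment column is configured, its name, index, and type are looked up. The column can be given either as a numeric position within column or as a column name.
    2. That name, index, and type are then written to both the increment configuration and the restore configuration, which overwrites whatever initRestoreConfig() set.
    3. The call to initStartLocation() then initializes and validates the start-location configuration.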
package com.dtstack.chunjun.connector.jdbc.source;

// import ...

public abstract class JdbcSourceFactory extends SourceFactory {

    // code...
    /** Initialize the configuration of an incremental or interval-polling job. */
    private void initIncrementConfig() {
        String increColumn = jdbcConf.getIncreColumn();

        // A non-blank increment column means this is an incremental or interval-polling job
        if (StringUtils.isNotBlank(increColumn)) {
            List<FieldConf> fieldConfList = jdbcConf.getColumn();
            String type = null;
            String name = null;
            int index = -1;

            // A purely numeric value is the position of the increment column within "column"
            if (NumberUtils.isNumber(increColumn)) {
                int idx = Integer.parseInt(increColumn);
                if (idx > fieldConfList.size() - 1) {
                    throw new ChunJunRuntimeException(
                            String.format(
                                    "config error : incrementColumn must less than column.size() when increColumn is number, column = %s, size = %s, increColumn = %s",
                                    GsonUtil.GSON.toJson(fieldConfList),
                                    fieldConfList.size(),
                                    increColumn));
                }
                FieldConf fieldColumn = fieldConfList.get(idx);
                type = fieldColumn.getType();
                name = fieldColumn.getName();
                index = fieldColumn.getIndex();
            } else {
                for (FieldConf field : fieldConfList) {
                    if (Objects.equals(increColumn, field.getName())) {
                        type = field.getType();
                        name = field.getName();
                        index = field.getIndex();
                        break;
                    }
                }
            }
            if (type == null || name == null) {
                throw new IllegalArgumentException(
                        String.format(
                                "config error : increColumn's name or type is null, column = %s, increColumn = %s",
                                GsonUtil.GSON.toJson(fieldConfList), increColumn));
            }

            jdbcConf.setIncrement(true);
            jdbcConf.setIncreColumn(name);
            jdbcConf.setIncreColumnType(type);
            jdbcConf.setIncreColumnIndex(index);

            jdbcConf.setRestoreColumn(name);
            jdbcConf.setRestoreColumnType(type);
            jdbcConf.setRestoreColumnIndex(index);

            incrementKeyUtil = jdbcDialect.initKeyUtil(name, type);
            restoreKeyUtil = incrementKeyUtil;
            initStartLocation();

            if (StringUtils.isBlank(jdbcConf.getSplitPk())) {
                jdbcConf.setSplitPk(name);
                splitKeyUtil = incrementKeyUtil;
            }
        }
    }

    // code...
}

  • In other words, once incremental update is configured, checkpoint resume uses the increment column as its restore column.
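The lookup rule in initIncrementConfig() (a numeric value is a position, anything else is a column name) can be sketched in isolation. This is a minimal illustration, not ChunJun code: the class IncrementColumnLookup and its resolve method are hypothetical, columns are reduced to a list of names, and only plain non-negative integers are treated as numeric, which is narrower than NumberUtils.isNumber.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the lookup in initIncrementConfig(); not ChunJun code. */
final class IncrementColumnLookup {

    /** Returns the 0-based position of the increment column, or throws if it cannot be resolved. */
    static int resolve(List<String> columnNames, String increColumn) {
        // Simplification (assumption): only plain non-negative integers count as "numeric".
        if (increColumn.matches("\\d+")) {
            int idx = Integer.parseInt(increColumn);
            if (idx > columnNames.size() - 1) {
                throw new IllegalArgumentException(
                        "increColumn index " + idx + " is out of range for " + columnNames);
            }
            return idx;
        }
        // Otherwise treat the value as a column name.
        int idx = columnNames.indexOf(increColumn);
        if (idx < 0) {
            throw new IllegalArgumentException("unknown increColumn: " + increColumn);
        }
        return idx;
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("id", "name", "update_time");
        System.out.println(resolve(columns, "2"));  // by position
        System.out.println(resolve(columns, "id")); // by name
    }
}
```

In the real method the resolved name, type, and index are then copied into both the increment and the restore settings, which is why the two features end up sharing one column.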

Updating state (the start location) while reading data

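  • When ChunJun creates the Flink source, it first calls createSource() in SourceFactory, which in turn uses DtInputFormatSourceFunction to create the source stream DataStream<RowData>.
  • Start with DtInputFormatSourceFunction, a subclass of Flink's SourceFunction:
    • When a Flink application runs, the data source is read through a SourceFunction.
    • When the SourceFunction is initialized, its open method is called.
    • Once the SourceFunction starts processing data, its run method is called.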
package com.dtstack.chunjun.source;

// import ...

public class DtInputFormatSourceFunction<OUT> extends InputFormatSourceFunction<OUT>
        implements CheckpointedFunction {

    // code ...

    @Override
    public void open(Configuration parameters) {
        // code ...
    }

    @Override
    public void run(SourceContext<OUT> ctx) throws Exception {
        Exception tryException = null;
        try {

            Counter completedSplitsCounter =
                    getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
            if (isRunning && format instanceof RichInputFormat) {
                ((RichInputFormat) format).openInputFormat();
            }

            OUT nextElement = serializer.createInstance();
            while (isRunning) {
                format.open(splitIterator.next());

                // for each element we also check if cancel
                // was called by checking the isRunning flag

                while (isRunning && !format.reachedEnd()) {
                    synchronized (ctx.getCheckpointLock()) {
                        nextElement = format.nextRecord(nextElement);
                        if (nextElement != null) {
                            ctx.collect(nextElement);
                        }
                    }
                }
                format.close();
                completedSplitsCounter.inc();

                if (isRunning) {
                    isRunning = splitIterator.hasNext();
                }
            }
        } catch (Exception exception) {
            tryException = exception;
            LOG.error("Exception happened, start to close format", exception);
        } finally {
            isRunning = false;
            gracefulClose();
            if (null != tryException) {
                throw tryException;
            }
        }
    }

    // code ...
}

  • Each record is fetched by calling format.nextRecord(nextElement):
package com.dtstack.chunjun.source.format;

// import ...

public abstract class BaseRichInputFormat extends RichInputFormat<RowData, InputSplit> {

    // code...

    @Override
    public RowData nextRecord(RowData rowData) {
        if (byteRateLimiter != null) {
            byteRateLimiter.acquire();
        }
        RowData internalRow = null;
        try {
            internalRow = nextRecordInternal(rowData);
        } catch (ReadRecordException e) {
            dirtyManager.collect(e.getRowData(), e, null);
        }
        if (internalRow != null) {
            updateDuration();
            if (numReadCounter != null) {
                numReadCounter.add(1);
            }
            if (bytesReadCounter != null) {
                bytesReadCounter.add(rowSizeCalculator.getObjectSize(internalRow));
            }
        }

        return internalRow;
    }
}

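  • That in turn calls the connector plugin's implementation (make sure you are looking at the plugin code, not the core module), for example the nextRecordInternal method in JdbcInputFormat: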
package com.dtstack.chunjun.connector.jdbc.source;

// import ...

public class JdbcInputFormat extends BaseRichInputFormat {

    // code ...

    @Override
    public RowData nextRecordInternal(RowData rowData) throws ReadRecordException {
        if (!hasNext) {
            return null;
        }
        try {
            @SuppressWarnings("unchecked")
            RowData finalRowData = rowConverter.toInternal(resultSet);
            if (needUpdateEndLocation) {
                BigInteger location =
                        incrementKeyUtil.getLocationValueFromRs(
                                resultSet, jdbcConf.getIncreColumnIndex() + 1);
                endLocationAccumulator.add(location);
                LOG.debug("update endLocationAccumulator, current Location = {}", location);
            }
            if (jdbcConf.getRestoreColumnIndex() > -1) {
                state = resultSet.getObject(jdbcConf.getRestoreColumnIndex() + 1);
            }
            return finalRowData;
        } catch (Exception se) {
            throw new ReadRecordException("", se, 0, rowData);
        } finally {
            try {
                hasNext = resultSet.next();
            } catch (SQLException e) {
                LOG.error("can not read next record", e);
                hasNext = false;
            }
        }
    }

    // code ...
}

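  • Here the value of the increment column is read from the current row returned by the database (the code actually reads the restore column, RestoreColumn) and saved to the state variable. The + 1 in getRestoreColumnIndex() + 1 is not added to the value: it converts the column index stored in jdbcConf, which starts at 0, to a JDBC column index, which starts at 1.
  • The next polling query reads the state variable and uses it as startLocation.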
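The interplay between the column index, state, and the next query can be sketched with an in-memory table instead of a real ResultSet. Everything here is hypothetical illustration code: RestoreStateSketch, getObject, readAndTrackState, and queryAfter are invented names, the restore column is assumed to hold Long values, and the "greater than" filter is an assumption about the generated polling SQL, which the excerpts above do not show.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory illustration of how state follows the restore column; not ChunJun code. */
final class RestoreStateSketch {

    /** Mimics ResultSet.getObject(int): JDBC column indexes start at 1. */
    static Object getObject(Object[] row, int jdbcColumnIndex) {
        return row[jdbcColumnIndex - 1];
    }

    /** Reads every row and returns the restore-column value of the last one read. */
    static Object readAndTrackState(
            List<Object[]> rows, int restoreColumnIndex, Object previousState) {
        Object state = previousState;
        for (Object[] row : rows) {
            if (restoreColumnIndex > -1) {
                // Same shape as the excerpt: 0-based configured index + 1 = JDBC index.
                state = getObject(row, restoreColumnIndex + 1);
            }
        }
        return state;
    }

    /** Assumed polling query: rows whose restore column is greater than startLocation. */
    static List<Object[]> queryAfter(
            List<Object[]> table, int restoreColumnIndex, long startLocation) {
        List<Object[]> result = new ArrayList<>();
        for (Object[] row : table) {
            if ((Long) row[restoreColumnIndex] > startLocation) {
                result.add(row);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Object[]> table = new ArrayList<>();
        table.add(new Object[] {1L, "a"});
        table.add(new Object[] {2L, "b"});

        // First pass: read everything, remember the last value of column 0.
        Object state = readAndTrackState(table, 0, null);

        // A new row arrives; the next poll starts from the remembered state.
        table.add(new Object[] {3L, "c"});
        List<Object[]> next = queryAfter(table, 0, (Long) state);
        System.out.println("state = " + state + ", new rows = " + next.size());
    }
}
```

Note that state simply holds the value of the last row read, so this scheme relies on the query returning rows ordered by the restore column.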

Interval polling and incremental update

  • Go back to the run method of DtInputFormatSourceFunction shown above.
  • Its inner while loop calls format.reachedEnd() on every iteration:
package com.dtstack.chunjun.connector.jdbc.source;

// import ...

public class JdbcInputFormat extends BaseRichInputFormat {

    // code ...

    @Override
    public boolean reachedEnd() {
        if (hasNext) {
            return false;
        } else {
            if (currentJdbcInputSplit.isPolling()) {
                try {
                    TimeUnit.MILLISECONDS.sleep(jdbcConf.getPollingInterval());
                    // On each poll, check whether the connection has dropped (3-second timeout); reconnect if so
                    if (!dbConn.isValid(3)) {
                        dbConn = getConnection();
                        // Still invalid after reconnecting: treat the database as unavailable and fail the job
                        if (!dbConn.isValid(3)) {
                            String message =
                                    String.format(
                                            "cannot connect to %s, username = %s, please check %s is available.",
                                            jdbcConf.getJdbcUrl(),
                                            jdbcConf.getUsername(),
                                            jdbcDialect.dialectName());
                            throw new ChunJunRuntimeException(message);
                        }
                    }
                    dbConn.setAutoCommit(true);
                    JdbcUtil.closeDbResources(resultSet, null, null, false);
                    queryForPolling(restoreKeyUtil.transToLocationValue(state).toString());
                    return false;
                } catch (InterruptedException e) {
                    LOG.warn(
                            "interrupted while waiting for polling, e = {}",
                            ExceptionUtil.getErrorMessage(e));
                } catch (SQLException e) {
                    JdbcUtil.closeDbResources(resultSet, ps, null, false);
                    String message =
                            String.format(
                                    "error to execute sql = %s, startLocation = %s, e = %s",
                                    jdbcConf.getQuerySql(),
                                    state,
                                    ExceptionUtil.getErrorMessage(e));
                    throw new ChunJunRuntimeException(message, e);
                }
            }
            return true;
        }
    }

    // code ...

    protected void queryForPolling(String startLocation) throws SQLException {
        // Log at INFO about every five minutes: (current time - job start time) % 300 s <= one polling interval
        if ((System.currentTimeMillis() - startTime) % 300000 <= jdbcConf.getPollingInterval()) {
            LOG.info("polling startLocation = {}", startLocation);
        } else {
            LOG.debug("polling startLocation = {}", startLocation);
        }

        incrementKeyUtil.setPsWithLocationStr(ps, 1, startLocation);
        resultSet = ps.executeQuery();
        hasNext = resultSet.next();
    }

    // code ...
}

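  • if (currentJdbcInputSplit.isPolling()) corresponds to the polling option and decides whether to poll at all.
  • TimeUnit.MILLISECONDS.sleep(jdbcConf.getPollingInterval()); corresponds to the pollingInterval option, the interval between polls in milliseconds.
  • queryForPolling(restoreKeyUtil.transToLocationValue(state).toString()); reads the state variable and passes it on as startLocation.
  • queryForPolling then binds startLocation to the precompiled PreparedStatement and executes the SQL query.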
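Before binding startLocation, queryForPolling also picks the log level with a modulo check on the elapsed time. The arithmetic is easier to follow in isolation, so here is a small sketch of it. PollingLogThrottle and logAtInfo are hypothetical names; only the expression itself is taken from the excerpt.

```java
/** Illustration of the modulo check in queryForPolling(); names are hypothetical. */
final class PollingLogThrottle {

    /** Five minutes in milliseconds, the 300000 constant from the excerpt. */
    static final long WINDOW_MS = 300_000L;

    /** True when the elapsed time falls within one polling interval after a five-minute boundary. */
    static boolean logAtInfo(long nowMs, long startTimeMs, long pollingIntervalMs) {
        return (nowMs - startTimeMs) % WINDOW_MS <= pollingIntervalMs;
    }

    public static void main(String[] args) {
        long start = 0L;
        long interval = 5_000L; // assumed pollingInterval of 5 seconds
        // Simulate ten minutes of perfectly regular polls and report the ones logged at INFO.
        for (long now = interval; now <= 600_000L; now += interval) {
            if (logAtInfo(now, start, interval)) {
                System.out.println("INFO at t = " + now + " ms");
            }
        }
    }
}
```

Because the check only asks whether a poll happens to land inside a window one polling interval wide, the INFO line appears roughly once per five minutes rather than exactly once: a window can be hit twice when polls are evenly spaced, or missed when a slow query pushes the next poll past it.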
