ChunJun-JDBC轮询增量更新-源码分析
Posted 蒋含竹
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ChunJun-JDBC轮询增量更新-源码分析相关的知识,希望对你有一定的参考价值。
ChunJun-JDBC轮询增量更新-源码分析
- 版本 ChunJun 1.12
断点续传字段与增量更新字段
- 在“ChunJun任务提交-源码分析”中,我们可以看到:
- 在执行Flink应用的main方法时,会先构建Flink的数据源
DataStream<RowData>
- 构建数据源,则是调用了
JdbcSourceFactory
中的createSource()
方法
- 在执行Flink应用的main方法时,会先构建Flink的数据源
package com.dtstack.chunjun.connector.jdbc.source;
// import ...
public abstract class JdbcSourceFactory extends SourceFactory
// code...
@Override
public DataStream<RowData> createSource()
initColumnInfo();
initRestoreConfig();
initPollingConfig();
initSplitConfig();
initIncrementConfig();
JdbcInputFormatBuilder builder = getBuilder();
// code ...
// code...
- 先看断点续传的初始化配置,调用
initRestoreConfig()
。如果存在断点续传的配置,那么会在jdbcConf
中设置断点续传用的字段名、字段索引、字段类型。
package com.dtstack.chunjun.connector.jdbc.source;
// import ...
public abstract class JdbcSourceFactory extends SourceFactory
// code...
protected void initRestoreConfig()
String name = syncConf.getRestore().getRestoreColumnName();
if (StringUtils.isNotBlank(name))
FieldConf fieldConf = FieldConf.getSameNameMetaColumn(jdbcConf.getColumn(), name);
if (fieldConf != null)
jdbcConf.setRestoreColumn(name);
jdbcConf.setRestoreColumnIndex(fieldConf.getIndex());
jdbcConf.setRestoreColumnType(fieldConf.getType());
restoreKeyUtil = jdbcDialect.initKeyUtil(fieldConf.getName(), fieldConf.getType());
else
throw new IllegalArgumentException("unknown restore column name: " + name);
// code...
- 再看增量更新的初始化配置,调用
initIncrementConfig()
- 如果配置了增量更新字段,那么会获取对应的字段名、字段索引、字段类型。
- 接着会将该“字段名、字段索引、字段类型”同时设置到增量更新配置和断点续传配置中,也就覆盖了
initRestoreConfig()
中设置的断点续传配置
- initStartLocation();
处还对起始位置的配置做了初始化的检查
package com.dtstack.chunjun.connector.jdbc.source;
// import ...
public abstract class JdbcSourceFactory extends SourceFactory
// code...
/** 初始化增量或间隔轮询任务配置 */
private void initIncrementConfig()
String increColumn = jdbcConf.getIncreColumn();
// 增量字段不为空,表示任务为增量或间隔轮询任务
if (StringUtils.isNotBlank(increColumn))
List<FieldConf> fieldConfList = jdbcConf.getColumn();
String type = null;
String name = null;
int index = -1;
// 纯数字则表示增量字段在column中的顺序位置
if (NumberUtils.isNumber(increColumn))
int idx = Integer.parseInt(increColumn);
if (idx > fieldConfList.size() - 1)
throw new ChunJunRuntimeException(
String.format(
"config error : incrementColumn must less than column.size() when increColumn is number, column = %s, size = %s, increColumn = %s",
GsonUtil.GSON.toJson(fieldConfList),
fieldConfList.size(),
increColumn));
FieldConf fieldColumn = fieldConfList.get(idx);
type = fieldColumn.getType();
name = fieldColumn.getName();
index = fieldColumn.getIndex();
else
for (FieldConf field : fieldConfList)
if (Objects.equals(increColumn, field.getName()))
type = field.getType();
name = field.getName();
index = field.getIndex();
break;
if (type == null || name == null)
throw new IllegalArgumentException(
String.format(
"config error : increColumn's name or type is null, column = %s, increColumn = %s",
GsonUtil.GSON.toJson(fieldConfList), increColumn));
jdbcConf.setIncrement(true);
jdbcConf.setIncreColumn(name);
jdbcConf.setIncreColumnType(type);
jdbcConf.setIncreColumnIndex(index);
jdbcConf.setRestoreColumn(name);
jdbcConf.setRestoreColumnType(type);
jdbcConf.setRestoreColumnIndex(index);
incrementKeyUtil = jdbcDialect.initKeyUtil(name, type);
restoreKeyUtil = incrementKeyUtil;
initStartLocation();
if (StringUtils.isBlank(jdbcConf.getSplitPk()))
jdbcConf.setSplitPk(name);
splitKeyUtil = incrementKeyUtil;
// code...
- 也就是说当配置了增量更新后,那么断点续传功能会使用增量更新字段作为断点续传字段
在读取数据时更新state(StartLocation)
- ChunJun在创建Flink数据源时,会先调用
SourceFactory
中的createSource()
。而createSource()
会利用DtInputFormatSourceFunction
来创建数据源的数据流DataStream<RowData>
。
- 先看到Flink
SourceFunction
的子类DtInputFormatSourceFunction
- 当一个Flink应用运行时,读取数据源是通过
SourceFunction
实现的
- SourceFunction
初始化时会调用open
方法
- SourceFunction
开始正常处理数据时,则会调用run
方法
- 当一个Flink应用运行时,读取数据源是通过
package com.dtstack.chunjun.source;
// import ...
public class DtInputFormatSourceFunction<OUT> extends InputFormatSourceFunction<OUT>
implements CheckpointedFunction
// code ...
@Override
public void open(Configuration parameters)
// code ...
@Override
public void run(SourceContext<OUT> ctx) throws Exception
Exception tryException = null;
try
Counter completedSplitsCounter =
getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
if (isRunning && format instanceof RichInputFormat)
((RichInputFormat) format).openInputFormat();
OUT nextElement = serializer.createInstance();
while (isRunning)
format.open(splitIterator.next());
// for each element we also check if cancel
// was called by checking the isRunning flag
while (isRunning && !format.reachedEnd())
synchronized (ctx.getCheckpointLock())
nextElement = format.nextRecord(nextElement);
if (nextElement != null)
ctx.collect(nextElement);
format.close();
completedSplitsCounter.inc();
if (isRunning)
isRunning = splitIterator.hasNext();
catch (Exception exception)
tryException = exception;
LOG.error("Exception happened, start to close format", exception);
finally
isRunning = false;
gracefulClose();
if (null != tryException)
throw tryException;
// code ...
- 处理每条数据时,调用
format.nextRecord(nextElement);
package com.dtstack.chunjun.source.format;
// import ...
public abstract class BaseRichInputFormat extends RichInputFormat<RowData, InputSplit>
// code...
@Override
public RowData nextRecord(RowData rowData)
if (byteRateLimiter != null)
byteRateLimiter.acquire();
RowData internalRow = null;
try
internalRow = nextRecordInternal(rowData);
catch (ReadRecordException e)
dirtyManager.collect(e.getRowData(), e, null);
if (internalRow != null)
updateDuration();
if (numReadCounter != null)
numReadCounter.add(1);
if (bytesReadCounter != null)
bytesReadCounter.add(rowSizeCalculator.getObjectSize(internalRow));
return internalRow;
- 再调用connector插件的实现类(不要找错了,是插件代码里面的),例如
JdbcInputFormat
中的nextRecordInternal
方法
package com.dtstack.chunjun.connector.jdbc.source;
// import ...
public class JdbcInputFormat extends BaseRichInputFormat
// code ...
@Override
public RowData nextRecordInternal(RowData rowData) throws ReadRecordException
if (!hasNext)
return null;
try
@SuppressWarnings("unchecked")
RowData finalRowData = rowConverter.toInternal(resultSet);
if (needUpdateEndLocation)
BigInteger location =
incrementKeyUtil.getLocationValueFromRs(
resultSet, jdbcConf.getIncreColumnIndex() + 1);
endLocationAccumulator.add(location);
LOG.debug("update endLocationAccumulator, current Location = ", location);
if (jdbcConf.getRestoreColumnIndex() > -1)
state = resultSet.getObject(jdbcConf.getRestoreColumnIndex() + 1);
return finalRowData;
catch (Exception se)
throw new ReadRecordException("", se, 0, rowData);
finally
try
hasNext = resultSet.next();
catch (SQLException e)
LOG.error("can not read next record", e);
hasNext = false;
// code ...
- 此处解析了数据库返回的当前这条数据的增量更新字段的值(实际用的是RestoreColumn断点续传字段),并原样保存到了
state
变量。注意代码中的“+ 1”是加在列索引上的(JDBC的ResultSet列下标从1开始),并不是对字段的值加1
- 下次查询时会读取
state
变量,作为startLocation
间隔轮询,增量更新
- 先看到上面DtInputFormatSourceFunction中的
run
方法
- while循环时,会不断调用
format.reachedEnd()
方法
package com.dtstack.chunjun.connector.jdbc.source;
// import ...
public class JdbcInputFormat extends BaseRichInputFormat
// code ...
@Override
public boolean reachedEnd()
if (hasNext)
return false;
else
if (currentJdbcInputSplit.isPolling())
try
TimeUnit.MILLISECONDS.sleep(jdbcConf.getPollingInterval());
// 间隔轮询检测数据库连接是否断开,超时时间三秒,断开后自动重连
if (!dbConn.isValid(3))
dbConn = getConnection();
// 重新连接后还是不可用则认为数据库异常,任务失败
if (!dbConn.isValid(3))
String message =
String.format(
"cannot connect to %s, username = %s, please check %s is available.",
jdbcConf.getJdbcUrl(),
jdbcConf.getUsername(),
jdbcDialect.dialectName());
throw new ChunJunRuntimeException(message);
dbConn.setAutoCommit(true);
JdbcUtil.closeDbResources(resultSet, null, null, false);
queryForPolling(restoreKeyUtil.transToLocationValue(state).toString());
return false;
catch (InterruptedException e)
LOG.warn(
"interrupted while waiting for polling, e = ",
ExceptionUtil.getErrorMessage(e));
catch (SQLException e)
JdbcUtil.closeDbResources(resultSet, ps, null, false);
String message =
String.format(
"error to execute sql = %s, startLocation = %s, e = %s",
jdbcConf.getQuerySql(),
state,
ExceptionUtil.getErrorMessage(e));
throw new ChunJunRuntimeException(message, e);
return true;
// code ...
protected void queryForPolling(String startLocation) throws SQLException
// 每隔五分钟打印一次,(当前时间 - 任务开始时间) % 300秒 <= 一个间隔轮询周期
if ((System.currentTimeMillis() - startTime) % 300000 <= jdbcConf.getPollingInterval())
LOG.info("polling startLocation = ", startLocation);
else
LOG.debug("polling startLocation = ", startLocation);
incrementKeyUtil.setPsWithLocationStr(ps, 1, startLocation);
resultSet = ps.executeQuery();
hasNext = resultSet.next();
// code ...
- 代码
if (currentJdbcInputSplit.isPolling())
处,对应配置polling
,看看是否要做轮询
- 代码
TimeUnit.MILLISECONDS.sleep(jdbcConf.getPollingInterval());
处,对应配置pollingInterval
,用作轮询间隔
- 代码
queryForPolling(restoreKeyUtil.transToLocationValue(state).toString());
处,获取了state
变量,作为startLocation
- 再看
queryForPolling
方法,将startLocation
传入到了编译好的PreparedStatement中,并执行了SQL查询
以上是关于ChunJun-JDBC轮询增量更新-源码分析的主要内容,如果未能解决你的问题,请参考以下文章