使用 Redshift 作为 Spring 批处理作业存储库和 Redshift 中 SEQUENCE 的替代品
Posted
技术标签:
【中文标题】使用 Redshift 作为 Spring 批处理作业存储库和 Redshift 中 SEQUENCE 的替代品【英文标题】:Using Redshfit as Spring batch Job Repository and alternatives to SEQUENCE in Redshfit 【发布时间】:2013-12-26 18:56:07 【问题描述】:我的项目中的一个要求是将 Spring Batch 模式放置在 amazon redshift db 上。 我打算从 schema-postgresql.sql 作为基线开始,因为 redshift 是基于 postgres 的。
查看 spring 批处理源代码,您似乎需要做一些事情来完成这项工作:
扩展 JobRepositoryFactoryBean、DefaultDataFieldMaxValueIncrementerFactory。 添加我自己的扩展 AbstractSequenceMaxValueIncrementer 的 RedshfitMaxValueIncrementer查看redshift datatypes,除了用于创建作业、执行、步骤执行 ID 的序列之外,转换模式脚本似乎没有任何问题。
您对缺失序列的最佳解决方法有什么建议?
-
将这些列指定为IDENTITY 列。
从红移的角度来看,这是最简单的方法。这可能是有问题的
DataFieldMaxValueIncrementer.nextLongValue() 返回 long 而不是
很长,我们需要返回 null 并让 IDENTITY 为我们完成这项工作
基于类似 select max(STEP_EXECUTION_ID) from BATCH_STEP_EXECUTION 的实现
并做一些类似于 mysqlMaxValueIncrementer 的扩展
AbstractColumnMaxValueIncrementer
仅在 java 代码中创建序列;使用类似的工具
休眠使用
上面没有提到的一种方法
【问题讨论】:
有人吗?有什么想法吗? 你解决过这个问题吗? 在我有机会完成此操作之前,我们切换到了不同的数据库。 【参考方案1】:这就是我如何至少让那部分(显然)工作:
在DefaultBatchConfigurer
的子类中,我添加了以下代码:
@Override
protected JobRepository createJobRepository() throws Exception
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource);
factory.setTransactionManager(getTransactionManager());
factory.setIncrementerFactory(new RedshiftIncrementerFactory(dataSource));
factory.afterPropertiesSet();
return factory.getObject();
工厂对象的样子
public class RedshiftIncrementerFactory implements DataFieldMaxValueIncrementerFactory
private DataSource dataSource;
public RedshiftIncrementerFactory(DataSource ds)
this.dataSource = ds;
@Override
public DataFieldMaxValueIncrementer getIncrementer(String databaseType, String incrementerName)
return new RedshiftIncrementer(dataSource, incrementerName);
@Override
public boolean isSupportedIncrementerType(String databaseType)
return POSTGRES.toString().equals(databaseType);
@Override
public String[] getSupportedIncrementerTypes()
return new String[]POSTGRES.toString();
然后,最后是增量器本身:
public class RedshiftIncrementer extends AbstractSequenceMaxValueIncrementer
public RedshiftIncrementer(DataSource dataSource, String incrementorName)
super(dataSource, incrementorName);
// I need to run two queries here, since Redshift doesn't support sequences
@Override
protected long getNextKey() throws DataAccessException
Connection con = DataSourceUtils.getConnection(getDataSource());
Statement stmt = null;
ResultSet rs = null;
try
stmt = con.createStatement();
DataSourceUtils.applyTransactionTimeout(stmt, getDataSource());
String table = getIncrementerName();
stmt.executeUpdate("UPDATE " + table + " SET ID = ID + 1");
rs = stmt.executeQuery("SELECT ID FROM " + table + " WHERE UNIQUE_KEY='0'");
if (rs.next())
return rs.getLong(1);
else
throw new DataAccessResourceFailureException("Sequence query did not return a result");
catch (SQLException ex)
throw new DataAccessResourceFailureException("Could not obtain sequence value", ex);
finally
JdbcUtils.closeResultSet(rs);
JdbcUtils.closeStatement(stmt);
DataSourceUtils.releaseConnection(con, getDataSource());
@Override
protected String getSequenceQuery()
// No longer used
return null;
这至少允许作业开始。但是,Redshift 还存在其他问题,我将在其他地方详细说明。
【讨论】:
以上是关于使用 Redshift 作为 Spring 批处理作业存储库和 Redshift 中 SEQUENCE 的替代品的主要内容,如果未能解决你的问题,请参考以下文章
将列表/数组作为参数/返回类型传递并返回给 Redshift 中的 UDF
如何使用 RedShift 查询的输出作为 EMR 作业的输入?