使用 Redshift 作为 Spring 批处理作业存储库和 Redshift 中 SEQUENCE 的替代品

Posted

技术标签:

【中文标题】使用 Redshift 作为 Spring 批处理作业存储库和 Redshift 中 SEQUENCE 的替代品【英文标题】:Using Redshfit as Spring batch Job Repository and alternatives to SEQUENCE in Redshfit 【发布时间】:2013-12-26 18:56:07 【问题描述】:

我的项目中的一个要求是将 Spring Batch 模式放置在 amazon redshift db 上。 我打算从 schema-postgresql.sql 作为基线开始,因为 redshift 是基于 postgres 的。

查看 spring 批处理源代码,您似乎需要做一些事情来完成这项工作:

扩展 JobRepositoryFactoryBean、DefaultDataFieldMaxValueIncrementerFactory。 添加我自己的扩展 AbstractSequenceMaxValueIncrementer 的 RedshfitMaxValueIncrementer

查看redshift datatypes,除了用于创建作业、执行、步骤执行 ID 的序列之外,转换模式脚本似乎没有任何问题。

您对缺失序列的最佳解决方法有什么建议?

    将这些列指定为IDENTITY 列。 从红移的角度来看,这是最简单的方法。这可能是有问题的 DataFieldMaxValueIncrementer.nextLongValue() 返回 long 而不是 很长,我们需要返回 null 并让 IDENTITY 为我们完成这项工作 基于类似 select max(STEP_EXECUTION_ID) from BATCH_STEP_EXECUTION 的实现 并做一些类似于 mysqlMaxValueIncrementer 的扩展 AbstractColumnMaxValueIncrementer 仅在 java 代码中创建序列;使用类似的工具 休眠使用 上面没有提到的一种方法

【问题讨论】:

有人吗?有什么想法吗? 你解决过这个问题吗? 在我有机会完成此操作之前,我们切换到了不同的数据库。 【参考方案1】:

这就是我如何至少让那部分(显然)工作:

DefaultBatchConfigurer 的子类中,我添加了以下代码:

@Override
protected JobRepository createJobRepository() throws Exception

    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(getTransactionManager());
    factory.setIncrementerFactory(new RedshiftIncrementerFactory(dataSource));
    factory.afterPropertiesSet();
    return factory.getObject();

工厂对象的样子

public class RedshiftIncrementerFactory implements DataFieldMaxValueIncrementerFactory

    private DataSource dataSource;

    public RedshiftIncrementerFactory(DataSource ds)
    
        this.dataSource = ds;
    

    @Override
    public DataFieldMaxValueIncrementer getIncrementer(String databaseType, String incrementerName)
    
        return new RedshiftIncrementer(dataSource, incrementerName);
    

    @Override
    public boolean isSupportedIncrementerType(String databaseType)
    
        return POSTGRES.toString().equals(databaseType);
    

    @Override
    public String[] getSupportedIncrementerTypes()
    
        return new String[]POSTGRES.toString();
    


然后,最后是增量器本身:

public class RedshiftIncrementer extends AbstractSequenceMaxValueIncrementer

    public RedshiftIncrementer(DataSource dataSource, String incrementorName)
    
        super(dataSource, incrementorName);
    

    // I need to run two queries here, since Redshift doesn't support sequences
    @Override
    protected long getNextKey() throws DataAccessException 
        Connection con = DataSourceUtils.getConnection(getDataSource());
        Statement stmt = null;
        ResultSet rs = null;
        try 
            stmt = con.createStatement();
            DataSourceUtils.applyTransactionTimeout(stmt, getDataSource());
            String table = getIncrementerName();
            stmt.executeUpdate("UPDATE " + table + " SET ID = ID + 1");
            rs = stmt.executeQuery("SELECT ID FROM " + table + " WHERE UNIQUE_KEY='0'");
            if (rs.next()) 
                return rs.getLong(1);
            
            else 
                throw new DataAccessResourceFailureException("Sequence query did not return a result");
            
        
        catch (SQLException ex) 
            throw new DataAccessResourceFailureException("Could not obtain sequence value", ex);
        
        finally 
            JdbcUtils.closeResultSet(rs);
            JdbcUtils.closeStatement(stmt);
            DataSourceUtils.releaseConnection(con, getDataSource());
        
    

    @Override
    protected String getSequenceQuery()
    
        // No longer used
        return null;
    

这至少允许作业开始。但是,Redshift 还存在其他问题,我将在其他地方详细说明。

【讨论】:

以上是关于使用 Redshift 作为 Spring 批处理作业存储库和 Redshift 中 SEQUENCE 的替代品的主要内容,如果未能解决你的问题,请参考以下文章

将列表/数组作为参数/返回类型传递并返回给 Redshift 中的 UDF

使用 RedShift 作为附加的 Django 数据库

AWS DMS Redshift 作为目标

如何使用 RedShift 查询的输出作为 EMR 作业的输入?

Redshift - 如何使用一个表中的列作为 SIMILAR TO 中的模式

在结构化流 API (pyspark) 中使用 redshift 作为 readStream 的 JDBC 源