Spring Batch ItemReader using PostgreSQL function cursor error
Posted: 2015-02-24 22:28:57
Question: I need a Spring Batch ItemReader to use a PostgreSQL function as its data source, but I have not been able to get it working (the PostgreSQL database runs on a Greenplum appliance). I built a sample ItemReader that calls a PostgreSQL function returning a cursor, but the ItemReader fails with an error saying the cursor does not exist. These are all the components I set up:
-Database table and data:
CREATE TABLE test_user
(
test_user_sys_id numeric NOT NULL,
ssn character varying(9) NOT NULL,
create_user_id character varying(30) NOT NULL,
create_ts timestamp(6) without time zone NOT NULL DEFAULT now()
)
WITH (
OIDS=FALSE
)
DISTRIBUTED BY (ssn);
INSERT INTO test_user (test_user_sys_id, ssn, create_user_id) VALUES (1,'111111111','DataAdmin');
INSERT INTO test_user (test_user_sys_id, ssn, create_user_id) VALUES (2,'222222222','DataAdmin');
INSERT INTO test_user (test_user_sys_id, ssn, create_user_id) VALUES (3,'333333333','DataAdmin');
-Database function:
CREATE or REPLACE FUNCTION get_user_func_no_arg(
p_id_min NUMERIC -- Minimum ID value
)
RETURNS REFCURSOR
AS
$BODY$
DECLARE
resultCursor REFCURSOR := 'resultCursor';
BEGIN
OPEN resultCursor FOR
SELECT test_user_sys_id, ssn
FROM test_user
WHERE test_user_sys_id > p_id_min;
RETURN resultCursor;
END;
$BODY$
LANGUAGE plpgsql VOLATILE;
-localLaunchContext.xml (environment-specific information):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<!-- Import the Spring Batch config files -->
<import resource="springBatchConfig.xml" />
<!-- Define the PostgreSQL source -->
<bean id="postgresql_dataSource"
class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName" value="org.postgresql.Driver"/>
<property name="url" value="THE URL"/>
<property name="username" value="THE USER"/>
<property name="password" value="THE PASSWORD"/>
</bean>
<!-- Define a resourceless transaction manager for the in-memory job repository -->
<bean id="repositoryTransactionManager"
class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
<!-- Define a transaction manager for PostgreSQL to hopefully make the cursor work -->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="postgresql_dataSource"/>
</bean>
<!-- Define in-memory job repository -->
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="repositoryTransactionManager"/>
</bean>
<!-- Define the synchronous job launcher -->
<bean id="syncJobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
</beans>
-springBatchConfig.xml (job information):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/batch
http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-3.0.xsd">
<!-- ================================================== -->
<!-- Components for TestUser Job -->
<!-- ================================================== -->
<!-- Test User Stored Procedure ItemReader -->
<bean id="testUserItemReader"
class="org.springframework.batch.item.database.StoredProcedureItemReader"
scope="step">
<property name="dataSource" ref="postgresql_dataSource" />
<property name="procedureName" value="get_user_func_no_arg" />
<property name="parameters">
<list>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="p_id_min" />
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.NUMERIC" />
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlOutParameter">
<constructor-arg index="0" value="resultCursor" />
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.OTHER" />
</constructor-arg>
</bean>
</list>
</property>
<property name="refCursorPosition" value="2" />
<property name="rowMapper">
<bean class="dao.mapper.TestUserRowMapper" />
</property>
<property name="preparedStatementSetter" ref="preparedStatementSetter" />
</bean>
<!-- ItemProcessor is not needed, since the stored procedure provides the results -->
<!-- TestUser ItemWriter -->
<bean id="testUserItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" value="file:output.txt" />
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.PassThroughLineAggregator"/>
</property>
</bean>
<!-- TestUser Job definition -->
<batch:job id="TestUserJob" incrementer="jobParametersIncrementer">
<batch:step id="TestUser_step1">
<batch:tasklet>
<batch:transaction-attributes isolation="READ_COMMITTED" propagation="MANDATORY" timeout="200"/>
<batch:chunk reader="testUserItemReader"
writer="testUserItemWriter" commit-interval="2" />
</batch:tasklet>
</batch:step>
</batch:job>
<!-- ================================================== -->
<!-- Common Beans that are used in multiple scenarios -->
<!-- ================================================== -->
<!-- Increments the Job ID -->
<bean id="jobParametersIncrementer" class="org.springframework.batch.core.launch.support.RunIdIncrementer" />
<!-- Prepared statement setter to provide minimum user ID input to the PostgreSQL function. -->
<bean id="preparedStatementSetter" class="dao.setter.TestUserPreparedStatementSetter"
scope="step">
<property name="minId">
<value>#{jobParameters['minId']}</value>
</property>
</bean>
</beans>
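The reader configuration above declares two positional parameters: the input p_id_min at position 1 and the REFCURSOR out parameter at position 2, which is what refCursorPosition points at. As a rough sketch of what that implies at the JDBC level, assuming the reader issues an escape-syntax call with one placeholder per declared parameter (the exact string Spring Batch builds may differ), the call could look like the output of this helper. The class and method names are hypothetical.

```java
final class CallStringSketch {

    // Builds a JDBC escape-syntax procedure call with one '?' per parameter.
    static String procedureCall(String name, int parameterCount) {
        StringBuilder sb = new StringBuilder("{call ").append(name).append('(');
        for (int i = 0; i < parameterCount; i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append('?');
        }
        return sb.append(")}").toString();
    }

    public static void main(String[] args) {
        // Placeholder 1 = p_id_min (input), placeholder 2 = resultCursor (output).
        System.out.println(procedureCall("get_user_func_no_arg", 2));
    }
}
```

With this layout, TestUserPreparedStatementSetter sets placeholder 1 and registers placeholder 2 as the out parameter, matching refCursorPosition="2".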
-TestUser.java (data object for the ItemReader):
package domain;

// Data object to support a user
public class TestUser {
    private int id;
    private String ssn;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getSsn() {
        return ssn;
    }

    public void setSsn(String ssn) {
        this.ssn = ssn;
    }

    @Override
    public String toString() {
        return "TestUser [id=" + id + ", ssn=" + ssn + "]";
    }
}
-TestUserPreparedStatementSetter.java (sets the input parameter for the PostgreSQL function):
package dao.setter;

import java.sql.CallableStatement;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.springframework.jdbc.core.PreparedStatementSetter;

// Spring Batch PreparedStatementSetter to set parameters for stored procedures
public class TestUserPreparedStatementSetter implements PreparedStatementSetter {
    private int minId;

    // Setters
    public void setMinId(int minId) {
        this.minId = minId;
    }

    // Set the parameters for the stored procedure
    public void setValues(PreparedStatement ps) throws SQLException {
        CallableStatement cs = (CallableStatement) ps;
        cs.setInt(1, this.minId);
        cs.registerOutParameter(2, java.sql.Types.OTHER);
    }
}
-TestUserRowMapper.java (maps ItemReader cursor rows to the TestUser data object):
package dao.mapper;

import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
import domain.TestUser;

// Spring RowMapper to support stored procedure results
public class TestUserRowMapper implements RowMapper<TestUser> {
    // Map the results to the DAO
    public TestUser mapRow(ResultSet resultSet, int rowNum) throws SQLException {
        TestUser resultData = new TestUser();
        resultData.setId(resultSet.getInt(1));
        resultData.setSsn(resultSet.getString(2));
        return resultData;
    }
}
-Invocation command:
java -classpath ".;lib\*;bin" org.springframework.batch.core.launch.support.CommandLineJobRunner localLaunchContext.xml TestUserJob minId=1
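The trailing minId=1 argument is how the minimum ID reaches the job: CommandLineJobRunner treats the arguments after the job name as key=value job parameters, and the step-scoped setter bean then reads the value through the jobParameters expression. The following is a simplified illustration of that key=value handling, not the converter Spring Batch actually uses; the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class JobParameterArgsSketch {

    // Splits each "key=value" argument on its first '='.
    // Arguments without a key before '=' are skipped.
    static Map<String, String> parse(String... args) {
        Map<String, String> params = new LinkedHashMap<>();
        for (String arg : args) {
            int eq = arg.indexOf('=');
            if (eq > 0) {
                params.put(arg.substring(0, eq), arg.substring(eq + 1));
            }
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> params = parse("minId=1");
        // The value arrives as text and is converted before setMinId(int) is called.
        int minId = Integer.parseInt(params.get("minId"));
        System.out.println("minId = " + minId);
    }
}
```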
When I run it, I get the following error:
Caused by: org.postgresql.util.PSQLException: ERROR: cursor "resultCursor" does not exist
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2198)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1927)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:255)
at org.postgresql.jdbc2.AbstractJdbc2Statement.execute(AbstractJdbc2Statement.java:561)
at org.postgresql.jdbc2.AbstractJdbc2Statement.executeWithFlags(AbstractJdbc2Statement.java:405)
at org.postgresql.jdbc2.AbstractJdbc2Connection.execSQLQuery(AbstractJdbc2Connection.java:363)
at org.postgresql.jdbc2.AbstractJdbc2ResultSet.internalGetObject(AbstractJdbc2ResultSet.java:207)
at org.postgresql.jdbc3.AbstractJdbc3ResultSet.internalGetObject(AbstractJdbc3ResultSet.java:36)
at org.postgresql.jdbc4.AbstractJdbc4ResultSet.internalGetObject(AbstractJdbc4ResultSet.java:300)
at org.postgresql.jdbc2.AbstractJdbc2ResultSet.getObject(AbstractJdbc2ResultSet.java:2704)
at org.postgresql.jdbc2.AbstractJdbc2Statement.executeWithFlags(AbstractJdbc2Statement.java:456)
at org.postgresql.jdbc2.AbstractJdbc2Statement.execute(AbstractJdbc2Statement.java:412)
at org.springframework.batch.item.database.StoredProcedureItemReader.openCursor(StoredProcedureItemReader.java:205)
... 29 more
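I have repeatedly tried declaring the REFCURSOR as an output parameter of the function, along with many other things, without success. This happens with JDK 1.7, the PostgreSQL JDBC driver JAR postgresql-9.3-1102.jdbc4, and PostgreSQL 8.2.15 (under Greenplum 4.2.8.1 build 2).
Any suggestions are welcome. Thanks in advance.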
Answer 1: After a lot of investigation, I found a workaround: change the data source to one that supports disabling auto-commit:
<!-- Define the PostgreSQL source -->
<bean id="postgresql_dataSource"
class="org.apache.commons.dbcp.BasicDataSource">
<property name="driverClassName" value="org.postgresql.Driver"/>
<property name="url" value="THE URL"/>
<property name="username" value="THE USER"/>
<property name="password" value="THE PASSWORD"/>
<property name="defaultAutoCommit" value="false"/>
</bean>
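The answer does not spell out why this helps. A likely explanation is that a PostgreSQL cursor opened without WITH HOLD lives only until the end of its transaction, so with auto-commit enabled the transaction that opened "resultCursor" has already ended by the time the driver tries to fetch from it. The plain-JDBC sketch below shows the same idea outside Spring Batch. It assumes the function from the question and uses the return-value call form {? = call ...}, which differs from the two-parameter configuration in the question. The class and method names are hypothetical, and readUsers needs a live connection to run.

```java
import java.math.BigDecimal;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;

final class RefCursorSketch {

    // JDBC escape syntax for a one-argument function whose return value is the refcursor.
    static String callString(String functionName) {
        return "{? = call " + functionName + "(?)}";
    }

    // Calls get_user_func_no_arg and drains the returned cursor inside one transaction.
    static List<String> readUsers(Connection con, long minId) throws SQLException {
        boolean previousAutoCommit = con.getAutoCommit();
        con.setAutoCommit(false); // keep the transaction open while the cursor is read
        List<String> rows = new ArrayList<>();
        try (CallableStatement cs = con.prepareCall(callString("get_user_func_no_arg"))) {
            cs.registerOutParameter(1, Types.OTHER);        // the returned refcursor
            cs.setBigDecimal(2, BigDecimal.valueOf(minId)); // p_id_min is NUMERIC
            cs.execute();
            try (ResultSet rs = (ResultSet) cs.getObject(1)) {
                while (rs.next()) {
                    rows.add(rs.getLong(1) + "," + rs.getString(2));
                }
            }
            con.commit(); // the cursor is released when the transaction ends
        } catch (SQLException e) {
            con.rollback();
            throw e;
        } finally {
            con.setAutoCommit(previousAutoCommit);
        }
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(callString("get_user_func_no_arg"));
    }
}
```

Setting defaultAutoCommit to false on the pooled data source has the same effect for every connection the reader borrows, without touching the reader itself.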