Spring Batch FTP integration timeout error - using spring-boot/spring-batch for ETL

Posted: 2020-08-12 23:16:45

Question:

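I am new to Spring and am looking at Spring Batch for a simple ETL process. To learn it, I followed this example: https://examples.javacodegeeks.com/enterprise-java/spring/batch/spring-batch-etl-job-example/

It worked well and gave me the basic structure for doing ETL with Spring Batch.

Next, as a PoC, I want to fetch the input file from an FTP server. Basically, instead of using a hard-coded input file, I want to add a step that gets it over FTP.

From searching, I learned that Spring Integration can be used for FTP/SFTP.

Following https://coreyreil.wordpress.com/2012/12/21/spring-batch-creating-an-ftp-tasklet-to-get-remote-files/ I created a tasklet that performs the FTP/SFTP get.

In the existing BatchConfiguration.java, I added the following to make my FTP tasklet the first step.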

@Bean
public FtpGetRemoteFilesTasklet myFtpGetRemoteFilesTasklet() {
    FtpGetRemoteFilesTasklet ftpTasklet = new FtpGetRemoteFilesTasklet();
    ftpTasklet.setRetryIfNotFound(true);
    ftpTasklet.setDownloadFileAttempts(3);
    ftpTasklet.setRetryIntervalMilliseconds(10000);
    ftpTasklet.setFileNamePattern("README");
    //ftpTasklet.setFileNamePattern("TestFile");
    ftpTasklet.setRemoteDirectory("/");
    ftpTasklet.setLocalDirectory(new File(System.getProperty("java.io.tmpdir")));
    // myFtpSessionFactory is a field; its declaration is not shown in this listing
    ftpTasklet.setSessionFactory(myFtpSessionFactory);

    return ftpTasklet;
}

@Bean
public SessionFactory myFtpSessionFactory() {
    DefaultFtpSessionFactory ftpSessionFactory = new DefaultFtpSessionFactory();
    ftpSessionFactory.setHost("ftp.gnu.org");
    ftpSessionFactory.setClientMode(0); // 0 = FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE
    ftpSessionFactory.setFileType(0);   // 0 = FTP.ASCII_FILE_TYPE
    ftpSessionFactory.setPort(21);
    ftpSessionFactory.setUsername("anonymous");
    ftpSessionFactory.setPassword("anonymous");

    return ftpSessionFactory;
}
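
The three retry settings on the tasklet (setRetryIfNotFound, setDownloadFileAttempts, setRetryIntervalMilliseconds) come from the tasklet in the linked blog post, whose source is not reproduced here. As a rough illustration of what such settings usually mean, here is a minimal sketch of a bounded retry loop with a fixed wait between attempts. The class RetrySketch and the method retry are hypothetical names made up for this example and are not the tasklet's actual code.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

class RetrySketch {

    /**
     * Hypothetical helper: runs the action up to maxAttempts times, sleeping
     * intervalMillis after each failed attempt except the last, and rethrows
     * the last failure if every attempt fails.
     */
    static <T> T retry(int maxAttempts, long intervalMillis, Callable<T> action) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(intervalMillis);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated download that only succeeds on the third attempt.
        String file = retry(3, 10L, () -> {
            if (++calls[0] < 3) {
                throw new IOException("file not found yet");
            }
            return "README";
        });
        System.out.println(file + " after " + calls[0] + " attempts");
    }
}
```

With 3 attempts and a 10000 ms interval, as configured above, a loop of this shape would spend at least 20 seconds waiting before giving up, on top of the time each failed connection attempt takes.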

Then I changed the job builder as follows:

// Configure job step
@Bean
public Job marketPricesETLJob() {
    return jobBuilderFactory.get("Market Prices ETL Job")
            .incrementer(new RunIdIncrementer())
            .listener(listener())
            //.flow(etlStep()).end().build();
            // End the job if the FTP step fails; otherwise continue to the ETL step
            .start(getFilesFromFTPServer()).on("FAILED").end()
            .from(getFilesFromFTPServer()).on("COMPLETED").to(etlStep())
            .end()
            .build();
}

@Bean
public Step getFilesFromFTPServer() {
    return stepBuilderFactory.get("Get File from FTP Server")
            .tasklet(myFtpGetRemoteFilesTasklet())
            .build();
}

@Bean
public Step etlStep() {
    return stepBuilderFactory.get("Extract -> Transform -> Aggregate -> Load").<MarketEvent, Trade> chunk(10000)
            .reader(marketEventReader())
            .processor(marketEventProcessor())
            .writer(stockPriceAggregator())
            .build();
}

As an example, I am simply using the site ftp.gnu.org to read its README file.

But I get the following error.

2020-04-28 19:10:19.269  INFO 20380 --- [           main] c.s.demo.SpringBatchDemoApplication      : Started SpringBatchDemoApplication in 16.846 seconds (JVM running for 18.903)
2020-04-28 19:10:19.274  INFO 20380 --- [           main] o.s.b.a.b.JobLauncherCommandLineRunner   : Running default command line with: []
2020-04-28 19:10:19.748  INFO 20380 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=Market Prices ETL Job]] launched with the following parameters: [run.id=8]
2020-04-28 19:10:19.884  INFO 20380 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [Get File from FTP Server]
2020-04-28 19:11:40.046  WARN 20380 --- [l-1 housekeeper] com.zaxxer.hikari.pool.HikariPool        : HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=49s792ms777µs228ns).
2020-04-28 19:12:13.603 ERROR 20380 --- [           main] o.s.batch.core.step.AbstractStep         : Encountered an error executing step Get File from FTP Server in job Market Prices ETL Job

org.springframework.messaging.MessagingException: Problem occurred while synchronizing 'null' to local directory; nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.lang.IllegalStateException: failed to create FTPClient
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:355) ~[spring-integration-file-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:333) ~[spring-integration-file-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at com.springbatch.demo.tasklet.FtpGetRemoteFilesTasklet.execute(FtpGetRemoteFilesTasklet.java:112) ~[classes/:na]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:68) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:137) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:567) ~[na:na]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at com.sun.proxy.$Proxy62.run(Unknown Source) ~[na:na]
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.execute(JobLauncherCommandLineRunner.java:192) ~[spring-boot-autoconfigure-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.executeLocalJobs(JobLauncherCommandLineRunner.java:166) ~[spring-boot-autoconfigure-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.launchJobFromProperties(JobLauncherCommandLineRunner.java:153) ~[spring-boot-autoconfigure-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.run(JobLauncherCommandLineRunner.java:148) ~[spring-boot-autoconfigure-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:784) ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:768) ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at com.springbatch.demo.SpringBatchDemoApplication.main(SpringBatchDemoApplication.java:10) ~[classes/:na]
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.lang.IllegalStateException: failed to create FTPClient
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:446) ~[spring-integration-file-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:348) ~[spring-integration-file-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    ... 43 common frames omitted
Caused by: java.lang.IllegalStateException: failed to create FTPClient
    at org.springframework.integration.ftp.session.AbstractFtpSessionFactory.getSession(AbstractFtpSessionFactory.java:170) ~[spring-integration-ftp-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.integration.ftp.session.AbstractFtpSessionFactory.getSession(AbstractFtpSessionFactory.java:41) ~[spring-integration-ftp-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:432) ~[spring-integration-file-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    ... 44 common frames omitted
Caused by: java.net.ConnectException: Connection timed out: connect
    at java.base/sun.nio.ch.Net.connect0(Native Method) ~[na:na]
    at java.base/sun.nio.ch.Net.connect(Net.java:493) ~[na:na]
    at java.base/sun.nio.ch.Net.connect(Net.java:482) ~[na:na]
    at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:588) ~[na:na]
    at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:339) ~[na:na]
    at java.base/java.net.Socket.connect(Socket.java:603) ~[na:na]
    at org.apache.commons.net.SocketClient._connect(SocketClient.java:243) ~[commons-net-3.6.jar:3.6]
    at org.apache.commons.net.SocketClient.connect(SocketClient.java:202) ~[commons-net-3.6.jar:3.6]
    at org.springframework.integration.ftp.session.AbstractFtpSessionFactory.createClient(AbstractFtpSessionFactory.java:193) ~[spring-integration-ftp-5.2.5.RELEASE.jar:5.2.5.RELEASE]

The main error is:

Caused by: java.lang.IllegalStateException: failed to create FTPClient
    at org.springframework.integration.ftp.session.AbstractFtpSessionFactory.getSession(AbstractFtpSessionFactory.java:170) ~[spring-integration-ftp-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.integration.ftp.session.AbstractFtpSessionFactory.getSession(AbstractFtpSessionFactory.java:41) ~[spring-integration-ftp-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:432) ~[spring-integration-file-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    ... 44 common frames omitted
Caused by: java.net.ConnectException: Connection timed out: connect
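
In this trace the MessagingException and IllegalStateException are wrappers, and the innermost java.net.ConnectException is the one that names the network failure. As a small sketch of how to pull that innermost cause out in code (for example inside a job listener), the hypothetical helper rootCause below walks the getCause() chain. The exceptions built in main are plain JDK stand-ins that mirror the nesting in the log; they are not Spring classes.

```java
import java.net.ConnectException;

class RootCauseSketch {

    /** Follows getCause() links until the innermost exception is reached. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-ins for the three nested exceptions seen in the log.
        Throwable connect = new ConnectException("Connection timed out: connect");
        Throwable state = new IllegalStateException("failed to create FTPClient", connect);
        Throwable outer = new RuntimeException("Failed to execute on session", state);

        Throwable root = rootCause(outer);
        // prints "java.net.ConnectException: Connection timed out: connect"
        System.out.println(root.getClass().getName() + ": " + root.getMessage());
    }
}
```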

I can connect to the site from my machine and download the file as well. How can I check what the problem is, and how can I fix it?
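
One way to check this independently of Spring is to open a plain TCP connection to the FTP control port from the same JVM, with an explicit timeout. The sketch below uses only the JDK; the class FtpPortProbe, the enum Result, and the 10-second timeout are choices made for this example. It only tells whether the JVM can reach host and port at all, and says nothing about FTP login or the data connection.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;

class FtpPortProbe {

    enum Result { CONNECTED, UNKNOWN_HOST, TIMED_OUT, CONNECT_FAILED, OTHER_IO_ERROR }

    /** Tries to open a TCP connection and reports how the attempt ended. */
    static Result probe(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return Result.CONNECTED;
        } catch (UnknownHostException e) {
            return Result.UNKNOWN_HOST;     // the host name could not be resolved
        } catch (SocketTimeoutException e) {
            return Result.TIMED_OUT;        // our own timeoutMillis expired
        } catch (ConnectException e) {
            return Result.CONNECT_FAILED;   // refused, or timed out at the OS level as in the log
        } catch (IOException e) {
            return Result.OTHER_IO_ERROR;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "ftp.gnu.org";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 21;
        System.out.println(host + ":" + port + " -> " + probe(host, port, 10_000));
    }
}
```

If this probe also fails while a browser or FTP client on the same machine succeeds, the difference is likely to lie in how the two reach the network (for example a proxy or firewall rule), which is an assumption to verify rather than something the log confirms.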

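Basically, I am looking for a good example of an ETL process that covers the following basic tasks:
1. Fetch files from a remote host (FTP/SFTP) to the local host.
2. Read/parse the downloaded files.
3. Process the files (some business logic). Several such downloaded files may be read and combined (for example, using multiple data frames).
4. A writer stores the processed results in a database (mostly) or some other target.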
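
Stripped of any Spring Batch machinery, tasks 2 to 4 can be sketched in plain Java as below. In Spring Batch they would roughly correspond to an ItemReader, an ItemProcessor, and an ItemWriter, with task 1 handled by the FTP tasklet. Everything in the sketch is an assumption made for illustration: the class EtlSketch, the "symbol,price" line format, the sum-per-symbol business logic, and printing in place of a database write.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class EtlSketch {

    // Task 2: parse one line of a downloaded file.
    // The "symbol,price" layout is a made-up format for this example.
    static Map.Entry<String, Double> parse(String line) {
        String[] parts = line.split(",");
        return new AbstractMap.SimpleEntry<>(parts[0].trim(), Double.valueOf(parts[1].trim()));
    }

    // Task 3: combine the records of several files; the business logic
    // here is simply "sum the prices per symbol".
    static Map<String, Double> aggregate(List<List<String>> files) {
        Map<String, Double> totals = new TreeMap<>();
        for (List<String> lines : files) {
            for (String line : lines) {
                if (line.trim().isEmpty()) {
                    continue; // skip blank lines
                }
                Map.Entry<String, Double> record = parse(line);
                totals.merge(record.getKey(), record.getValue(), Double::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        // Task 1 (the FTP/SFTP download) is replaced by in-memory stand-ins.
        List<List<String>> files = Arrays.asList(
                Arrays.asList("ACME,10.5", "INIT,3.0"),
                Arrays.asList("ACME,4.5"));
        // Task 4: a real writer would insert into a database; printing stands in for it.
        aggregate(files).forEach((symbol, total) -> System.out.println(symbol + " -> " + total));
    }
}
```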

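For the above, I would like to go through some working examples to understand how Spring Batch works.

---- Update based on the suggestion below ----

Yes. Based on https://docs.spring.io/spring-integration/api/org/springframework/integration/ftp/session/AbstractFtpSessionFactory.html#setClientMode-int- I tried both 0 and 2.

I inspected ftpSessionFactory, and it looks fine according to my settings.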

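Comments:

Make sure your FTP session factory is configured correctly. Have you tried other client modes in ftpSessionFactory.setClientMode(0);?

Thanks, I have updated my question above with what I tried and checked.

Answer 1: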

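Knowing the following might help pinpoint the problem:

    the code of the isSftp() method (mentioned in your link)
    the declaration of the field myFtpSessionFactory in your listing

Have you tried narrowing the problem down with any of the following steps? These are all shots in the dark, anyway.

    1. Provoke a java.net.UnknownHostException by setting the following host name:

    ftpSessionFactory.setHost("invalid");

    If that does not happen, I would suspect that the wrong instance of SessionFactory is being injected.

    2. Similarly, provoke a "Connection refused" error by setting the following host name and port:

    ftpSessionFactory.setHost("localhost");
    ftpSessionFactory.setPort(42);

    3. What happens if we set a different timeout?

    ftpSessionFactory.setConnectTimeout(60000);

    4. Set a breakpoint at the line AbstractFtpSessionFactory.java:193.

    a. Check the value of client.connectionTimeout. It should be, say, at least 60000 ms, or 0.

    b. Check the values of host and port.

    5. Run a simplified piece of equivalent code. Does it still reproduce the error?

    DefaultFtpSessionFactory ftpSessionFactory = new DefaultFtpSessionFactory();
    ftpSessionFactory.setHost("ftp.gnu.org");
    ftpSessionFactory.setClientMode(0);
    ftpSessionFactory.setFileType(0);
    ftpSessionFactory.setPort(21);
    ftpSessionFactory.setUsername("anonymous");
    ftpSessionFactory.setPassword("anonymous");

    AbstractInboundFileSynchronizer<?> synchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory);
    // Same local directory as in the question
    File localDirectory = new File(System.getProperty("java.io.tmpdir"));
    synchronizer.synchronizeToLocalDirectory(localDirectory);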
    
