Sink 组件在 Spring Cloud 数据流中无法使用 kafka 获取正确的数据
Posted
技术标签:
【中文标题】Sink 组件在 Spring Cloud 数据流中无法使用 kafka 获取正确的数据【英文标题】:Sink component doesn't get the right data with kafka in spring cloud data flow 【发布时间】:2017-01-13 16:59:34 【问题描述】:我不是以英语为母语的人,但我会尽量清楚地表达我的问题。 遇到这个问题困扰了我两天,还是找不到解决办法。
我已经构建了一个流,它将在 Hadoop YARN 中的 Spring Can 数据流中运行。
流由Http source、processor和file sink组成。
1.Http来源 HTTP Source 组件有两个输出通道绑定两个不同的目的地,即在 application.properties 中定义的 dest1 和 dest2。
spring.cloud.stream.bindings.output.destination=dest1 spring.cloud.stream.bindings.output2.destination=dest2
以下是 HTTP 源代码片段供您参考..
@Autowired
private EssSource channels; //EssSource is the interface for multiple output channels
##output channel 1:
@RequestMapping(path = "/file", method = POST, consumes = "text/*", "application/json")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody byte[] body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType)
logger.info("enter ... handleRequest1...");
channels.output().send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
##output channel 2:
@RequestMapping(path = "/test", method = POST, consumes = "text/*", "application/json")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest2(@RequestBody byte[] body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType)
logger.info("enter ... handleRequest2...");
channels.output2().send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
2。处理器 处理器有两个多输入通道和两个输出通道,它们与不同的目的地绑定。 目标绑定在处理器组件项目的 application.properties 中定义。
//input channel binding
spring.cloud.stream.bindings.input.destination=dest1
spring.cloud.stream.bindings.input2.destination=dest2
//output channel binding
spring.cloud.stream.bindings.output.destination=hdfsSink
spring.cloud.stream.bindings.output2.destination=fileSink
下面是处理器的代码 sn-p。
@Transformer(inputChannel = EssProcessor.INPUT, outputChannel = EssProcessor.OUTPUT)
public Object transform(Message<?> message)
logger.info("enter ...transform...");
return "processed by transform1";;
@Transformer(inputChannel = EssProcessor.INPUT_2, outputChannel = EssProcessor.OUTPUT_2)
public Object transform2(Message<?> message)
logger.info("enter ... transform2...");
return "processed by transform2";
3.文件接收器组件。
我使用 Spring 的官方 fil sink 组件。 maven://org.springframework.cloud.stream.app:file-sink-kafka:1.0.0.BUILD-SNAPSHOT
我只是在其 applicaiton.properties 文件中添加目标绑定。 spring.cloud.stream.bindings.input.destination=fileSink
4.发现:
我期望的数据流应该是这样的:
Source.handleRequest() -->Processor.handleRequest()
Source.handleRequest2() -->Processor.handleRequest2() --> Sink.fileWritingMessageHandler();
是否应该只将字符串“由 transform2 处理”保存到文件中。
但是经过我的测试,数据流实际是这样的:
Source.handleRequest() -->Processor.handleRequest() --> Sink.fileWritingMessageHandler();
Source.handleRequest2() -->Processor.handleRequest2() --> Sink.fileWritingMessageHandler();
“由transform1处理”和“由transform2处理”字符串都保存到文件中。
5.问题:
虽然 Processor.handleRequest() 中输出通道的目的地绑定到 hdfsSink 而不是 fileSink,但数据仍流向文件 Sink。我无法理解这一点,这不是我想要的。 我只希望来自 Processor.handleRequest2() 的数据流到文件接收器而不是两者。 如果我做得不对,谁能告诉我该怎么做以及解决方案是什么? 我已经困惑了2天了。
感谢您的热心帮助。
亚历克斯
【问题讨论】:
【参考方案1】:您的流定义是这样的吗(“-2”版本是具有多个通道的版本)?
http-source-2 | processor-2 | file-sink
请注意,Spring Cloud Data Flow 将覆盖applications.properties
中定义的目标,这就是为什么即使处理器的spring.cloud.stream.bindings.output.destination
设置为hdfs-sink
,它实际上也会匹配file-sink
的输入。
此处解释了从流定义配置目标的方式(在水龙头的上下文中):http://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#spring-cloud-dataflow-stream-tap-dsl
您可以做的是简单地交换通道 1 和 2 的含义 - 将侧通道用于 hdfs。虽然这有点脆弱 - 因为 Stream 的 input
/output
通道将自动配置,而其他通道将通过 application.properties
配置 - 在这种情况下,通过配置侧通道目标可能会更好流定义或部署时 - 请参阅http://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#_application_properties。
在我看来,这些也可能是 2 个使用常规组件监听不同端点的流 - 假设数据应该并排流动。
【讨论】:
您好,Marius,感谢您的回答。现在可以了。太感谢了。真的很有帮助。 对了,今天下午我在 YARN 上进行测试时,发现 YARN 上的 SCDF 似乎不仅缓存了 /dataflow/artifacts/cache 文件夹中的应用程序,而且还缓存了其他地方,例如数据库。我不确定它是否也将应用程序缓存在数据库中。即使我销毁流并取消注册应用程序并删除 /dataflow/artifacts/cache 文件夹中的所有文件,我发现 Yarn 上的 SCDF 仍然使用缓存的应用程序进行部署。最后,我别无选择,只能重新启动 SCDF 服务器,它就可以工作了。您知道如何在不重新启动服务器的情况下完全清除缓存数据吗?谢谢, 这个section in the docs 可能很有用。缓存在/dataflow/artifacts/cache
和hdfs
目录中完成,因此每当您清除缓存时,您可能也必须重新启动服务器。请随时分享您对此工作流程的反馈和/或改进。
感谢萨比的反馈。除了做Caching的/dataflow/artifacts/cache目录,你知道hdfs目录在哪里吗?我不想重新启动服务器以清除缓存,因为如果这样做,即使我们只在一个流中更新一个应用程序,也必须再次手动重新部署所有流。
您是否尝试过使用--force
选项注册“一个应用程序”?这应该覆盖现有的并且将使用最新的。此外,该主题的原始主题已经解决,这完全是一个不同的讨论 - 如果您仍然遇到问题,也许您可以考虑关闭此主题并打开一个新主题。以上是关于Sink 组件在 Spring Cloud 数据流中无法使用 kafka 获取正确的数据的主要内容,如果未能解决你的问题,请参考以下文章
和spring cloud/boot 学习如何管理自己的组件
Spring Cloud Stream 消息驱动 RabbitMQ 基础使用