使用 Apache Camel Source 从 S3 到 Kafka

Posted

技术标签:

【中文标题】使用 Apache Camel Source 从 S3 到 Kafka【英文标题】:From S3 to Kafka using Apache Camel Source 【发布时间】:2021-10-25 14:06:25 【问题描述】:

我想将 amazon-s3 中的数据读入 kafka。我找到了 camel-aws-s3-kafka-connector 源,我尝试使用它并且它可以工作,但是......我想从 s3 读取数据而不删除文件,但对每个消费者执行一次,没有重复。仅使用配置文件可以做到这一点吗?我已经创建了如下所示的文件:

name=CamelSourceConnector
connector.class=org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter

camel.source.maxPollDuration=10000

topics=ReadTopic
#prefix=WriteTopic
camel.source.endpoint.prefix=full/path/to/WriteTopic2


camel.source.path.bucketNameOrArn=BucketName
camel.source.endpoint.autocloseBody=false

camel.source.endpoint.deleteAfterRead=false

camel.sink.endpoint.region=xxxx
camel.component.aws-s3.accessKey=xxxx
camel.component.aws-s3.secretKey=xxxx

除了上面的配置,我不能只从“WriteTopic”中读取,而是从 s3 中的所有文件夹中读取,是否也可以配置? S3Bucket folders with files

【问题讨论】:

S3 没有“文件夹”。 WriteTopic/ 的 S3 前缀将排除 WriteTopic2/ 数据 我的例子很糟糕,即使我输入了前缀 WriteTopic2 它也会从 WriteTopic 和 WriteTopic2 中读取。 当我使用 camel.source.endpoint.prefix=full/path/to/WriteTopic2 而不是 prefix=WriteTopic2 它工作。 :) 如果这是您正在寻找的解决方案,请随时在下面回答您自己的帖子 这是我正在寻找的解决方案的一小部分,对我来说更重要的是在顶部找到答案:“如何从 s3 中准确地为每个消费者读取一次数据而不重复且不删除s3 中的数据”。或者换句话说:“如何强制 Camel Source 连接器在不删除的情况下读取一次数据。” 【参考方案1】:

我找到了解决重复问题的方法,我不完全确定这是最好的方法,但它可能对某人有所帮助。我的方法在这里描述:https://camel.apache.org/blog/2020/12/CKC-idempotency-070/。我使用了camel.idempotency.repository.type=memory,我的配置文件如下:

name=CamelAWS2S3SourceConnector connector.class=org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
camel.source.maxPollDuration=10000
topics=ReadTopic
# scieżka z ktorej czytamy dane
camel.source.endpoint.prefix=full/path/to/topic/prefix
camel.source.path.bucketNameOrArn="Bucket name"
camel.source.endpoint.deleteAfterRead=false
camel.component.aws2-s3.access-key=****
camel.component.aws2-s3.secret-key=****
camel.component.aws2-s3.region=****
#remove duplicates from messages# 
camel.idempotency.enabled=true 
camel.idempotency.repository.type=memory 
camel.idempotency.expression.type=body

我更改骆驼连接器库也很重要。最初我使用 camel-aws-s3-kafka-connector 源,要使用 Idempotent Consumer 我需要更改 camel-aws2-s3-kafka-connector 源上的连接器p>

【讨论】:

以上是关于使用 Apache Camel Source 从 S3 到 Kafka的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Apache Camel 从 Java 类访问 JMS 队列?

如何使用apache camel从soap响应中获取值?

ServiceMix 中 Apache-Camel 路由的管理和监控

[每日一学]apache camel|BDD方式开发apache camel|Groovy|Spock

Apache Camel端点注入直接路由“端点上没有可用的消费者”

原因:org.apache.camel.NoTypeConversionAvailableException:没有可用于从类型转换的类型转换器:POJO 到 byte[]