如何将 Apache Kafka 与 Amazon S3 连接?

Posted

技术标签:

【中文标题】如何将 Apache Kafka 与 Amazon S3 连接?【英文标题】:How to connect Apache Kafka with Amazon S3? 【发布时间】:2019-03-08 13:44:53 【问题描述】:

我想使用 Kafka Connect 将来自 Kafka 的数据存储到存储桶 s3 中。我已经运行了一个 Kafka 的主题,并且创建了一个存储桶 s3。我的主题有关于 Protobuffer 的数据,我尝试使用 https://github.com/qubole/streamx 并得到下一个错误:

 [2018-10-04 13:35:46,512] INFO Revoking previously assigned partitions [] for group connect-s3-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)
 [2018-10-04 13:35:46,512] INFO (Re-)joining group connect-s3-sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:326)
 [2018-10-04 13:35:46,645] INFO Successfully joined group connect-s3-sink with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:434)
 [2018-10-04 13:35:46,692] INFO Setting newly assigned partitions [ssp.impressions-11, ssp.impressions-10, ssp.impressions-7, ssp.impressions-6, ssp.impressions-9, ssp.impressions-8, ssp.impressions-3, ssp.impressions-2, ssp.impressions-5, ssp.impressions-4, ssp.impressions-1, ssp.impressions-0] for Group connect-s3-sink(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:219)
 [2018-10-04 13:35:47,193] ERROR Task s3-sink-0 threw an uncaught an unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
 java.lang.NullPointerException
    at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:122)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:290)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:421)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2018-10-04 13:35:47,194] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
[2018-10-04 13:35:51,235] INFO Reflections took 6844 ms to scan 259 urls, producing 13517 keys and 95788 values (org.reflections.Reflections:229)

我做了接下来的步骤:

    我克隆了存储库。 mvn DskipTests package

    nano config/connect-standalone.properties

    bootstrap.servers=ip-myip.ec2.internal:9092
    key.converter=com.qubole.streamx.ByteArrayConverter
    value.converter=com.qubole.streamx.ByteArrayConverter
    

    nano config/quickstart-s3.properties

    name=s3-sink 
    connector.class=com.qubole.streamx.s3.S3SinkConnector
    format.class=com.qubole.streamx.SourceFormat tasks.max=1
    topics=ssp.impressions
    flush.size=3
    s3.url=s3://myaccess_key:mysecret_key@mybucket/demo
    

    connect-standalone /etc/kafka/connect-standalone.properties quickstart-s3.properties

我想知道我这样做是否可行或以其他方式将数据从 Kafka 保存到 S3。

【问题讨论】:

你找到了什么教程?任何新的 Kafka 消费者都可以配置为从现有数据的偏移量开始读取 如果你从根本上改写它以提出不同的问题,你应该在你的问题文本中指出。我的回答是针对您最初的问题“如何将 Apache Kafka 连接到 S3”。 【参考方案1】:

您可以使用 Kafka Connect 与 Kafka Connect S3 连接器进行此集成。

Kafka Connect 是 Apache Kafka 的一部分,S3 connector 是一个开源连接器,可以通过standalone 或作为Confluent Platform 的一部分使用。

有关 Kafka Connect 的一般信息和示例,本系列文章可能会有所帮助:

https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1/ https://www.confluent.io/blog/blogthe-simplest-useful-kafka-connect-data-pipeline-in-the-world-or-thereabouts-part-2/ https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/

免责声明:我为 Confluent 工作,并撰写了上述博客文章。


2020 年 4 月:我录制了一段视频,展示了如何使用 S3 接收器:https://rmoff.dev/kafka-s3-video

【讨论】:

我想使用 Kafka Connect,但我在 Protobuffer 上有数据,这可能有问题吗? key.converter= value.converter= key.converter.schemas.enable= value.converter.schemas.enable= internal.key.converter= internal.value.converter= internal.key.converter.schemas.enable= internal.value。 converter.schemas.enable= 是的,有一个用于 Kafka Connect 的开源 protobuf 转换器,您可以使用:confluent.io/connector/kafka-connect-protobuf-converter 我有很多问题。我可以从其他 Kafka 实例连接到我的 Kafka 集群并以独立方式运行我的 Kafka 连接器 s3 吗?这个错误“ERROR Task s3-sink-0 throw an unaught an unrecoverable exception”是什么意思?如果您可以恢复连接到 Kafka 的步骤并从另一个 Kafka 实例继续使用 s3,您将如何做?【参考方案2】:

另一种方法是编写带有日志轮换的消费者,然后将玉米文件写入 S3。

【讨论】:

以上是关于如何将 Apache Kafka 与 Amazon S3 连接?的主要内容,如果未能解决你的问题,请参考以下文章

如何将 Apache kafka 与 Spring mvc 一起使用?可能吗?

Amazon Kinesis 与 AWS Manage Service Kafka (MSK) -(从本地连接)

从 Amazon SQS 馈送 Apache Spark 流?

angular.js 与 apache kafka 的集成

使用 Apache Camel Source 从 S3 到 Kafka

如何从 Amazon Elastic Beanstalk 中删除一些 Apache 设置?