使用confluent本地安装和使用kafka

Posted 张小凡vip

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用confluent本地安装和使用kafka相关的知识,希望对你有一定的参考价值。

转载请注明出处:使用confluent本地安装和使用kafka

confluent简介

confluent是平台化的工具,封装了kafka,让我们可以更方便的安装和使用监控kafka,作用类似于CDH对于Hadoop。

confluent是由LinkedIn开发出Apache Kafka的团队成员,基于这项技术创立了新公司Confluent,Confluent的产品也是围绕着Kafka做的。基本架构如下:

官网
https://www.confluent.io

下载地址
https://www.confluent.io/download/

物理机安装参考
Confluent Open Source Quick Start (Local)

docker安装参考
Confluent Open Source Quick Start (Docker)

对比之后感觉比原生的kafka安装简单很多,容器是docker容器的版本,对于我们在k8s中使用很方便。

Confluent的组件

Confluent Platform 包括更多的工具和服务,使构建和管理数据流平台更加容易。
Confluent Control Center(闭源)。管理和监控Kafka最全面的GUI驱动系统。
Confluent Kafka Connectors(开源)。连接SQL数据库/Hadoop/Hive
Confluent Kafka Clients(开源)。对于其他编程语言,包括C/C++,Python
Confluent Kafka REST Proxy(开源)。允许一些系统通过HTTP和kafka之间发送和接收消息。
Confluent Schema Registry(开源)。帮助确定每一个应用使用正确的schema当写数据或者读数据到kafka中。

Confluent的安装

下载地址:
http://www.confluent.io/download
打开后,显示最新版本,在右边填写信息后,点击Download下载。

本次我们主要使用REST Proxy,当然底层的broker也是使用confluent的kafka组件,下面简述安装步骤:

下载confluent4.0.0

wget http://packages.confluent.io/archive/4.0/confluent-oss-4.0.0-2.11.tar.gz
tar  xvf   confluent-oss-4.0.0-2.11.tar.gz

解压到指定目录下
通过查看目录的内容,能够发现,confluent里面是含有kafka的,也就是说,如果你没有安装kafka,那么可以通过confluent直接对kafka进行安装。如果已经安装了kafka,可以使用confluent提供的插件。

转载请注明出处:使用confluent本地安装和使用kafka

自定义配置

我们可以配置自己需要的和对应配置信息

进入解压出来的confluent-4.0.0

cd confluent-4.0.0

配置zookeeper

vi  etc/kafka/zookeeper.properties

内容如下:

dataDir=/var/lib/zookeeper
clientPort=2181
maxClientCnxns=0

配置kafka的broker

vi etc/kafka/server.properties

内容如下:

broker.id=50
delete.topic.enable=true
listeners=PLAINTEXT://192.168.11.91:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.11.91:2181
zookeeper.connection.timeout.ms=6000
confluent.support.metrics.enable=true
confluent.support.customer.id=anonymous

配置rest proxy

vi  etc/kafka-rest/kafka-rest.properties

内容如下:

id=kafka-rest-server
#zookeeper.connect=192.168.11.91:2181
bootstrap.servers=PLAINTEXT://localhost:9092

配置schema registry

vi  etc/schema-registry/schema-registry.properties

内容如下:

listeners=http://0.0.0.0:8081
kafkastore.connection.url=192.168.11.91:2181
kafkastore.topic=_schemas
debug=false

启动服务

启动kafka-rest

bin/kafka-rest-start   etc/kafka-rest/kafka-rest.properties

上面的这种方式是前台启动,也可以以后台方式启动。

nohup bin/kafka-rest-start   etc/kafka-rest/kafka-rest.properties &

启动zookeeper

bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties 

启动kafka broker

bin/kafka-server-start -daemon  etc/kafka/server.properties 

启动schema registry

bin/schema-registry-start -daemon  etc/schema-registry/schema-registry.properties 

测试使用

查看topics
浏览器访问或者curl都可以

http://192.168.11.91:8082/topics

查看集群的brokers

curl http://192.168.11.91:8082/brokers

注册consumer group

curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" -H "Accept: application/vnd.kafka.v2+json"   --data '"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"'   http://localhost:8082/consumers/my_test_consumer

把topic和消费者my_consumer关联起来

curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '"topics":["bear"]'   http://localhost:8082/consumers/my_test_consumer/instances/my_consumer_instance/subscription

通过rest接口向bear push数据

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json"           --data '"records":["value":"name": "testUser"]'     "http://localhost:8082/topics/bear"

通过rest接口消费数据

curl -X GET -H "Accept: application/vnd.kafka.json.v2+json"    http://localhost:8082/consumers/my_test_consumer/instances/my_consumer_instance/records

删除注册的consumer实例:

curl -X DELETE -H "Accept: application/vnd.kafka.v2+json"     http://localhost:8082/consumers/my_test_consumer/instances/my_consumer_instance

转载请注明出处:使用confluent本地安装和使用kafka

更多信息参考
https://github.com/confluentinc/kafka-rest

以上是关于使用confluent本地安装和使用kafka的主要内容,如果未能解决你的问题,请参考以下文章

如何在没有 Confluent 的情况下使用 Kafka Connect 从 Kafka 向 AWS S3 发送数据?

kafka-confluent管控中心安装

confluent-kafka-go源码分析

在 Linux 上使用 confluent-kafka-go 构建 Go 应用程序

在没有安装 Confluent 平台的情况下使用 Confluent Hub

使用 Spring-Kafka 和 Confluent 模式注册表将带有 JSON 模式的记录发送到 Kafka