Kafka快速入门——Confluent Kafka简介
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka快速入门——Confluent Kafka简介相关的知识,希望对你有一定的参考价值。
Kafka快速入门(八)——Confluent Kafka简介
一、Confluent Kafka简介
1、Confluent Kafka简介
2014年,Kafka的创始人Jay Kreps、NahaNarkhede和饶军离开LinkedIn创立Confluent公司,专注于提供基于Kafka的企业级流处理解决方案,并发布了Confluent Kafka。Confluent Kafka分为开源版和企业版,企业版收费。
2、Confluent Kafka特性
Confluent Kafka开源版特性如下:
(1)Confluent Kafka Connectors:支持Kafka Connect JDBC Connector、Kafka Connect HDFS Connector、Kafka Connect Elasticsearch Connector、Kafka Connect S3 Connector。
(2)多客户端支持:支持C/C++、Python、Go、.Net、Java客户端。
(3)Confluent Schema Registry
(4)Confluent Kafka REST Proxy
Confluent Kafka企业版特性如下:
(1)Automatic Data Balancing
(2)Multi-DataCenter Replication
(3)Confluent Control Center
(4)JMS Client
3、Confluent Kafka客户端
4、Confluent Kafka客户端特性支持
二、?Confluent Kafka组件
1、Schema Registry
当通过网络发送数据或将数据存储在文件中时,需要对数据进行序列化。常见跨语言的序列化库如Avro、Thrift和Protocol buffer,存在两个明显的缺点:
(1)数据生产者和数据消费者间缺乏契约。如果上游生产者随意更改数据格式,很难确保下游消费者能够正确解释数据。
(2)开销和冗余。所有消息都会显示保存相同字段名和类型信息,存在大量冗余数据。
无论是使用传统的Avro API自定义序列化类和反序列化类还是使用Twitter的Bijection类库实现Avro的序列化与反序列化,都会在每条Kafka记录里都嵌入schema,会让记录的大小成倍地增加。在读取记录时需要用到整个schema,Schema Registry可以让所有记录共用一个schema。
生产者把写入消息需要用到的schema保存到Schema Registry,然后在消息中引用schema的ID。消费者使用ID从Schema Registry里拉取schema来反序列化消息。
User的schema文件如下:
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "age", "type": "int"}
]
}
Confluent Schema Registry为开发人员提供了一个RESTful接口,以便为事件定义标准schema,在整个组织中共享标准schema,并以向后兼容和未来证明的方式安全地对其进行改进。
2、Control Center
Confluent Control Center是一个对整个产品进行管理的控制中心,最主要的功能对Kafka集群的各个生产者和消费者的进行性能监控,同时可以很容易地管理Kafka的连接、创建、编辑和管理与其它系统的连接。
Confluent Control Center只在Confluent Kafk企业版提供支持。
3、KSQL
KSQL是面向Apache Kafka的一种流式SQL引擎,为使用Kafka处理数据提供了一种简单的、完全交互的SQL界面,支持众多功能强大的数据流处理操作,包括聚合、连接、加窗(windowing)和sessionization(捕获单一访问者的网站会话时间范围内所有的点击流事件)等等。
KSQL目前还无法执行查询,KSQL提供的是对Kafka中的数据流执行连续查询,即随着新数据不断流入,转换在连续进行。
4、Confluent Replicator
Confluent Platform可以轻松地在多个数据中心内维护多个Kafka群集。Confluent Replicator提供了管理数据中心之间的数据复制和topic配置的功能,应用场景如下:
(1)ative-active地理定位部署:允许用户访问最近(附近)的数据中心,以优化其架构,实现低延迟和高性能。
(2)集中分析:将来自多个Kafka集群的数据聚合到一个地方,以进行组织范围的分析。
(3)云迁移:可以使用Kafka完成本地应用与云之间的数据迁移。
5、Confluent Auto Data Balancer
随着集群的增长,topic和partition以不同的速度增长,随着时间的推移,添加和删除会导致跨数据中心资源的工作负载不平衡(数据倾斜)。Confluent Auto Data Balancer会监控集群中的Broker数量、partition大小、partition数量,允许转移数据以在整个群集中创建均匀的工作负载,同时限制重新平衡流量,以最大限度地减少重新平衡时对生产工作负载的影响。
6、Kafka Connect
Kafka Connect是Kafka的一个开源组件,是用来将Kafka与数据库、key-value存储系统、搜索系统、文件系统等外部系统连接起来的基础框架。
通过使用Kafka Connect框架以及现有的Connector可以实现从源数据读入消息到Kafka,再从Kafka读出消息到目的地的功能。
Confluent在Kafka Connect基础上提供了对多种常用系统的支持:
(1)Kafka Connect ActiveMQ Connector
(2)Kafka FileStream Connectors
(3)Kafka Connect HDFS
(4)Kafka Connect JDBC Connector
(5)Confluent Kafka Replicator
(6)Kafka Connect S3
(7)Kafka Connect Elasticsearch Connector
(8)Kafka Connect IBM MQ Connector
(9)Kafka Connect JMS Connector
7、MQTT Proxy333
MQTT Proxy提供了允许MQTT客户端以Kafka原生方式直接向Kafka发送消息的可伸缩的轻量级接口。
MQTT Proxy用于把基于物联网(IoT)的应用程序和Kafka 集成,可以用于联网汽车、制造业生产线和预测性维护应用程序中。
MQTT Proxy使用传输层安全(TLS)加密和基本身份验证,支持?MQTT 3.1.1 协议,可以发布所有三个MQTT服务质量级别的消息,服务级别包括把消息发送给消费者:1)最多一次(即发即弃);2)至少一次(需要确认);3)恰好一次。
三、Confluent Kafka Docker部署
1、Confluent Kafka一站式部署
Confluent提供了Confluent Kafka所有组件的一站式部署,即cp-all-in-one项目,分为企业版本和开源社区版本,企业版比社区版本多了Control Center服务。
GitHub项目地址:
https://github.com/confluentinc/cp-all-in-one
2、Confluent Kafka企业版
docker-compose.yml文件:
version: ‘2‘
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.5.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-server:5.5.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: ‘zookeeper:2181‘
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: ‘true‘
CONFLUENT_SUPPORT_CUSTOMER_ID: ‘anonymous‘
schema-registry:
image: confluentinc/cp-schema-registry:5.5.0
hostname: schema-registry
container_name: schema-registry
depends_on:
- zookeeper
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: ‘zookeeper:2181‘
connect:
image: cnfldemos/cp-server-connect-datagen:0.3.2-5.5.0
hostname: connect
container_name: connect
depends_on:
- zookeeper
- broker
- schema-registry
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: ‘broker:29092‘
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_ZOOKEEPER_CONNECT: ‘zookeeper:2181‘
# CLASSPATH required due to CC-2422
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.5.0.jar
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
control-center:
image: confluentinc/cp-enterprise-control-center:5.5.0
hostname: control-center
container_name: control-center
depends_on:
- zookeeper
- broker
- schema-registry
- connect
- ksqldb-server
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: ‘broker:29092‘
CONTROL_CENTER_ZOOKEEPER_CONNECT: ‘zookeeper:2181‘
CONTROL_CENTER_CONNECT_CLUSTER: ‘connect:8083‘
CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
ksqldb-server:
image: confluentinc/cp-ksqldb-server:5.5.0
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- broker
- connect
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_HOST_NAME: ksqldb-server
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
KSQL_KSQL_CONNECT_URL: "http://connect:8083"
ksqldb-cli:
image: confluentinc/cp-ksqldb-cli:5.5.0
container_name: ksqldb-cli
depends_on:
- broker
- connect
- ksqldb-server
entrypoint: /bin/sh
tty: true
ksql-datagen:
image: confluentinc/ksqldb-examples:5.5.0
hostname: ksql-datagen
container_name: ksql-datagen
depends_on:
- ksqldb-server
- broker
- schema-registry
- connect
command: "bash -c ‘echo Waiting for Kafka to be ready... && cub kafka-ready -b broker:29092 1 40 && echo Waiting for Confluent Schema Registry to be ready... && cub sr-ready schema-registry 8081 40 && echo Waiting a few seconds for topic creation to finish... && sleep 11 && tail -f /dev/null‘"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
STREAMS_BOOTSTRAP_SERVERS: broker:29092
STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
STREAMS_SCHEMA_REGISTRY_PORT: 8081
rest-proxy:
image: confluentinc/cp-kafka-rest:5.5.0
depends_on:
- zookeeper
- broker
- schema-registry
ports:
- 8082:8082
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: ‘broker:29092‘
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
KAFKA_REST_SCHEMA_REGISTRY_URL: ‘http://schema-registry:8081‘
启动容器服务:docker-compose -f docker-compose.yml up -d
关闭容器服务:docker-compose -f docker-compose.yml down
3、Confluent Kafka社区版
docker-compose.yml文件:
version: ‘2‘
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.5.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:5.5.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "29092:29092"
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: ‘zookeeper:2181‘
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
schema-registry:
image: confluentinc/cp-schema-registry:5.5.0
hostname: schema-registry
container_name: schema-registry
depends_on:
- zookeeper
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: ‘zookeeper:2181‘
connect:
image: cnfldemos/kafka-connect-datagen:0.3.2-5.5.0
hostname: connect
container_name: connect
depends_on:
- zookeeper
- broker
- schema-registry
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: ‘broker:29092‘
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_ZOOKEEPER_CONNECT: ‘zookeeper:2181‘
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
ksqldb-server:
image: confluentinc/cp-ksqldb-server:5.5.0
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- broker
- connect
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_HOST_NAME: ksqldb-server
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
KSQL_KSQL_CONNECT_URL: "http://connect:8083"
ksqldb-cli:
image: confluentinc/cp-ksqldb-cli:5.5.0
container_name: ksqldb-cli
depends_on:
- broker
- connect
- ksqldb-server
entrypoint: /bin/sh
tty: true
ksql-datagen:
image: confluentinc/ksqldb-examples:5.5.0
hostname: ksql-datagen
container_name: ksql-datagen
depends_on:
- ksqldb-server
- broker
- schema-registry
- connect
command: "bash -c ‘echo Waiting for Kafka to be ready... && cub kafka-ready -b broker:29092 1 40 && echo Waiting for Confluent Schema Registry to be ready... && cub sr-ready schema-registry 8081 40 && echo Waiting a few seconds for topic creation to finish... && sleep 11 && tail -f /dev/null‘"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
STREAMS_BOOTSTRAP_SERVERS: broker:29092
STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
STREAMS_SCHEMA_REGISTRY_PORT: 8081
rest-proxy:
image: confluentinc/cp-kafka-rest:5.5.0
depends_on:
- zookeeper
- broker
- schema-registry
ports:
- 8082:8082
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: ‘broker:29092‘
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
KAFKA_REST_SCHEMA_REGISTRY_URL: ‘http://schema-registry:8081‘
启动容器服务:docker-compose -f docker-compose.yml up -d
关闭容器服务:docker-compose -f docker-compose.yml down
4、Confluent Kafka云服务版
docker-compose.yml文件:
version: ‘2‘
services:
schema-registry:
image: confluentinc/cp-schema-registry:5.5.0
hostname: schema-registry
container_name: schema-registry
ports:
- ‘8085:8085‘
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085
SCHEMA_REGISTRY_KAFKASTORE_SSL_ENDPOINT_IDENTIFIED_ALGORITHM: "https"
SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: "SASL_SSL"
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS
SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM: "PLAIN"
ksqldb-server:
image: confluentinc/cp-ksqldb-server:5.5.0
hostname: ksqldb-server
container_name: ksqldb-server
ports:
- "8089:8089"
environment:
KSQL_HOST_NAME: ksqldb-server
KSQL_CONFIG_DIR: "/etc/ksql"
KSQL_LISTENERS: "http://0.0.0.0:8089"
KSQL_AUTO_OFFSET_RESET: "earliest"
KSQL_COMMIT_INTERVAL_MS: 0
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
KSQL_KSQL_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL
KSQL_KSQL_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE: $BASIC_AUTH_CREDENTIALS_SOURCE
KSQL_KSQL_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
KSQL_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS
KSQL_SECURITY_PROTOCOL: "SASL_SSL"
KSQL_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
KSQL_SASL_MECHANISM: "PLAIN"
KSQL_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"
KSQL_KSQL_STREAMS_PRODUCER_RETRIES: 2147483647
KSQL_KSQL_STREAMS_PRODUCER_CONFLUENT_BATCH_EXPIRE_MS: 9223372036854775807
KSQL_KSQL_STREAMS_PRODUCER_REQUEST_TIMEOUT_MS: 300000
KSQL_KSQL_STREAMS_PRODUCER_MAX_BLOCK_MS: 9223372036854775807
KSQL_KSQL_STREAMS_PRODUCER_DELIVERY_TIMEOUT_MS: 2147483647
KSQL_KSQL_STREAMS_REPLICATION_FACTOR: 3
KSQL_KSQL_INTERNAL_TOPIC_REPLICAS: 3
KSQL_KSQL_SINK_REPLICAS: 3
# Producer Confluent Monitoring Interceptors for Control Center streams monitoring
KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
KSQL_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_MECHANISM: PLAIN
KSQL_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL: "SASL_SSL"
KSQL_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
# Consumer Confluent Monitoring Interceptors for Control Center streams monitoring
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
KSQL_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_MECHANISM: PLAIN
KSQL_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL: "SASL_SSL"
KSQL_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
KSQL_KSQL_CONNECT_URL: "http://connect:8083"
ksqldb-cli:
image: confluentinc/cp-ksqldb-cli:5.5.0
container_name: ksqldb-cli
entrypoint: /bin/sh
tty: true
control-center:
image: confluentinc/cp-enterprise-control-center:5.5.0
hostname: control-center
container_name: control-center
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS
CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8089"
CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8089"
CONTROL_CENTER_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL
CONTROL_CENTER_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE: $BASIC_AUTH_CREDENTIALS_SOURCE
CONTROL_CENTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
CONTROL_CENTER_CONNECT_CLUSTER: "connect:8083"
CONTROL_CENTER_STREAMS_SECURITY_PROTOCOL: SASL_SSL
CONTROL_CENTER_STREAMS_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
CONTROL_CENTER_STREAMS_SASL_MECHANISM: PLAIN
CONTROL_CENTER_REPLICATION_FACTOR: 3
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_REPLICATION: 3
CONTROL_CENTER_INTERNAL_TOPICS_REPLICATION: 3
CONTROL_CENTER_COMMAND_TOPIC_REPLICATION: 3
CONTROL_CENTER_METRICS_TOPIC_REPLICATION: 3
CONFLUENT_METRICS_TOPIC_REPLICATION: 3
CONTROL_CENTER_STREAMS_NUM_STREAM_THREADS: 3
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
# Workaround for MMA-3564
CONTROL_CENTER_METRICS_TOPIC_MAX_MESSAGE_BYTES: 8388608
PORT: 9021
connect:
image: cnfldemos/cp-server-connect-datagen:0.3.2-5.5.0
hostname: connect
container_name: connect
ports:
- "8083:8083"
volumes:
- mi2:/usr/share/java/monitoring-interceptors/
environment:
CONNECT_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: "connect"
CONNECT_CONFIG_STORAGE_TOPIC: demo-connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: demo-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: demo-connect-status
CONNECT_REPLICATION_FACTOR: 3
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL
CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: $BASIC_AUTH_CREDENTIALS_SOURCE
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
CONNECT_LOG4J_LOGGERS: org.reflections=ERROR
# CLASSPATH required due to CC-2422
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.5.0.jar
# Connect worker
CONNECT_SECURITY_PROTOCOL: SASL_SSL
CONNECT_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
CONNECT_SASL_MECHANISM: PLAIN
CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"
# Connect producer
CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
CONNECT_PRODUCER_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL: SASL_SSL
CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_MECHANISM: PLAIN
# Connect consumer
CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
CONNECT_CONSUMER_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
CONNECT_CONSUMER_SASL_MECHANISM: PLAIN
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL: SASL_SSL
CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_MECHANISM: PLAIN
rest-proxy:
image: confluentinc/cp-kafka-rest:5.5.0
ports:
- 8082:8082
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
KAFKA_REST_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL
KAFKA_REST_CLIENT_BASIC_AUTH_CREDENTIALS_SOURCE: $BASIC_AUTH_CREDENTIALS_SOURCE
KAFKA_REST_CLIENT_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
KAFKA_REST_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS
KAFKA_REST_SSL_ENDPOINT_IDENTIFIED_ALGORITHM: "https"
KAFKA_REST_SECURITY_PROTOCOL: "SASL_SSL"
KAFKA_REST_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
KAFKA_REST_SASL_MECHANISM: "PLAIN"
KAFKA_REST_CLIENT_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS
KAFKA_REST_CLIENT_SSL_ENDPOINT_IDENTIFIED_ALGORITHM: "https"
KAFKA_REST_CLIENT_SECURITY_PROTOCOL: "SASL_SSL"
KAFKA_REST_CLIENT_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG
KAFKA_REST_CLIENT_SASL_MECHANISM: "PLAIN"
volumes:
mi2: {}
启动容器服务:docker-compose -f docker-compose.yml up -d
关闭容器服务:docker-compose -f docker-compose.yml down
以上是关于Kafka快速入门——Confluent Kafka简介的主要内容,如果未能解决你的问题,请参考以下文章
使用 Avro 序列化器将 Spark Structured Streaming 数据发送到 Confluent Kafka