How to quickly start a Kafka application with Docker?

Posted by 1379號监听猿

Introduction

This post starts a Kafka instance for testing on an Alibaba Cloud server. Since resources are limited, standalone mode is used.

1. Pull the images

  • zookeeper
docker pull zookeeper
  • kafka
docker pull wurstmeister/kafka

2. Start ZooKeeper

  • Standalone mode
docker run -d --name docker_zookeeper -p 2181:2181 -t zookeeper

3. Start Kafka

docker run -d --name docker_kafka \
-p 9092:9092 \
--add-host=$hostname:$server_internal_ip \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=$hostname:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://$hostname:9092 \
-e KAFKA_JVM_PERFORMANCE_OPTS="-Xmx256m -Xms256m" \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka

Notes:
An Alibaba Cloud server has different internal and public IPs, so for external clients to reach Kafka, both internal and external access should go through the same hostname:

  • --add-host: adds an entry to the container's hosts file
    • hostname is the cloud server's hostname;
    • the server IP is the Alibaba Cloud internal (private) IP.
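The two listener settings above play different roles; as a rough summary (with the example hostname zhy standing in for $hostname):

```
# Interface/port the broker binds to inside the container:
KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
# Address the broker hands back to clients in metadata responses;
# every client (internal or external) must be able to resolve it:
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://zhy:9092
```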

Then add the following to the hosts file of the external client machine:

# Alibaba Cloud server address mapping
$server_public_ip $server_hostname
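A small sketch of adding that mapping idempotently, so repeated runs do not duplicate the line. The IP, hostname, and target file below are placeholders; on a real client you would point HOSTS_FILE at /etc/hosts and use your server's actual public IP and hostname.

```shell
# Placeholders: swap in your server's public IP and hostname,
# and point HOSTS_FILE at /etc/hosts on a real client machine.
HOSTS_FILE=$(mktemp)
MAPPING="203.0.113.10 zhy"

# Append the mapping only if that exact line is not already present.
add_mapping() {
  grep -qxF "$MAPPING" "$HOSTS_FILE" || echo "$MAPPING" >> "$HOSTS_FILE"
}

add_mapping
add_mapping   # second call is a no-op
grep -c "zhy" "$HOSTS_FILE"   # prints 1
```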

In addition, Kafka's JVM heap defaults to 1 GB. Cloud server resources are limited, so the JVM options are set explicitly via:

  • -e KAFKA_JVM_PERFORMANCE_OPTS
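Putting step 3 together: the launch command can be assembled from variables so the hostname/IP pair only has to be defined once. The values below are placeholders for illustration; the command is merely echoed here, and the commented-out eval line is what would actually start the container.

```shell
# Placeholder values -- substitute your server's hostname and private IP.
KAFKA_HOST="zhy"
INTERNAL_IP="172.16.0.10"

# Assemble the docker run command as a single string.
KAFKA_CMD="docker run -d --name docker_kafka \
  -p 9092:9092 \
  --add-host=${KAFKA_HOST}:${INTERNAL_IP} \
  -e KAFKA_BROKER_ID=0 \
  -e KAFKA_ZOOKEEPER_CONNECT=${KAFKA_HOST}:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${KAFKA_HOST}:9092 \
  -e KAFKA_JVM_PERFORMANCE_OPTS='-Xmx256m -Xms256m' \
  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
  wurstmeister/kafka"

echo "$KAFKA_CMD"
# eval "$KAFKA_CMD"   # uncomment to actually start the container
```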

4. Command-line verification

  • Enter the container
# enter the container
docker exec -it $CONTAINER_ID /bin/bash
cd /opt/kafka
  • Create a topic
bin/kafka-topics.sh --create --zookeeper $hostname:2181 --replication-factor 1 --partitions 1 --topic mykafka
  • Run a producer and send messages
bin/kafka-console-producer.sh --bootstrap-server $hostname:9092 --topic mykafka

For example:

bash-5.1# bin/kafka-console-producer.sh --bootstrap-server zhy:9092 --topic mykafka
>zhy
  • Run a consumer and receive messages
bin/kafka-console-consumer.sh --bootstrap-server $hostname:9092 --topic mykafka --from-beginning

For example:

bash-5.1# bin/kafka-console-consumer.sh --bootstrap-server zhy:9092 --topic mykafka --from-beginning
zhy

5. Java API verification from outside the cloud network

  • Start a consumer:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CustomConsumer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "zhy:9092"); // an IP address also works here
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // subscribe to the topic mykafka
        kafkaConsumer.subscribe(Stream.of("mykafka").collect(Collectors.toList()));

        // poll and consume records
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
            kafkaConsumer.commitAsync();
        }
    }
}

  • Producer (without the hostname mapping for the Kafka container set up above, an external client connecting by hostname will never be able to send data)
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "zhy:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>("mykafka", "CustomProducerTest@" + i));
        }
        producer.close();
    }
}

At this point, the consumer started earlier via the Java API prints the following:

ConsumerRecord(topic = mykafka, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1649684611004, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = CustomProducerTest@0)
ConsumerRecord(topic = mykafka, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1649684611036, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = CustomProducerTest@1)
ConsumerRecord(topic = mykafka, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1649684611036, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = CustomProducerTest@2)
ConsumerRecord(topic = mykafka, partition = 0, leaderEpoch = 0, offset = 4, CreateTime = 1649684611036, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = CustomProducerTest@3)
ConsumerRecord(topic = mykafka, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1649684611036, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = CustomProducerTest@4)

Meanwhile, the console consumer window opened earlier also shows the messages:

bash-5.1# bin/kafka-console-consumer.sh --bootstrap-server zhy:9092 --topic mykafka --from-beginning
zhy
CustomProducerTest@0
CustomProducerTest@1
CustomProducerTest@2
CustomProducerTest@3
CustomProducerTest@4
