Kafka在Mac下的安装与使用

Posted super先生

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka在Mac下的安装与使用相关的知识,希望对你有一定的参考价值。

mac 安装kafka

安装kafka的原因

用户微服务登录后需要向广告微服务中发送用户登录的信息以获取用户画像(这个过程是异步的),故而在用户微服务中配置了kafka。配置的kafka的连接地址是测试环境的,为了避免给测试环境造成脏数据,因而我需要本地Mac安装kafka。

安装kafka

  1. 我的网盘:链接: https://pan.baidu.com/s/1mS6MVBehe2Kko70Zdln-hA?pwd=62wr 提取码: 62wr

  2. kafka的官网地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/3.1.0/kafka_2.12-3.1.0.tgz

下载完成后,解压到当前目录中,或者移动到你想要的目录,比如

mv /Users/你的用户名/Downloads/kafka_2.12-3.1.0 /Users/你的用户名/kafka

进入kafka的bin目录,执行如下命令,查看bin目录下的文件,有下面要执行的文件。

cd ./kafka/bin
pwd 查看当前目录,确实为bin目录
ls  查看bin目录下的文件


你会看到zookeeper和kafka相关的文件,如下是具体操作zookeeper和kafka。

启动Zookeeper

打开新的terminal,进入Kafka的 bin目录,执行如下命令:

./zookeeper-server-start.sh ../config/zookeeper.properties &

zookeeper会打印一系列的日志,如下所示:

INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-04-22 14:22:13,867] WARN ../config/zookeeper.properties is relative. Prepend ./ to indicate that you're sure! (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-04-22 14:22:13,879] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-04-22 14:22:13,879] INFO secureClientPort is not set (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-04-22 14:22:13,879] INFO observerMasterPort is not set (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-04-22 14:22:13,879] INFO metricsProvider.className is org.apache.zookeeper.metrics.impl.DefaultMetricsProvider (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-04-22 14:22:13,881] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2022-04-22 14:22:13,881] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2022-04-22 14:22:13,881] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2022-04-22 14:22:13,881] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2022-04-22 14:22:13,884] INFO Log4j 1.2 jmx support found and enabled. (org.apache.zookeeper.jmx.ManagedUtil)
[2022-04-22 14:22:13,896] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-04-22 14:22:13,896] WARN ../config/zookeeper.properties is relative. Prepend ./ to indicate that you're sure! (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-04-22 14:22:13,896] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-04-22 14:22:13,897] INFO secureClientPort is not set (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-04-22 14:22:13,897] INFO observerMasterPort is not set (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-04-22 14:22:13,897] INFO metricsProvider.className is org.apache.zookeeper.metrics.impl.DefaultMetricsProvider (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2022-04-22 14:22:13,897] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2022-04-22 14:22:13,912] INFO ServerMetrics initialized with provider org.apache.zookeeper.metrics.impl.DefaultMetricsProvider@60addb54 (org.apache.zookeeper.server.ServerMetrics)
[2022-04-22 14:22:13,916] INFO zookeeper.snapshot.trust.empty : false (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2022-04-22 14:22:13,930] INFO  (org.apache.zookeeper.server.ZooKeeperServer)
[2022-04-22 14:22:13,930] INFO   ______                  _                                           (org.apache.zookeeper.server.ZooKeeperServer)
[2022-04-22 14:22:13,930] INFO  |___  /                 | |                                          (org.apache.zookeeper.server.ZooKeeperServer)
[2022-04-22 14:22:13,930] INFO     / /    ___     ___   | | __   ___    ___   _ __     ___   _ __    (org.apache.zookeeper.server.ZooKeeperServer)
[2022-04-22 14:22:13,930] INFO    / /    / _ \\   / _ \\  | |/ /  / _ \\  / _ \\ | '_ \\   / _ \\ | '__| (org.apache.zookeeper.server.ZooKeeperServer)
[2022-04-22 14:22:13,931] INFO   / /__  | (_) | | (_) | |   <  |  __/ |  __/ | |_) | |  __/ | |     (org.apache.zookeeper.server.ZooKeeperServer)
[2022-04-22 14:22:13,931] INFO  /_____|  \\___/   \\___/  |_|\\_\\  \\___|  \\___| | .__/   \\___| |_| (org.apache.zookeeper.server.ZooKeeperServer)
[2022-04-22 14:22:13,931] INFO                                               | |                      (org.apache.zookeeper.server.ZooKeeperServer)
[2022-04-22 14:22:13,931] INFO                                               |_|                      (org.apache.zookeeper.server.ZooKeeperServer)
[2022-04-22 14:22:13,931] INFO  (org.apache.zookeeper.server.ZooKeeperServer)
[2022-04-22 14:22:13,933] INFO Server environment:zookeeper.version=3.6.3--6401e4ad2087061bc6b9f80dec2d69f2e3c8660a, built on 04/08/2021 16:35 GMT (org.apache.zookeeper.server.ZooKeeperServer)
......

启动Kafka

打开新的terminal,进入Kafka的 bin目录,执行如下命令:

./kafka-server-start.sh ../config/server.properties & 

你会看到如下一系列日志,直到看到这句话INFO [KafkaServer id=0] started (kafka.server.KafkaServer)就说明启动成功了

INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2022-04-22 14:23:55,309] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
[2022-04-22 14:23:55,384] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2022-04-22 14:23:55,388] INFO starting (kafka.server.KafkaServer)
[2022-04-22 14:23:55,388] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2022-04-22 14:23:55,411] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[2022-04-22 14:23:55,417] INFO Client environment:zookeeper.version=3.6.3--6401e4ad2087061bc6b9f80dec2d69f2e3c8660a, built on 04/08/2021 16:35 GMT (org.apache.zookeeper.ZooKeeper)
[2022-04-22 14:23:55,417] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
[2022-04-22 14:23:55,417] INFO Client environment:java.version=1.8.0_211 (org.apache.zookeeper.ZooKeeper)
[2022-04-22 14:23:55,417] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2022-04-22 14:23:55,417] INFO Client environment:java.home=/Library/Internet Plug-Ins/JavaAppletPlugin.plugin/Contents/Home (org.apache.zookeeper.ZooKeeper)
。。。

创建topic

开启新的terminal,进入kafka的bin目录,执行如下命令

./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic testInfoTopic --partitions 2 --replication-factor 1


你会看到如上图所示的信息,就说明创建topic成功。

查看topic

基于上一个terminal的kafka的bin目录,执行如下命令

./kafka-topics.sh --list --bootstrap-server localhost:9092


你会看到所创建的topic:testInfoTopic

生产数据

基于上一个terminal的kafka的bin目录,执行如下命令:

./kafka-console-producer.sh --broker-list localhost:9092 --topic testInfoTopic


先不输入数据。

消费数据

开启新的terminal,进入kafka的bin目录中,执行如下命令

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testInfoTopic


你会看到,此时没有数据。

在生产者的terminal中发送消息:

在消费者的terminal中消费消息:

综上所看,消息成功发送,也被成功消费。

关闭zookeeper

开启新的terminal,进入kafka的bin目录,执行如下命令

sh zookeeper-server-stop.sh

再次执行这个命令,你会看到No zookeeper server to stop,说明zookeeper已关闭。

但是执行命令ps -eaf | grep zookeeper,仍看到zookeeper的进程,说明还有其子节点没有关闭。此为kafka没有关闭,即70267,于是去关闭kafka.

关闭kafka

在上面的terminal中,执行如下命令关闭kafka

sh kafka-server-stop.sh

再次执行该命令,你会看到No kafka server to stop
执行命令ps -eaf | grep zookeeper,没看到70267的kafka节点进程,说明kafka已关闭。
进而执行命令ps -eaf | grep kafka ,没有kafka的进程,再次说明kafka已关闭。

因而,即便zookeeper关闭了,如果其下节点没有关闭,节点进程仍旧存在。因而,zookeeper和kafka都要关闭。

测试

启动用户微服务,其kafka的topic是UserLoginInfo。

使用postman调用登录接口,同时,在terminal启动kafka和zookeeper服务,再开启一新terminal,并执行命令:./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic UserLoginInfo ,得到如下消息:

说明kafka和zookeeper是生效的。

注意事项:./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic UserLoginInfo 在接口请求之前要执行。

mac zookeeper&kafka 安装包搭建环境

1、安装zookeeper

因为zookeeper 与kafka 存在对应的版本,选择不当,将无法使用,所以两者都使用最新版本

下载地址:https://zookeeper.apache.org/releases.html 

技术图片

 

 

目录

下载并解压ZooKeeper软件压缩包后,可以看到zk包含以下的文件和目录:

图1:ZooKeeper软件的文件和目录

    • bin目录
      zk的可执行脚本目录,包括zk服务进程,zk客户端,等脚本。其中,.sh是Linux环境下的脚本,.cmd是Windows环境下的脚本。
    • conf目录
      配置文件目录。zoo_sample.cfg为样例配置文件,需要修改为自己的名称,一般为zoo.cfg。log4j.properties为日志配置文件。
    • lib
      zk依赖的包。
    • contrib目录
      一些用于操作zk的工具包。
    • recipes目录
      zk某些用法的代码示例

运行配置

上面提到,conf目录下提供了配置的样例zoo_sample.cfg,要将zk运行起来,需要将其名称修改为zoo.cfg。
打开zoo.cfg,可以看到默认的一些配置。

    • tickTime
      时长单位为毫秒,为zk使用的基本时间度量单位。例如,1 * tickTime是客户端与zk服务端的心跳时间,2 * tickTime是客户端会话的超时时间。
      tickTime的默认值为2000毫秒,更低的tickTime值可以更快地发现超时问题,但也会导致更高的网络流量(心跳消息)和更高的CPU使用率(会话的跟踪处理)。
    • clientPort
      zk服务进程监听的TCP端口,默认情况下,服务端会监听2181端口。
    • dataDir
      无默认配置,必须配置,用于配置存储快照文件的目录。如果没有配置dataLogDir,那么事务日志也会存储在此目录。

配置环境变量 vim ~/.bash_profile 

export ZK_HOME=/Users/yyj/big_data/zookeeper
export KAFKA_HOME=/Users/yyj/big_data/kafka_2.12-2.5.0
export PATH=$PATH:$ANDROID_HOME/platform-tools:${KAFKA_HOME}/bin:${ZK_HOME}/bin

 

启动 进入bin目录,执行命令

zkServer.sh start

 

安装Kafka

下载 http://kafka.apache.org/downloads

解压

tar -xzf kafka_2.12-2.5.0.tgz

注意,kafka_2.12-2.5.0.tgz版本是已经编译好的版本,解压就能使用。

 配置server.properties

默认配置 advertised.listeners=PLAINTEXT://:your.host.name:9092 修改为 advertised.listeners=PLAINTEXT://:ip:9092

ip为服务器ip。

hostname和端口是用来建议给生产者和消费者使用的,如果没有设置,将会使用listeners的配置,如果listeners也没有配置,将使用java.net.InetAddress.getCanonicalHostName()来获取这个hostname和port,对于ipv4,基本就是localhost了。

"PLAINTEXT"表示协议,可选的值有PLAINTEXT和SSL,hostname可以指定IP地址,也可以用"0.0.0.0"表示对所有的网络接口有效,如果hostname为空表示只对默认的网络接口有效。也就是说如果你没有配置advertised.listeners,就使用listeners的配置通告给消息的生产者和消费者,这个过程是在生产者和消费者获取源数据(metadata)。

 

配置环境变量

export ZK_HOME=/Users/yyj/big_data/zookeeper
export KAFKA_HOME=/Users/yyj/big_data/kafka_2.12-2.5.0
export PATH=$PATH:$ANDROID_HOME/platform-tools:${KAFKA_HOME}/bin:${ZK_HOME}/bin

启动Kafka

 启动ZooKeeper

zkServer.sh start

 

注意,需要先启动ZooKeeper再启动kafka,不然会报错。如下图:

启动kafka

kafka-server-start.sh /Users/yyj/big_data/kafka_2.12-2.5.0/config/server.properties

 

启动Kafka Broker后,在ZooKeeper终端上键入命令 jps,效果如下:

技术图片

 

 

创建topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo

 

其中demo为创建的topic名称。

技术图片

如上图,创建了一个名为 demo 的主题,其中包含一个分区和一个副本因子。 创建成功之后会输出: Created topic "demo".

技术图片

如上图,创建主题后,系统会在config / server.properties文件中的"/ tmp / kafka-logs /"中指定的创建主题的日志。

 查询topic列表

bin/kafka-topics.sh --list --zookeeper localhost:2181

 

 查看topic信息

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic demo

 

 删除topic

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic demo

  

 

启动生产者

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo

  

从上面的语法,生产者命令行客户端需要两个主要参数 -

代理列表 - 我们要发送邮件的代理列表。 在这种情况下,我们只有一个代理。 Config / server.properties文件包含代理端口ID,因为我们知道我们的代理正在侦听端口9092,因此您可以直接指定它。主题名称:demo。

 启动消费者

为了方便测试,另启一个sheel窗口 这样效果更明显。需要注意的是旧版本和新版本的命令是不一样的

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic demo --from-beginning

  

报错提示: zookeeper is not a recognized option

发现在启动的时候说使用 --zookeeper是一个过时的方法,最新的版本中命令如下:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning

 

可以开启两个终端,一个发送消息,一个接受消息。效果如下:

 

配置启动 关闭shell 脚本

start_all.sh 

#!/bin/bash

# 启动zk
zkServer.sh start

# 启动kafka
nohup kafka-server-start.sh /Users/yyj/big_data/kafka_2.12-2.5.0/config/server.properties >> /Users/yyj/big_data/kafka.log &

stop_all.sh

#!/bin/bash

# 关闭kafka
kafka-server-stop.sh /Users/yyj/big_data/kafka_2.12-2.5.0/config/server.properties

# 关闭zk
zkServer.sh stop

 

 

 

 

 

 

以上是关于Kafka在Mac下的安装与使用的主要内容,如果未能解决你的问题,请参考以下文章

kafka安装和使用

Kafka 安装与启动

Kafaka入门(1)- Kafka简介和安装与启动(mac)

Ubuntu Kafka的安装和使用

kafka 2.12在linux下的安装部署及java客户端对接

安装kafka 记录