Elastic:Data pipeline:使用 Kafka => Logstash => Elasticsearch

Posted Elastic 中国社区官方博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Elastic:Data pipeline:使用 Kafka => Logstash => Elasticsearch相关的知识,希望对你有一定的参考价值。

在 Elastic Stack 的使用中,一个典型的 Elastic Stack 架构框图:

从上面,我们可以看出来如果采集的数据比较多的话,我们可以使用 Kafka => Logstash => Elasticsearch 这样的 data pipeline 把数据传入到 Elasticsearch 中。在今天的文章中,我将介绍:

  • 从 Kafka topic 中读取数据
  • 使用 Logstash 来处理这个数据
  • 把 Logstash 传入的数据导入到 Elasticsearch 中,并最终在 Kibana 中进行展示

我将使用一个 Python 的应用生成一些随机的数据写入到 Kafka 中。这样的一个 Data pipeline 框架图如下:

在上面的架构中,有几个重要的组件:

  1. Kafka Server:这就是数据首先发布的地方。
  2. Producer:扮演将数据发布到 Kafka topic 的角色。 在现实世界中,你可以具有任何可以为 kafka 主题生成数据的实体。 在我们的示例中,我们将生成伪造的用户注册数据。
  3. Elasticsearch:这将充当将用户注册数据存储到其自身的数据库,并提供搜索及分析。
  4. Logstash:Logstash 将扮演中间人的角色,在这里我们将从 Kafka topic 中读取数据,然后将其插入到 Elasticsearch 中。
  5. Kibana:Kibana 将扮演图形用户界面的角色,它将以可读或图形格式显示数据。

为了演示的方便,你可以在地址下载演示文件 https://github.com/liu-xiao-guo/data-pipeline。我的文件目录是这样的:

$ pwd
/Users/liuxg/data/data-pipeline
$ tree -L 3
.
├── docker-elk
│   ├── docker-compose.yml
│   └── logstash_pipeline
│       └── kafka-elastic.conf
├── docker-kafka
│   └── kafka-docker-compose.yml
└── kafka_producer.py
$ pwd
/Users/liuxg/data/data-pipeline/docker-elk
$ ls -al
total 32
drwxr-xr-x  6 liuxg  staff   192 May 12 08:51 .
drwxr-xr-x  6 liuxg  staff   192 May 13 13:53 ..
-rw-r--r--  1 liuxg  staff    29 May  7 21:59 .env
-rw-r--r--  1 liuxg  staff  1064 May 13 14:00 docker-compose.yml
drwxr-xr-x  3 liuxg  staff    96 May 13 13:53 logstash_pipeline
$ cat .env
ELASTIC_STACK_VERSION=7.12.1

上面的其它文件将在我下面的章节中介绍。如果你自己想通过手动的方式部署 Kafka 请参阅我的另外一篇文章 “使用 Kafka 部署 Elastic Stack”。

 

安装

Kafka,Zookeeper 及 Kafka Manager

我将使用 docker-compose 来进行安装。一旦安装好,我们可以看到:

  • Kafka 在 PORT 9092 侦听
  • Zookeeper 在 PORT 2181 侦听
  • Kafka Manager 侦听 PORT 9000 侦听

kafka-docker-compose.yml

version: "3"
services:
  zookeeper:
    image: zookeeper
    restart: always
    container_name: zookeeper
    hostname: zookeeper
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
    - 9092:9092
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.0.3 
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  kafka_manager:
    image: hlebalbau/kafka-manager:stable
    container_name: kakfa-manager
    restart: always
    ports:
      - "9000:9000"
    environment:
      ZK_HOSTS: "zookeeper:2181"
      APPLICATION_SECRET: "random-secret"
    command: -Dpidfile.path=/dev/null

我们可以使用如下的命令来进行启动(在 Docker 运行的前提下):

docker-compose -f kafka-docker-compose.yml up

一旦运行起来后,我们可以使用如下的命令来进行查看:

docker ps
$ docker ps
CONTAINER ID   IMAGE                            COMMAND                  CREATED        STATUS          PORTS                                                  NAMES
92a52c523872   zookeeper                        "/docker-entrypoint.…"   3 hours ago    Up 25 seconds   2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   zookeeper
5938dc3c40a8   wurstmeister/kafka               "start-kafka.sh"         3 hours ago    Up 28 seconds   0.0.0.0:9092->9092/tcp                                 kafka
748145d6a32b   hlebalbau/kafka-manager:stable   "/kafka-manager/bin/…"   3 hours ago    Up 27 seconds   0.0.0.0:9000->9000/tcp                                 kakfa-manager
da08a6f9b567   rwynn/monstache:latest           "/bin/monstache -f .…"   2 months ago   Up 21 seconds   0.0.0.0:8080->8080/tcp                                 container_name

我们发现 Kafka Manager 运行于 9000 端口。我们打开本地电脑的 9000 端口:

 

点击页面下方的 Save 按钮:

这样就创建了我们的 liuxg 集群。

在上面它显示了一个默认的 topic,虽然不是我们想要的。

点击上面的 Create 菜单:

在上面,我们创建一个叫做 registered_user topic:

点击 Create 按钮:

这样,我们就把 Kafka 上的 registered_user topic 创建好了。

我们可以登录 kafka 容器来验证我们已经创建的 topic。我们使用如下的命令来找到 kafka 容器的名称:

docker ps -s
$ docker ps -s
CONTAINER ID   IMAGE                            COMMAND                  CREATED        STATUS          PORTS                                                  NAMES            SIZE
92a52c523872   zookeeper                        "/docker-entrypoint.…"   5 days ago     Up 4 minutes    2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   zookeeper        38.1kB (virtual 269MB)
5938dc3c40a8   wurstmeister/kafka               "start-kafka.sh"         5 days ago     Up 4 minutes    0.0.0.0:9092->9092/tcp                                 kafka            474kB (virtual 439MB)
748145d6a32b   hlebalbau/kafka-manager:stable   "/kafka-manager/bin/…"   5 days ago     Up 4 minutes    0.0.0.0:9000->9000/tcp                                 kakfa-manager    2.89MB (virtual 430MB)
da08a6f9b567   rwynn/monstache:latest           "/bin/monstache -f .…"   2 months ago   Up 27 seconds   0.0.0.0:8080->8080/tcp                                 container_name   0B (virtual 30.5MB)

上面显示 kafka 的容器名称为 kafka。我们使用如下的命令来进行登录:

docker exec -it wurstmeister/kafka  /bin/bash

然后我们在容器里 打入如下的命令:

bash-4.4# kafka-topics.sh --list -zookeeper zookeeper:2181
__consumer_offsets
registered_user

上面的命令显示已经存在的被创建的 registered_user topic。我们可以使用如下的命令来向这个被创建的 topic 来发送数据:

bash-4.4# kafka-console-consumer.sh --bootstrap-server 192.168.0.3:9092 --topic registered_user --from-beginning

我们可以在下面再通过在 console 输入的办法来发送数据到 topic 里去。

 

Elastic Stack 按钮

我们接下来安装 Elastic Stack。同样地,我使用 docker-compose 来部署 Elasticsearch, Logstash 及 Kibana。你们可以参考我之前的文章 “Logstash:在 Docker 中部署 Logstash”。为了能够把数据传入到 Elasticsearch 中,我们需要在 Logstash 中配置一个叫做 kafka-elastic.conf 的配置文件:

kafka-elastic.conf

input {
    kafka {
       bootstrap_servers => "192.168.0.3:9092"
       topics => ["registered_user"]
    }
}

output {
   elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "registered-user"
      workers => 1
    }
}

请注意:在上面的 192.168.0.3 为我自己电脑的本地 IP 地址。为了说明问题的方便,我们没有对来自 kafka 里的 registered_user 这个 topic 做任何的数据处理,而直接发送到 Elasticsearch 中。

我们的 docker-compose.yml 配置文件如下:

docker-compose.yml

version: '3.7'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTIC_STACK_VERSION}
    container_name: es
    environment:
      - discovery.type=single-node
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - esdata01:/usr/share/elasticsearch/data        
    ports:
      - 9200:9200
    networks:
      - elastic
 
  kibana:
    image: docker.elastic.co/kibana/kibana:${ELASTIC_STACK_VERSION}
    container_name: kibana
    ports: ['5601:5601']    
    networks: ['elastic']
    environment:
      - SERVER_NAME=kibana.localhost
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
      - I18N_LOCALE=zh-CN
    depends_on: ['elasticsearch']

  logstash:
    image: logstash:${ELASTIC_STACK_VERSION}
    ports:
      - 5000:5000
    volumes: 
      - type: bind
        source: ./logstash_pipeline/
        target: /usr/share/logstash/pipeline
        read_only: true
    networks:
      - elastic        
 
volumes:
  esdata01:
    driver: local
 
networks:
  elastic:
    driver: bridge

我们使用如下的命令来启动 Elastic Stack。在 docker-compose.yml 所在的目录中打入如下的命令:

$ pwd
/Users/liuxg/data/data-pipeline/docker-elk
$ docker-compose up

等所有的 Elastic Stack 运行起来后,我们再次通过如下的命令来进行查看:

docker ps
$ docker ps
CONTAINER ID   IMAGE                                                  COMMAND                  CREATED        STATUS          PORTS                                                  NAMES
92a52c523872   zookeeper                                              "/docker-entrypoint.…"   3 hours ago    Up 35 minutes   2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   zookeeper
5938dc3c40a8   wurstmeister/kafka                                     "start-kafka.sh"         3 hours ago    Up 35 minutes   0.0.0.0:9092->9092/tcp                                 kafka
748145d6a32b   hlebalbau/kafka-manager:stable                         "/kafka-manager/bin/…"   3 hours ago    Up 35 minutes   0.0.0.0:9000->9000/tcp                                 kakfa-manager
0e71d86bd3bb   docker.elastic.co/kibana/kibana:7.12.1                 "/bin/tini -- /usr/l…"   3 hours ago    Up 2 minutes    0.0.0.0:5601->5601/tcp                                 kibana
f43e2892dfc3   docker.elastic.co/elasticsearch/elasticsearch:7.12.1   "/bin/tini -- /usr/l…"   3 hours ago    Up 2 minutes    0.0.0.0:9200->9200/tcp, 9300/tcp                       es
dec11c7fce79   logstash:7.12.1                                        "/usr/local/bin/dock…"   3 hours ago    Up 2 minutes    5044/tcp, 0.0.0.0:5000->5000/tcp, 9600/tcp             docker-elk_logstash_1
da08a6f9b567   rwynn/monstache:latest                                 "/bin/monstache -f .…"   2 months ago   Up 2 seconds    0.0.0.0:8080->8080/tcp                                 container_name

我们可以看到 Elasticsearch 运用于 9000 端口,Kibana 运行于 5601 端口,而 Logstash 运行 5000 端口。 我们可以访问 Kibana 的端口地址 5601:

如果你看到这个画面,则表明我们的 Elastic Stack 是运行正常的。

 

运行 Python 应用导入模拟数据

我们接下来建立一个 Python 的应用来模拟一些数据:

kafka_producer.py

from faker import Faker
from kafka import KafkaProducer
import json
from data import get_registered_user
import time

fake = Faker()

def get_registered_user():
    return {
        "name": fake.name(),
        "address": fake.address(),
        "created_at": fake.year()
    }

def json_serializer(data):
    return json.dumps(data).encode("utf-8")

producer = KafkaProducer(bootstrap_servers=['192.168.0.3:9092'],
                         value_serializer=json_serializer)

if __name__ == "__main__":
    while 1 == 1:
        registered_user = get_registered_user()
        print(registered_user)
        producer.send("registered_user", registered_user)
        time.sleep(4)

上面的应用每隔4秒就发送一个数据到 Kafka 的 registered_user 这个 topic。每次调用 get_registered_user() 方法将生成不同的注册用户名。

我们需要安装 faker 及 kafka-python 这两个 packages。Faker 用来生成不同的模拟的用户名字。

pip install Faker
pip install kafka-python

我们使用如下的方式来运行 Python 应用:

python3 kafka_producer.py 

我们将在屏幕上看到:

我们现在马上回到 Kibana 的界面使用如下的命令来查看:

GET registered-user/_count

我们可以看到不断被导入的数据。我们可以对这个 registered-user 索引进行搜索:

GET registered-user/_search

 

小结

在本例子中,我们展示了如何使用 Pyhon 应用作为一个 producer 把数据经过 Kafka => Logstash => Elasticsearch,并最终在 Kibana 中进行展示。

以上是关于Elastic:Data pipeline:使用 Kafka => Logstash => Elasticsearch的主要内容,如果未能解决你的问题,请参考以下文章

Observability:Data pipeline:Beats => Redis => Logstash => Elasticsearch

Observability:Data pipeline:Beats => Redis => Logstash => Elasticsearch

使用 Jenkins Pipeline 将 webapp 部署到 AWS Elastic Beanstalk

AWS Data Pipeline 的默认日期时间

Elastic认证特训营 难点解读08——聚合的概念不好理解,怎么办?

Elasticsearch:ingest pipelines - 使用技巧和窍门