Elasticsearch:从 Kafka 到 Elasticsearch 的实时用户配置文件数据管道

Posted Elastic 中国社区官方博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Elasticsearch:从 Kafka 到 Elasticsearch 的实时用户配置文件数据管道相关的知识,希望对你有一定的参考价值。

如今,网络服务、数字媒体、传感器日志数据等众多来源产生了大量数据,只有一小部分数据得到妥善管理或利用来创造价值。读取大量数据、处理数据并根据这些数据采取行动比以往任何时候都更具挑战性。

在这篇文章中,我试图展示:

  • 在 Python 中生成模拟用户配置文件数据
  • 通过 Kafka Producer 将模za拟数据发送到 Kafka 主题
  • 使用 Logstash 读取数据并上传到 Elasticsearch
  • 使用 Kibana 可视化流数据

在我之前的文章 “Elastic:使用 Kafka 部署 Elastic Stack”,我实现了如下的一个数据 pipeline:

 在今天的文章中,我将实现如下的一个数据 pipeline:

在今天的展示中,我将使用最新的 Elastic Stack 8.6.1 来进行展示。我将使用如下的配置:

如上所示,我使用两台机器:macOS 用于安装 Elastic Stack,而另外一台 Ubuntu 机器将被用于安装 Kafka 及 Logstash。我将在 Ubuntu OS 机器上使用 Python 向 Kafka 写入数据。

安装

Elasticsearch 及 Kibana

我将使用 docker compose 的方法来安装 Elasticsearch 及 Kibana。我们可以参考文章 “Elasticsearch:使用 Docker compose 来一键部署 Elastic Stack 8.x” 来进行部署。当然,我们也可以参阅如下的文章来进行部署:

在默认的情况下,Elasticsearch 的访问是带有 HTTPS 的安全访问。

我们可以在电脑的 terminal 中打入如下的命令来检查:

curl -k -u elastic:password https://192.168.0.3:9200

 上述命令是在 Ubuntu OS 的机器上运行。它表明,我们可以在 Ubuntu OS 的机器上成功地访问 Elasticsearch。

安装 Kafka

我们安装涉及设置 Apache Kafka(我们的消息代理)。Kafka 使用 ZooKeeper 来维护配置信息和同步,因此在设置 Kafka 之前,我们需要先安装 ZooKeeper:

sudo apt-get install zookeeperd

接下来,让我们下载并解压缩 Kafka:

wget https://apache.mivzakim.net/kafka/2.4.0/kafka_2.13-2.4.0.tgz
tar -xzvf kafka_2.13-2.4.0.tgz
sudo cp -r kafka_2.13-2.4.0 /opt/kafka

现在,我们准备运行 Kafka,我们将使用以下脚本进行操作:

sudo /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

你应该开始在控制台中看到一些 INFO 消息:

Kafka 的配置如下

  • Kafka 正在监听 9092 端口
  • Zookeeper 正在监听 2181 端口
  • Kafka Manager 正在监听 9000 端口

我们接下来打开另外一个控制台中,并为 registered_user 创建一个主题:

/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic registered_user

我们创建了一个叫做 registered_user 的 topic。上面的命令将返回如下的结果:

$ /opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic registered_user
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic registered_user.

我们现在已经完全为开始管道做好了准备。

有关 kafka 的安装,我们也可以使用 docker-compose 来进行安装。具体安装步骤请参考 “Elastic:Data pipeline:使用 Kafka => Logstash => Elasticsearch”。

Logstash

我们接下来安装 Logstash。我们到 Elastic 的官方网站来下载时候我们平台的安装包:

 wget https://artifacts.elastic.co/downloads/logstash/logstash-8.6.1-amd64.deb?_gl=1*1b5e8ui*_ga*MTEyNjEzOTY5Ni4xNjQ3MDY1ODMx*_ga_Q7TEQDPTH5*MTY3NDk3MjkzNS4zODAuMS4xNjc0OTcyOTQ4LjAuMC4w
liuxg@liuxgu:~/logstash$ wget https://artifacts.elastic.co/downloads/logstash/logstash-8.6.1-amd64.deb?_gl=1*1b5e8ui*_ga*MTEyNjEzOTY5Ni4xNjQ3MDY1ODMx*_ga_Q7TEQDPTH5*MTY3NDk3MjkzNS4zODAuMS4xNjc0OTcyOTQ4LjAuMC4w
--2023-01-29 14:20:31--  https://artifacts.elastic.co/downloads/logstash/logstash-8.6.1-amd64.deb?_gl=1*1b5e8ui*_ga*MTEyNjEzOTY5Ni4xNjQ3MDY1ODMx*_ga_Q7TEQDPTH5*MTY3NDk3MjkzNS4zODAuMS4xNjc0OTcyOTQ4LjAuMC4w
Resolving artifacts.elastic.co (artifacts.elastic.co)... 34.120.127.130, 2600:1901:0:1d7::
Connecting to artifacts.elastic.co (artifacts.elastic.co)|34.120.127.130|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 341638094 (326M) [binary/octet-stream]
Saving to: ‘logstash-8.6.1-amd64.deb?_gl=1*1b5e8ui*_ga*MTEyNjEzOTY5Ni4xNjQ3MDY1ODMx*_ga_Q7TEQDPTH5*MTY3NDk3MjkzNS4zODAuMS4xNjc0OTcyOTQ4LjAuMC4w’

logstash-8.6.1-amd64.de 100%[==============================>] 325.81M  10.7MB/s    in 31s     

2023-01-29 14:21:03 (10.6 MB/s) - ‘logstash-8.6.1-amd64.deb?_gl=1*1b5e8ui*_ga*MTEyNjEzOTY5Ni4xNjQ3MDY1ODMx*_ga_Q7TEQDPTH5*MTY3NDk3MjkzNS4zODAuMS4xNjc0OTcyOTQ4LjAuMC4w’ saved [341638094/341638094]

liuxg@liuxgu:~/logstash$ mv 'logstash-8.6.1-amd64.deb?_gl=1*1b5e8ui*_ga*MTEyNjEzOTY5Ni4xNjQ3MDY1ODMx*_ga_Q7TEQDPTH5*MTY3NDk3MjkzNS4zODAuMS4xNjc0OTcyOTQ4LjAuMC4w' logstash-8.6.1-amd64.deb

我们使用如下的命令来安装 Logstash:

sudo dpkg -i logstash-8.6.1-amd64.deb 
liuxg@liuxgu:~/logstash$ sudo dpkg -i logstash-8.6.1-amd64.deb 
[sudo] password for liuxg: 
(Reading database ... 386953 files and directories currently installed.)
Preparing to unpack logstash-8.6.1-amd64.deb ...
Unpacking logstash (1:8.6.1-1) over (1:8.4.2-1) ...
Setting up logstash (1:8.6.1-1) ...
Installing new version of config file /etc/logstash/jvm.options ...

为了能够配置 Logstash 能够正确地访问 Elasticsearch,我们可以参考我之前的文章 “Logstash:如何连接到带有 HTTPS 访问的集群”。我们需要按照文章里的要求创建一个叫做 truststore.p12 的文件。由于我们是以 docker 的形式启动 Elasticsearch 及 Kibana 的,我们在 macOS 的机器上使用如下的命令来拷贝证书。我们先查看容器:

$ docker ps
CONTAINER ID   IMAGE                                                 COMMAND                  CREATED       STATUS                 PORTS                              NAMES
a2374f620b78   docker.elastic.co/kibana/kibana:8.6.1                 "/bin/tini -- /usr/l…"   7 hours ago   Up 7 hours (healthy)   0.0.0.0:5601->5601/tcp             elastic8-kibana-1
e2d6443b8edb   docker.elastic.co/elasticsearch/elasticsearch:8.6.1   "/bin/tini -- /usr/l…"   7 hours ago   Up 7 hours (healthy)   9200/tcp, 9300/tcp                 elastic8-es03-1
a29bbeb4bdf2   docker.elastic.co/elasticsearch/elasticsearch:8.6.1   "/bin/tini -- /usr/l…"   7 hours ago   Up 7 hours (healthy)   9200/tcp, 9300/tcp                 elastic8-es02-1
81de3d45943c   docker.elastic.co/elasticsearch/elasticsearch:8.6.1   "/bin/tini -- /usr/l…"   7 hours ago   Up 7 hours (healthy)   0.0.0.0:9200->9200/tcp, 9300/tcp   elastic8-es01-1

我们可以看到有一个叫做 elastic8-es01-1 的容器。

$ pwd
/Users/liuxg/data/elastic8
$ ls
docker-compose.yml http_ca.crt        kibana.yml         write_to_kafka.py
$ docker cp 81de3d45943c:/usr/share/elasticsearch/config/certs/ca/ca.crt .
$ ls
ca.crt             docker-compose.yml http_ca.crt        kibana.yml         write_to_kafka.py
$ ls
ca.crt             docker-compose.yml kibana.yml       

运用 ca.crt 文件,我们使用如下的命令来创建一个叫做 truststore.p12 的文件。它的 storepass 是 password:

keytool -import -file ca.crt -keystore truststore.p12 -storepass password -noprompt -storetype pkcs12
$ keytool -import -file ca.crt -keystore truststore.p12 -storepass password -noprompt -storetype pkcs12
Certificate was added to keystore
$ ls
ca.crt             docker-compose.yml kibana.yml         truststore.p12

从上面,我们可以看出来它创建了 truststore.p12 这个文件。我们接下来把这个文件拷贝到 Ubuntu OS 机器下的 /etc/logstash/conf.d 目录中。

liuxg@liuxgu:/etc/logstash/conf.d$ ls
truststore.p12

我们接下来在地址 /etc/logstash/conf.d 创建一个叫做叫做 kafka_to_logstash.conf 的配置文件:

kafka_to_logstash.conf

input 
  kafka 
    bootstrap_servers => "192.168.0.4:9092"
    topics => ["registered_user"]
  


filter 
  json 
    source => "message"
  


output 
  elasticsearch 
      hosts => ["https://192.168.0.3:9200"]
      index => "registered_user"
      workers => 1
      user => "elastic"
      password => "password"
      ssl_certificate_verification => true
      truststore => "/etc/logstash/conf.d/truststore.p12"
      truststore_password => "password"
  

在上面,请注意的是:

  • 我们使用 Elasticsearch 的超级用户 elastic 来连接 Elasticsearch。它的密码是 password。在实际的使用中,我们可以创建一个合适权限的用户来进行连接。

这样我们的 Logstash 的配置就完成了。

sudo service logstash start

我们可以通过如下的命令来检查 Logstash 是否已经成功地运行起来了。

service logstash status
liuxg@liuxgu:~$ service logstash status
● logstash.service - logstash
     Loaded: loaded (/lib/systemd/system/logstash.service; disabled; vendor preset: enabled)
     Active: active (running) since Sun 2023-01-29 15:25:57 CST; 7s ago
   Main PID: 60841 (java)
      Tasks: 33 (limit: 18977)
     Memory: 508.6M
     CGroup: /system.slice/logstash.service
             └─60841 /usr/share/logstash/jdk/bin/java -Xms1g -Xmx1g -Djava.awt.headless=true ->

1月 29 15:25:57 liuxgu systemd[1]: Started logstash.
1月 29 15:25:57 liuxgu logstash[60841]: Using bundled JDK: /usr/share/logstash/jdk

上面表明我们的 logstash 服务已经被成功地运行起来了。我们还可以通过如下的命令来查看 logstash 服务的日志:

journalctl -u logstash

向 Kafka topic 写入数据

我们使用如下的 Python 应用向我们的 Kafka topic “registered_user” 来写入数据:

write_to_kafka.py

from faker import Faker
from kafka import KafkaProducer
import json
fake = Faker()
import time

def get_registered_data():
    return 
        'first name': fake.first_name(),
        'last name': fake.last_name(),
        'age': fake.random_int(0, 60),
        'address': fake.address(),
        'register_year': fake.year(),
        'register_month': fake.month(),
        'register_day': fake.day_of_month(),
        'monthly_income': fake.random_int(28000, 100000)
    

def json_serializer(data):
    return json.dumps(data).encode('utf-8')

producer = KafkaProducer(bootstrap_servers=['192.168.0.4:9092'],
                         value_serializer=json_serializer)

if __name__ == '__main__':
    while True:
        registered_data = get_registered_data()
        print(registered_data)
        producer.send('registered_user', registered_data)
        time.sleep(3)

为了运行上面的应用,我们必须安装如下的两个包:

pip3 install Faker
pip3 install kafka-python

我们在 Ubuntu OS 机器上运行上面的代码:

python write_to_kafka.py 

 我们回到 Kibana 的界面来进行查看:

GET _cat/indices

上面的命令显示:

我们可以对这个文件进行搜索:

GET registered_user/_search

 我们可以看到如下的结果:

从上面,我们可以看出来我们的数据已经被结构化。

我们可以针对这个索引进行可视化。你可以阅读我博客里的相应文章以了解更多。

以上是关于Elasticsearch:从 Kafka 到 Elasticsearch 的实时用户配置文件数据管道的主要内容,如果未能解决你的问题,请参考以下文章

Elasticsearch:从 Kafka 到 Elasticsearch 的实时用户配置文件数据管道

Elasticsearch:从 Kafka 到 Elasticsearch 的实时用户配置文件数据管道

使用 Apache Kafka 将数据从 MSSQL 同步到 Elasticsearch

Kafka原理和实践:从Log Agent→Kafka→ElasticSearch→Kibana

Kafka 连接器 Elasticsearch topics.regex

如何在 Kafka 连接器中正确连接 Elastic Operator 部署的 Elasticsearch?