Elasticsearch:使用 Node.js 将实时数据提取到 Elasticsearch 中
Posted Elastic 中国社区官方博客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Elasticsearch:使用 Node.js 将实时数据提取到 Elasticsearch 中相关的知识,希望对你有一定的参考价值。
Elasticsearch 是一个强大的 RESTful 搜索和分析引擎,能够处理越来越多的用例。 它将集中存储你的数据,以实现闪电般的快速搜索、微调相关性以及可轻松扩展的强大分析。 关于如何使用 Elastic Stack(又名 ELK 堆栈)将数据摄取到 Elasticsearch 的资源有很多。在今天的文章中,我将详细介绍如何使用 Node.js 从零开始来把地震的实时数据采集到 Elasticsearch 中。
如果你选择的编程语言是 javascript,并且你需要使用 RESTful API 方法从第三方应用程序获取数据,那么使用 Node.js 获取数据是一个不错的选择。 你还可以托管服务器,让它持续实时摄取数据。 该演示将向您展示如何设置一个 Node.js + Express.js 服务器,该服务器实时将数据提取到 Elasticsearch 中,然后可以对这些数据进行分析并以有意义的方式采取行动。
对于此演示,我们将使用 USGS 实时发布的公开可用的全球地震数据。
准备工作
Elasticsearch 及 Kibana
如果你还没有安装好自己的 Elasticsearch 及 Kibana 的话,那么请参考我之前的文章:
在今天的展示中,我将使用 Elastic Stack 8.x 来进行展示。在安装的时候,请参考相应的 Elastic Stack 8.x 的文章来进行安装。
Node.js
你需要安装好自己的 Node.js 来完成下面的练习。你可以参考 Node.js 链接来进行相应的安装。
实时数据
根据 USGS 网上所提供的信息,我们可以在地址 https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson 找到相应的地震信息数据。我们可以通过如下的命令来进行查看:
curl https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson | jq .
如上所示,它是一个以 JSON 格式给出来的数据信息。这个数据会实时发生变化,我们可以通过反复访问这个接口来得到所需要的地震信息。在这里,我们需要注意的是:
- "time": 1672471359610,这是一个时间信息,可以作为我们的 timestamp 来对它进行分析。我们将最终把它存入到 @timestamp 里。
-
"id": "nc73827101",这是一个地震特有的 id,我们将以这个 id 成为数据的 id。
-
"geometry",这个是地震发生的地理位置。我们可以需要在 Elasticsearch 中为它定一下为 geo_point 数据类型。我们将把它变为:
虽然数据有很多,但是我们最终需要的数据格式是这样的:
"mag": 1.13,
"place": "11km ENE of Coachella, CA",
"@timestamp": 2022-05-02T20:07:53.266Z,
"url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40240408",
"sig": 20,
"type": "earthquake",
"depth": 2.09,
"coordinates":
"lat": 33.7276667,
"lon": -116.0736667
在接下来的步骤里,我来详细介绍如何达到我们最终的目的。
创建 Node.js 应用
创建最基本的 express 应用
我们将从 0 开始一步一步地创建 Node.js 应用。首先我们在自己的电脑中创建一个目录:
mkdir earthquake_app
$ pwd
/Users/liuxg/demos
$ mkdir earthquake_app
$ cd earthquake_app/
我们进入到该目录中,并打入如下的命令:
npm init -y
$ npm init -y
Wrote to /Users/liuxg/demos/earthquake_app/package.json:
"name": "earthquake_app",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts":
"test": "echo \\"Error: no test specified\\" && exit 1"
,
"keywords": [],
"author": "",
"license": "ISC"
$ ls
package.json
上述命令生成一个叫做 package.json 的文件。在以后安装的 packages,它也会自动添加到这个文件中。默认的设置显然不是我们想要的。我们需要对它做一些修改。
在接下来的代码中,我们将会使用如下的一些 packages:
- @elastic/elasticsearch
- axios
- config
- cors
- express
- log-timestamp
- nodemon
我们可以通过如下的命令来进行安装:
npm i @elastic/elasticsearch axios config cors express log-timestamp nodemon
$ npm i @elastic/elasticsearch axios config cors express log-timestamp nodemon
npm notice Beginning October 4, 2021, all connections to the npm registry - including for package installation - must use TLS 1.2 or higher. You are currently using plaintext http to connect. Please visit the GitHub blog for more information: https://github.blog/2021-08-23-npm-registry-deprecating-tls-1-0-tls-1-1/
npm notice Beginning October 4, 2021, all connections to the npm registry - including for package installation - must use TLS 1.2 or higher. You are currently using plaintext http to connect. Please visit the GitHub blog for more information: https://github.blog/2021-08-23-npm-registry-deprecating-tls-1-0-tls-1-1/
added 118 packages in 17s
11 packages are looking for funding
run `npm fund` for details
由于我之前已经安装过,所以我上面显示的信息和你的可能会有所不同。我们再次来查看 package.json 文件:
$ pwd
/Users/liuxg/demos/earthquake_app
$ ls
node_modules package-lock.json package.json
$ cat package.json
"name": "earthquake_app",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts":
"test": "echo \\"Error: no test specified\\" && exit 1"
,
"keywords": [],
"author": "",
"license": "ISC",
"dependencies":
"@elastic/elasticsearch": "^8.5.0",
"axios": "^1.2.2",
"config": "^3.3.8",
"cors": "^2.8.5",
"express": "^4.18.2",
"log-timestamp": "^0.3.0",
"nodemon": "^2.0.20"
很显然,我们最新安装的 packages 已经自动添加到 package.json 文件中了。
我们接下来创建一个叫做 server 的子目录,并在它里面创建一个叫做 server.js 的文件:
$ pwd
/Users/liuxg/demos/earthquake_app
$ mkdir server
$ touch server/server.js
在上面,我们创建了一个叫做 server.js 的文件。这个将来就是我们需要运行的 server 脚本。为了能够让我们的 package.json 文件的配置能让 npm 进行运行,我们需要对它进行修改。
$ pwd
/Users/liuxg/demos/earthquake_app
$ cat package.json
"name": "earthquake_app",
"version": "1.0.0",
"description": "",
"main": "sever.js",
"scripts":
"start": "nodemon server/server.js",
"test": "echo \\"Error: no test specified\\" && exit 1"
,
"keywords": [],
"author": "",
"license": "ISC",
"dependencies":
"@elastic/elasticsearch": "^8.5.0",
"axios": "^1.2.2",
"config": "^3.3.8",
"cors": "^2.8.5",
"express": "^4.18.2",
"log-timestamp": "^0.3.0",
"nodemon": "^2.0.20"
很多人可能会奇怪,为啥使用 nodemon 来启动脚本。它的好处是当我们修改好 server.js 里的脚本,那么它会自动重新启动服务器的运行,而不需要我们每次都需要打入如下的命令:
npm start
接下为了验证我们的 express 应用是否能成功地运行,我们修改 server.js 为如下的代码:
server/server.js
onst express = require('express');
const app = express();
const port = 5001;
app.get('/', (req, res) =>
res.send('Hello World!')
)
app.listen(port, () => console.log(`Server listening at http://localhost:$port`));
我们接下来使用如下的命令来进行启动:
npm start
$ pwd
/Users/liuxg/demos/earthquake_app
$ npm start
> earthquake_app@1.0.0 start
> nodemon server/server.js
[nodemon] 2.0.20
[nodemon] to restart at any time, enter `rs`
[nodemon] watching path(s): *.*
[nodemon] watching extensions: js,mjs,json
[nodemon] starting `node server/server.js`
Server listening at http://localhost:5001
我们可以看到服务器已经成功地运行起来了,并且它运行于 5001 端口上。我们可以通过浏览器来进行访问它的网址:
上面显示我们的服务器运行正常。
安全地连接 Node.js 服务器到 Elasticsearch
接下来,我们需要创建代码来安全地连接 Node.js 服务到我们本地部署的 Elasticsearch 中。我们可以参考之前的文章 “Elasticsearch:使用最新的 Nodejs client 8.x 来创建索引并搜索”。我们可以在项目的更目录下创建如下的两个子目录:
mkdir config
mkdir -p server/elasticsearch
$ pwd
/Users/liuxg/demos/earthquake_app
$ mkdir config
$ mkdir -p server/elasticsearch
$ ls -d */
config/ node_modules/ server/
在 config 子目录下,我们创建如下的一个叫做 default.json 的文件。这个是用来配置如何连接到 Elasticsearch 的:
config/default.json
"elastic":
"elasticsearch_endpoint": "https://localhost:9200",
"username": "elastic",
"password": "-pK6Yth+mU8O-f+Q*F3i",
"apiKey": "eVBKOFhJVUJUN1gwSDQyLU5halY6R1BVRjNOUmpRYUtkTmpXTUZHdWZVUQ==",
"certificate": "/Users/liuxg/elastic/elasticsearch-8.5.3/config/certs/http_ca.crt",
"caFingerprint": "E3D36275D9FA80CF96F74E6537FC74E7952511A75E01605EBCFB8FC9F08F598C"
我们先不要着急来了解这些配置参数。有些我们可能并不一定要用到。这些设置针对我们每个人的 Elasticsearch 的安装的不同而不同。在上面的参数解释如下:
- elasticsearch_endpoint:这个是 Elasticsearch 的访问地址
- username:这个是访问 Elasticsearch 的用户名,你可以不选用超级用户 elastic,而且在生产环境中,也不是推荐的方法
- password:这个是上面 username 账号的密码
- apiKey:这个是访问 Elasticsearch 所需要的 apiKey。你可以参考 “Elasticsearch:使用最新的 Nodejs client 8.x 来创建索引并搜索” 来了解如何进行生产。在下面的代码中,我们也可以使用 code 来进行生成
- certificate:这个是证书的位置。每个 Elasticsearch 集群都会有一个生成的证书位置。我们需要填入这个位置信息
- caFingerprint:这个是证书的 fingerprint 信息。我们可以采用 fingerprint 来进行连接。在本演示中,我将不使用这种方式。更多信息,请参考 Connecting | Elasticsearch JavaScript Client [master] | Elastic
我们在 elasticsearch 目录下创建一个叫做 client.js 的文件:
server/elasticsearch/client.js
const Client = require('@elastic/elasticsearch');
const config = require('config');
const fs = require('fs')
const elasticConfig = config.get('elastic');
// const client = new Client (
// node: elasticConfig.elasticsearch_endpoint,
// auth:
// apiKey: elasticConfig.apiKey
// ,
// tls:
// ca: fs.readFileSync(elasticConfig.certificate),
// rejectUnauthorized: true
//
// );
const client = new Client (
node: elasticConfig.elasticsearch_endpoint,
auth:
username: elasticConfig.username,
password: elasticConfig.password
,
tls:
ca: fs.readFileSync(elasticConfig.certificate),
rejectUnauthorized: true
);
client.ping()
.then(response => console.log("You are connected to Elasticsearch!"))
.catch(error => console.error("Elasticsearch is not connected."))
module.exports = client;
在上面,我使用了两种方法来连接到 Elasticsearch。一种是通过 username/password 的方式来进行连接:
const client = new Client (
node: elasticConfig.elasticsearch_endpoint,
auth:
username: elasticConfig.username,
password: elasticConfig.password
,
tls:
ca: fs.readFileSync(elasticConfig.certificate),
rejectUnauthorized: true
);
而另外一种就是被注释掉的那个方法:
const client = new Client (
node: elasticConfig.elasticsearch_endpoint,
auth:
apiKey: elasticConfig.apiKey
,
tls:
ca: fs.readFileSync(elasticConfig.certificate),
rejectUnauthorized: true
);
这个也是被推荐的方法。在实际的使用中,我们更推荐使用 API key 来进行连接。
我们首先来使用 username/password 的方式来进行连接。我们需要修改我们的 server.js 来进行验证:
server/server.js
const express = require('express');
const client = require('./elasticsearch/client');
const app = express();
const port = 5001;
app.get('/', (req, res) =>
res.send('Hello World!')
)
app.listen(port, () => console.log(`Server listening at http://localhost:$port`));
我们重新运行服务器。我们可以看到如下的输出:
$ pwd
/Users/liuxg/demos/earthquake_app
$ npm start
> earthquake_app@1.0.0 start
> nodemon server/server.js
[nodemon] 2.0.20
[nodemon] to restart at any time, enter `rs`
[nodemon] watching path(s): *.*
[nodemon] watching extensions: js,mjs,json
[nodemon] starting `node server/server.js`
Server listening at http://localhost:5001
You are connected to Elasticsearch!
上面的输出表明我们已经能够成功地连接到 Elasticsearch 了。
使用代码获取 API key
我们接下来可以通过代码来获得 API key,尽管我们可以通过其它的方法来获得。请详细阅读 “Elasticsearch:创建 API key 接口访问 Elasticsearch”。在这里,我们可以使用 Node.js 代码来动态地生成一个 API key。我们在 server 目录下创建如下的一个文件:
sever/create-api-key.js
const client = require('./elasticsearch/client');
async function generateApiKeys(opts)
const body = await client.security.createApiKey(
body:
name: 'earthquake_app',
role_descriptors:
earthquakes_example_writer:
cluster: ['monitor'],
index: [
names: ['earthquakes'],
privileges: ['create_index', 'write', 'read', 'manage'],
,
],
,
,
,
);
return Buffer.from(`$body.id:$body.api_key`).toString('base64');
generateApiKeys()
.then(console.log)
.catch((err) =>
console.error(err);
process.exit(1);
);
我们使用如下的命令来运行这个 Node.js 的代码:
$ pwd
/Users/liuxg/demos/earthquake_app
$ ls server/create-api-key.js
server/create-api-key.js
$ node server/create-api-key.js
You are connected to Elasticsearch!
emZJSGFZVUJUN1gwSDQyLWRLaS06LVpHaXR1bm5RQnEybE4zOWoyd0g5Zw==
我们可以把上面命令生成的 API key 写入到之前的 default.json 文件中。这样我们也可以通过 API key 的方式来访问 Elasticsearch 了,如果我们需要的话。这样 client.js 实际上可以写成:
server/elasticsearch/client.js
const Client = require('@elastic/elasticsearch');
const config = require('config');
const fs = require('fs')
const elasticConfig = config.get('elastic');
const client = new Client (
node: elasticConfig.elasticsearch_endpoint,
auth:
apiKey: elasticConfig.apiKey
,
tls:
ca: fs.readFileSync(elasticConfig.certificate),
rejectUnauthorized: true
);
// const client = new Client (
// node: elasticConfig.elasticsearch_endpoint,
// auth:
// username: elasticConfig.username,
// password: elasticConfig.password
// ,
// tls:
// ca: fs.readFileSync(elasticConfig.certificate),
// rejectUnauthorized: true
//
// );
client.ping()
.then(response => console.log("You are connected to Elasticsearch!"))
.catch(error => console.error("Elasticsearch is not connected."))
module.exports = client;
我们重新运行 server.js,我们可以看到如下的输出:
$ pwd
/Users/liuxg/demos/earthquake_app
$ npm start
> earthquake_app@1.0.0 start
> nodemon server/server.js
[nodemon] 2.0.20
[nodemon] to restart at any time, enter `rs`
[nodemon] watching path(s): *.*
[nodemon] watching extensions: js,mjs,json
[nodemon] starting `node server/server.js`
Server listening at http://localhost:5001
You are connected to Elasticsearch!
很显然,我们的 API key 方式是成功的。使用 API key 的好处是我们不必要暴露用户的密码在代码中,而且,我们甚至可以为这个 API key 来设置有效时间及权限。可以授予最小所需要的权限,以确保安全。
设置 RESTful API 调用以从源检索数据
现在我们的服务器正在运行并且 Elasticsearch 已连接,我们需要测试对 USGS 的 API 调用以接收初始数据。 在项目的根目录下,创建一个名为 routes 的文件夹和一个名为 api 的子文件夹。 在 api 文件夹中,创建一个名为 data.js 的文件并添加以下代码:
$ pwd
/Users/liuxg/demos/earthquake_app
$ mkdir -p server/routes/api
我在 routes/api 目录下创建一个如下的 data.js 文件:
server/routes/api/data.js
require('log-timestamp');
const express = require('express');
const router = express.Router();
const axios = require('axios')
const client = require('../../elasticsearch/client');
const URL = `https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson`;
router.get('/earthquakes', async function (req, res)
console.log('Loading Application...');
//======= Check that Elasticsearch is up and running =======\\\\
pingElasticsearch = async () =>
await client.ping(
function(error,res)
if (error)
console.error('elasticsearch cluster is down!');
else
console.log('Elasticsearch Ready');
);
// ====== Get Data From USGS and then index into Elasticsearch
indexAllDocs = async () =>
try
console.log('Getting Data From Host')
const EARTHQUAKES = await axios.get(`$URL`,
headers:
'Content-Type': [
'application/json',
'charset=utf-8'
]
);
console.log('Data Received!')
results = EARTHQUAKES.data.features
console.log('Indexing Data...')
console.log(results)
res.json(results)
if (EARTHQUAKES.data.length)
indexAllDocs();
else
console.log('All Data Has Been Indexed!');
;
catch (err)
console.log(err)
;
console.log('Preparing For The Next Data Check...');
console.log("Ping the Elasticsearch server");
pingElasticsearch()
console.log("Get data from USGS");
indexAllDocs()
);
module.exports = router;
上面的代码使用 npm 包 Axios 对 USGS 地震 API 进行异步 API 调用。 收到数据后,它将显示为 JSON。 你还可以看到我们在页面顶部导入了一个名为 log-timestamp 的依赖项。 这将允许我们将时间戳添加到每个 console.log。
我们接下来修改 server.js 如下:
server/server.js
const express = require('express');
const client = require('./elasticsearch/client');
const app = express();
const port = 5001;
//Define Routes
const data = require('./routes/api/data')
app.use('/api/data', data);
app.get('/', (req, res) =>
res.send('Hello World!')
)
app.listen(port, () => console.log(`Server listening at http://localhost:$port`));
重新运行我们的 server.js。我们通过 Postman 或者其它的工具来对我们的 REST 接口进行访问:
localhost:5000/api/data/earthquakes
从上面的输出中,我们可以看出来设计的 REST 接口工作是正常的。它含有一些收集来的数据。在所收集来的数据中,有一些数据是我们并不需要的。我们最终需要的数据是这样的:
"mag": 1.13,
"place": "11km ENE of Coachella, CA",
"time": 2022-05-02T20:07:53.266Z,
"url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40240408",
"sig": 20,
"type": "earthquake",
"depth": 2.09,
"coordinates":
"lat": 33.7276667,
"lon": -116.0736667
也就是说我们可以删除一下不需要的字段,并且我们需要转换一些字段,比如把 time 字段转换为我们想要的 @timestamp 字段。另外在写入 Elasticsearch 时,我们需要预先针对 coodinates 字段进行定义。它是一个 geo_point 类型的字段。
定义 mapping 及 pipeline
如上所示,我们需要的字段如上。我们可以如下的一个 earthquakes 索引。我们在 Kibana 的 console 中打入如下的命令:
PUT earthquakes
"mappings":
"properties":
"@timestamp":
"type": "date"
,
"coordinates":
"type": "geo_point"
,
"depth":
"type": "float"
,
"mag":
"type": "float"
,
"place":
"type": "text",
"fields":
"keyword":
"type": "keyword"
,
"sig":
"type": "short"
,
"type":
"type": "keyword"
,
"url":
"enabled": false
在上面,我们针对索引的字段类型做如下的说明:
- @timestamp:这是一个 date 字段类型的字段。我们希望的格式是 2022-05-02T20:07:53.266Z 而不是以 EPOC 形式显示的值,比如 1672471359610。这个字段有 time 转换而来
- coordinates:这个是一个 geo_point 的字段。是地震发生的地理位置
- place:这是一个 multi-field 字段。我们希望对这个字段进行统计,也可以针对它进行搜索
- sig:这字段我们使用 short 类型,而不是 long。这样可以省去存储空间
- type:这是一个 keyword 类型的字段。它只可以做数据分析统计之用
- url:这个字段,我们既不想对它进行搜索,也不想对它进行统计,所有设置 enabled 为 false。这样可以省去分词的时间,从而提高摄入数据的速度
为此,我们可以针对上面的 data.js 做更进一步的修改:
server/routes/api/data.js
const express = require('express');
const router = express.Router();
const axios = require('axios')
const client = require('../../elasticsearch/client');
const URL = `https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson`;
//======= Check that Elasticsearch is up and running =======\\\\
function pingElasticsearch()
console.log("ping .....")
client.ping(
requestTimeout: 30000,
, function(error,res)
if (error)
console.error('elasticsearch cluster is down!');
else
console.log('Elasticsearch Ready');
);
;
// ====== Get Data From USGS and then index into Elasticsearch
indexAllDocs = async () =>
try
const EARTHQUAKES = await axios.get(`$URL`,
headers:
'Content-Type': [
'application/json',
'charset=utf-8'
]
);
console.log('Getting Data From Host')
results = EARTHQUAKES.data.features
results.map(
async (results) => (
(earthquakeObject =
place: results.properties.place, //
time: results.properties.time, //
url: results.properties.url, //
sig: results.properties.sig, //
mag: results.properties.mag, //
type: results.properties.type, //
longitude: results.geometry.coordinates[0], //
latitude: results.geometry.coordinates[1], //
depth: results.geometry.coordinates[2], //
),
await client.index(
index: 'earthquakes',
id: results.id,
body: earthquakeObject
)
)
);
if (EARTHQUAKES.data.length)
indexAllDocs();
else
console.log('All Data Has Been Indexed!');
;
catch (err)
console.log(err)
;
console.log('Preparing For The Next Data Check...');
//================== Official API Call ==================\\\\
router.get('/earthquakes', function (req, res)
res.send('Running Application...');
console.log('Loading Application...')
indexAllDocs(res);
);
module.exports = router;
在上面,我们添加了把文档写入 Elasticsearch 的代码部分。我们使用地震数据的 id 作为 Elasticsearch 文档的 id。等服务器运行起来后,我们需要在 terminal 中打入如下的命令:
curl -XGET http://localhost:5001/api/data/earthquakes
我们可以在 Kibana 中通过如下的命令来查看文档:
GET earthquakes/_search?filter_path=**.hits
我们可以看到如下的结果:
"hits":
"hits": [
"_index": "earthquakes",
"_id": "nc73827281",
"_score": 1,
"_source":
"place": "10km S of Laytonville, CA",
"time": 1672505649740,
"url": "https://earthquake.usgs.gov/earthquakes/eventpage/nc73827281",
"sig": 63,
"mag": 2.02,
"type": "earthquake",
"longitude": -123.4981689,
"latitude": 39.5991669,
"depth": 4.59
,
...
很显然,这个文档的 source 和我们之前的想要的格式还是不太一样。为了能够使的 time 转换为 @timestamp,我们可以在 Node.js 的代码中进行相应的转换。我们也可以采用 ingest pipeline 来实现相应的操作。我们定义如下的 ingest pipeine。
POST _ingest/pipeline/_simulate
"pipeline":
"description": "This is for data transform for earthquake data",
"processors": [
"date":
"field": "time",
"formats": [
"UNIX_MS"
]
]
,
"docs": [
"_source":
"place": "16km N of Borrego Springs, CA",
"time": 1672507053210,
"url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40152271",
"sig": 10,
"mag": 0.81,
"type": "earthquake",
"longitude": -116.368,
"latitude": 33.4013333,
"depth": 2.91
]
在上面的命令中,我们使用 date processor 来把 time 转换为所需要的格式,并在默认的情况下把 target 设置为 @timestamp。上面命令运行的结果为:
"docs": [
"doc":
"_index": "_index",
"_id": "_id",
"_version": "-3",
"_source":
"sig": 10,
"mag": 0.81,
"depth": 2.91,
"@timestamp": "2022-12-31T17:17:33.210Z",
"latitude": 33.4013333,
"place": "16km N of Borrego Springs, CA",
"time": 1672507053210,
"type": "earthquake",
"url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40152271",
"longitude": -116.368
,
"_ingest":
"timestamp": "2023-01-01T00:31:03.544821Z"
]
从上面的输出中,我们可以看出来 @timestamp 字段已经生成。它的值由 time 字段转换而来。我们还发现 latitude 及 longitude 并不是按照我们需要的格式来显示的。我们需要把它转化为另外一个像如下的对象:
我们可以通过 rename processor 来操作:
POST _ingest/pipeline/_simulate
"pipeline":
"description": "This is for data transform for earthquake data",
"processors": [
"date":
"field": "time",
"formats": [
"UNIX_MS"
]
,
"rename":
"field": "latitude",
"target_field": "coordinates.lat"
,
"rename":
"field": "longitude",
"target_field": "coordinates.lon"
]
,
"docs": [
"_source":
"place": "16km N of Borrego Springs, CA",
"time": 1672507053210,
"url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40152271",
"sig": 10,
"mag": 0.81,
"type": "earthquake",
"longitude": -116.368,
"latitude": 33.4013333,
"depth": 2.91
]
在上面的命令中,我们通过 rename processor 来重新命名 longitude 及 latitude 两个字段。运行上面的代码,我们可以看到如下的结果:
"docs": [
"doc":
"_index": "_index",
"_id": "_id",
"_version": "-3",
"_source":
"sig": 10,
"mag": 0.81,
"depth": 2.91,
"@timestamp": "2022-12-31T17:17:33.210Z",
"coordinates":
"lon": -116.368,
"lat": 33.4013333
,
"place": "16km N of Borrego Springs, CA",
"time": 1672507053210,
"type": "earthquake",
"url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40152271"
,
"_ingest":
"timestamp": "2023-01-01T00:38:42.729604Z"
]
很显然,我们看到了一个新的 coordinates 的字段。它是一个 object。我们发现有一个多余的字段叫做 time。这个并不是我们所需要的。我们可以通过 remove processor 来删除这个字段。
POST _ingest/pipeline/_simulate
"pipeline":
"description": "This is for data transform for earthquake data",
"processors": [
"date":
"field": "time",
"formats": [
"UNIX_MS"
]
,
"rename":
"field": "latitude",
"target_field": "coordinates.lat"
,
"rename":
"field": "longitude",
"target_field": "coordinates.lon"
,
"remove":
"field": "time"
]
,
"docs": [
"_source":
"place": "16km N of Borrego Springs, CA",
"time": 1672507053210,
"url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40152271",
"sig": 10,
"mag": 0.81,
"type": "earthquake",
"longitude": -116.368,
"latitude": 33.4013333,
"depth": 2.91
]
我们运行上面的命令。我们再次查看输出的结果:
"docs": [
"doc":
"_index": "_index",
"_id": "_id",
"_version": "-3",
"_source":
"sig": 10,
"mag": 0.81,
"depth": 2.91,
"@timestamp": "2022-12-31T17:17:33.210Z",
"coordinates":
"lon": -116.368,
"lat": 33.4013333
,
"place": "16km N of Borrego Springs, CA",
"type": "earthquake",
"url": "https://earthquake.usgs.gov/earthquakes/eventpage/ci40152271"
,
"_ingest":
"timestamp": "2023-01-01T00:44:46.919265Z"
]
很显然这个时候,我们的 time 字段不见了。
在上面,我们通过 _simulate 的端点测试好了我们的 ingest pipeline。接下来,是我们使用命令来创建这个 pipeline 的时候了。我们使用如下的命令来创建这个 pipeline:
PUT _ingest/pipeline/earthquake_data_pipeline
"description": "This is for data transform for earthquake data",
"processors": [
"date":
"field": "time",
"formats": [
"UNIX_MS"
]
,
"rename":
"field": "latitude",
"target_field": "coordinates.lat"
,
"rename":
"field": "longitude",
"target_field": "coordinates.lon"
,
"remove":
"field": "time"
]
运行上面的命令。这样我们就创建了一个叫做 earthquake_data_pipeline 的 ingest pipeline。
接下来,我们需要删除之前所创建的索引,因为它包含我们不需要的一些字段:
DELETE earthquakes
我们再次运行之前创建索引 earthquakes 的命令:
PUT earthquakes
"mappings":
"properties":
"@timestamp":
"type": "date"
,
"coordinates":
"type": "geo_point"
,
"depth":
"type": "float"
,
"mag":
"type": "float"
,
"place":
"type": "text",
"fields":
"keyword":
"type": "keyword"
,
"sig":
"type": "short"
,
"type":
"type": "keyword"
,
"url":
"enabled": false
我们接下来需要修改 data.js 文件来使用这个 ingest pipeline:
server/routes/api/data.js
const express = require('express');
const router = express.Router();
const axios = require('axios')
const client = require('../../elasticsearch/client');
const URL = `https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson`;
//======= Check that Elasticsearch is up and running =======\\\\
function pingElasticsearch()
console.log("ping .....")
client.ping(
requestTimeout: 30000,
, function(error,res)
if (error)
console.error('elasticsearch cluster is down!');
else
console.log('Elasticsearch Ready');
);
;
// ====== Get Data From USGS and then index into Elasticsearch
indexAllDocs = async () =>
try
const EARTHQUAKES = await axios.get(`$URL`,
headers:
'Content-Type': [
'application/json',
'charset=utf-8'
]
);
console.log('Getting Data From Host')
results = EARTHQUAKES.data.features
results.map(
async (results) => (
(earthquakeObject =
place: results.properties.place,
time: results.properties.time,
url: results.properties.url,
sig: results.properties.sig,
mag: results.properties.mag,
type: results.properties.type,
longitude: results.geometry.coordinates[0],
latitude: results.geometry.coordinates[1],
depth: results.geometry.coordinates[2],
),
await client.index(
index: 'earthquakes',
id: results.id,
body: earthquakeObject,
pipeline: 'earthquake_data_pipeline'
)
)
);
if (EARTHQUAKES.data.length)
indexAllDocs();
else
console.log('All Data Has Been Indexed!');
;
catch (err)
console.log(err)
;
console.log('Preparing For The Next Data Check...');
//================== Official API Call ==================\\\\
router.get('/earthquakes', function (req, res)
res.send('Running Application...');
console.log('Loading Application...')
setInterval(() =>
pingElasticsearch()
indexAllDocs(res);
, 120000);
);
module.exports = router;
在上面的代码中,我对一下的两处做了修改:
我们再次使用如下的命令来启动对数据的采集:
curl -XGET http://localhost:5001/api/data/earthquakes
稍等一点时间(超过2分钟),我们到 Kibana 中来查看数据:
GET earthquakes/_search
我们可以看到如下的数据:
"took": 0,
"timed_out": false,
"_shards":
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
,
"hits":
"total":
"value": 9,
"relation": "eq"
,
"max_score": 1,
"hits": [
"_index": "earthquakes",
"_id": "us7000j1cr",
"_score": 1,
"_source":
"sig": 340,
"mag": 4.7,
"depth": 181.449,
"@timestamp": "2023-01-01T06:39:45.239Z",
"coordinates":
"lon": 70.8869,
"lat": 36.5351
,
"place": "36 km S of Jurm, Afghanistan",
"type": "earthquake",
"url": "https://earthquake.usgs.gov/earthquakes/eventpage/us7000j1cr"
,
...
从上面,我们可以看出来有9个地震数据已经被写入。我们可以让应用运行一段时间,它可能会有更多的数据进来。比如:
"took": 0,
"timed_out": false,
"_shards":
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
,
"hits":
"total":
"value": 10,
"relation": "eq"
,
"max_score": 1,
"hits": [
"_index": "earthquakes",
"_id": "nc73827436",
"_score": 1,
"_source":
"sig": 252,
"mag": 4.04,
"depth": 4.51,
"@timestamp": "2023-01-01T06:49:08.930Z",
"coordinates":
"lon": -121.220665,
"lat": 36.5789986
,
"place": "9km NW of Pinnacles, CA",
"type": "earthquake",
"url": "https://earthquake.usgs.gov/earthquakes/eventpage/nc73827436"
,
我们可以看到10个数据。
从上面的数据中,我们可以看到最终的数据结构就是我们想要的数据结构。
在接下来的文章中,我将详细描述如何对这个数据进行可视化。我将使用 Kibana 来进行展示,也会使用 Web 来进行搜索。请参阅文章 “Elasticsearch:使用 Node.js 将实时数据提取到 Elasticsearch 中(二)”
为了方便大家的学习,我把源代码放在这里:https://github.com/liu-xiao-guo/earthquakes-app
参考:
以上是关于Elasticsearch:使用 Node.js 将实时数据提取到 Elasticsearch 中的主要内容,如果未能解决你的问题,请参考以下文章
学习用Node.js和Elasticsearch构建搜索引擎
学习用Node.js和Elasticsearch构建搜索引擎
Elasticsearch:使用 Filebeat 从 Node.js Web 应用程序提取日志
Elasticsearch:使用 Filebeat 从 Node.js Web 应用程序提取日志