Elasticsearch:使用 Python 进行 Bulk insert 及 Scan
Posted 中国社区官方博客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Elasticsearch:使用 Python 进行 Bulk insert 及 Scan相关的知识,希望对你有一定的参考价值。
在本文章中,我将展示如何使用 Python 来对索引进行 Bulk 写入。在单个 Bulk API 调用中执行多个索引或删除操作。 这减少了开销并且可以大大提高索引速度。
在今天的展示中,我将使用 Jupyter 来进行演示。如果你对 Jupyter 还不是很熟的话,请参阅我之前的文章 “Elasticsearch:使用 Jupyter Notebook 创建 Python 应用导入 CSV 文件”。
首先,我们来创建一个叫做 bulk 的 notebook。
jupyter notebook
我们输入如下的代码。它创建一个连接到 Elasticsearch 的实例:
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import time
es = Elasticsearch('localhost:9200')
我们接着输入如下的代码:
'''
Bulk inserts snippet with time calculation to insert.
'''
# source code: https://stackoverflow.com/questions/20288770/how-to-use-bulk-api-to-store-the-keywords-in-es-by-using-python
actions = [
{
"_index": "index8",
"_type": "_doc",
"_id": j,
"_source": {
"any":"data" + str(j),
"timestamp": datetime.now()}
}
for j in range(0, 2000)
]
上面的代码生成一个数组。数组中的项如下:
[{'_index': 'index8',
'_type': '_doc',
'_id': 0,
'_source': {'any': 'data0',
'timestamp': datetime.datetime(2021, 9, 8, 9, 32, 39, 247186)}},
...
如上所示,我们创建了2千个数据。在使用上面的时候,我们必须注意到的是,在 action 的定义中,我们可以定义 _op_type 字段。我们可以参考链接 https://elasticsearch-py.readthedocs.io/en/master/helpers.html。我们可以像如下的方式这样添加 _op_type 字段以实现不同的操作:
{
'_op_type': 'delete',
'_index': 'index-name',
'_id': 42,
}
{
'_op_type': 'update',
'_index': 'index-name',
'_id': 42,
'doc': {'question': 'The life, universe and everything.'}
}
在默认的情况下,它的操作是 index。接下来,我们来计算一下批量导入这些数据所需要的时间到底是多少。
我们打入如下的代码:
st = time.time()
helpers.bulk(es, actions)
end = time.time()
print("total time:", end-st)
上面显示整个写入2千个数据所需的时间是 0.52 秒左右。我们可以看到是非常地高效。
接下来,我们使用另外一种方法来导入数据。我们通过每次写入一个数据的方法来进行写入。这样需要请求1万次才可以把1万个数据写入:
'''
Insertion iteratively
'''
st = time.time()
for j in range(0,2000):
doc = {
"any":"data" + str(j),
"timestamp": datetime.now()
}
es.index(index="test8", doc_type="_doc", id=j, body=doc)
end = time.time()
print("total time", end-st)
在 Kibana 中,我们发现 test8 索引的 count 是一直在增加,但是写入的速度是非常慢的:
等写完 2000 个数据,我们再看一下总的写入时间:
从上面,我们可以看出来总共需要 193 秒的时间。这个差距是非常之大的。
接下来,我们使用 helper 中的 scan 来获取大量的数据:
'''
Scanning over large data
'''
#source code: https://stackoverflow.com/questions/47722238/python-elasticsearch-helpers-scan-example
results = helpers.scan(es, index="chapter8",doc_type="doc", query={"query": {"match_all": {}}})
for item in results:
print(item['_id'], item['_source'])
我们发现这个 scan 的速度是非常之快的。几乎在咋眼之间就完成了。
以上是关于Elasticsearch:使用 Python 进行 Bulk insert 及 Scan的主要内容,如果未能解决你的问题,请参考以下文章
python使用elasticsearch模块操作elasticsearch
Elasticsearch:使用 Python elasticsearch-dsl-py 库对 Elasticsearch 进行查询
Elasticsearch:使用 Python elasticsearch-dsl-py 库对 Elasticsearch 进行查询