Elasticsearch:使用 Python 进行 Bulk insert 及 Scan

Posted 中国社区官方博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Elasticsearch:使用 Python 进行 Bulk insert 及 Scan相关的知识,希望对你有一定的参考价值。

在本文章中,我将展示如何使用 Python 来对索引进行 Bulk 写入。在单个 Bulk API 调用中执行多个索引或删除操作。 这减少了开销并且可以大大提高索引速度。

在今天的展示中,我将使用 Jupyter 来进行演示。如果你对 Jupyter 还不是很熟的话,请参阅我之前的文章 “Elasticsearch:使用 Jupyter Notebook 创建 Python 应用导入 CSV 文件”。

首先,我们来创建一个叫做 bulk 的 notebook。

jupyter notebook

我们输入如下的代码。它创建一个连接到 Elasticsearch 的实例:

from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import time
es = Elasticsearch('localhost:9200')

我们接着输入如下的代码:

'''
Bulk inserts snippet with time calculation to insert.
'''
# source code: https://stackoverflow.com/questions/20288770/how-to-use-bulk-api-to-store-the-keywords-in-es-by-using-python
actions = [
  {
    "_index": "index8",
    "_type": "_doc",
    "_id": j,
    "_source": {
        "any":"data" + str(j),
        "timestamp": datetime.now()}
  }
  for j in range(0, 2000)
]

上面的代码生成一个数组。数组中的项如下:

[{'_index': 'index8',
  '_type': '_doc',
  '_id': 0,
  '_source': {'any': 'data0',
   'timestamp': datetime.datetime(2021, 9, 8, 9, 32, 39, 247186)}},
  ...

 

如上所示,我们创建了2千个数据。在使用上面的时候,我们必须注意到的是,在 action 的定义中,我们可以定义 _op_type 字段。我们可以参考链接 https://elasticsearch-py.readthedocs.io/en/master/helpers.html。我们可以像如下的方式这样添加 _op_type 字段以实现不同的操作:

{
    '_op_type': 'delete',
    '_index': 'index-name',
    '_id': 42,
}
{
    '_op_type': 'update',
    '_index': 'index-name',
    '_id': 42,
    'doc': {'question': 'The life, universe and everything.'}
}

在默认的情况下,它的操作是 index。接下来,我们来计算一下批量导入这些数据所需要的时间到底是多少。

我们打入如下的代码:

st = time.time()
helpers.bulk(es, actions)
end = time.time()
print("total time:", end-st)

 上面显示整个写入2千个数据所需的时间是 0.52 秒左右。我们可以看到是非常地高效。

接下来,我们使用另外一种方法来导入数据。我们通过每次写入一个数据的方法来进行写入。这样需要请求1万次才可以把1万个数据写入:

'''
Insertion iteratively
'''
st = time.time()
for j in range(0,2000):
    doc = {
        "any":"data" + str(j),
        "timestamp": datetime.now()
    }
    
    es.index(index="test8", doc_type="_doc", id=j, body=doc)
end = time.time()
print("total time", end-st)

在 Kibana 中,我们发现 test8 索引的 count 是一直在增加,但是写入的速度是非常慢的:

等写完 2000 个数据,我们再看一下总的写入时间:

 从上面,我们可以看出来总共需要 193 秒的时间。这个差距是非常之大的。

接下来,我们使用 helper 中的 scan 来获取大量的数据:

'''
Scanning over large data
'''
#source code: https://stackoverflow.com/questions/47722238/python-elasticsearch-helpers-scan-example
results = helpers.scan(es, index="chapter8",doc_type="doc", query={"query": {"match_all": {}}})
for item in results:
    print(item['_id'], item['_source'])

我们发现这个 scan 的速度是非常之快的。几乎在咋眼之间就完成了。 

以上是关于Elasticsearch:使用 Python 进行 Bulk insert 及 Scan的主要内容,如果未能解决你的问题,请参考以下文章

ElasticSearch简单使用

linux安装elasticsearch

python使用elasticsearch模块操作elasticsearch

C# 如何使用 Elasticsearch (ES)

Elasticsearch:使用 Python elasticsearch-dsl-py 库对 Elasticsearch 进行查询

Elasticsearch:使用 Python elasticsearch-dsl-py 库对 Elasticsearch 进行查询