如何使用 elasticsearch.helpers.streaming_bulk

Posted

技术标签:

【中文标题】如何使用 elasticsearch.helpers.streaming_bulk【英文标题】:How to use elasticsearch.helpers.streaming_bulk 【发布时间】:2016-04-12 01:58:38 【问题描述】:

有人可以建议如何使用函数 elasticsearch.helpers.streaming_bulk 而不是 elasticsearch.helpers.bulk 将数据索引到 elasticsearch 中。

如果我只是更改 streaming_bulk 而不是 bulk,则没有任何内容被索引,所以我想它需要以不同的形式使用。

下面的代码以 500 个元素的块从 CSV 文件创建索引、类型和索引数据到 elasticsearch。它工作正常,但我在徘徊是否有可能提高性能。这就是我想尝试 streaming_bulk 功能的原因。

目前我需要 10 分钟来为 200MB 的 CSV 文档索引 100 万行。我使用两台机器,Centos 6.6,8 CPU-s,x86_64,CPU MHz:2499.902,内存:15.574G。 不知道能不能再快一点。

es = elasticsearch.Elasticsearch(['host': 'uxmachine-test', 'port': 9200])
index_name = 'new_index'
type_name = 'new_type'
mapping = json.loads(open(config["index_mapping"]).read()) #read mapping from json file

es.indices.create(index_name)
es.indices.put_mapping(index=index_name, doc_type=type_name, body=mapping)

with open(file_to_index, 'rb') as csvfile:
    reader = csv.reader(csvfile)        #read documents for indexing from CSV file, more than million rows
    content = "_index": index_name, "_type": type_name
    batch_chunks = []
    iterator = 0

    for row in reader:
        var = transform_row_for_indexing(row,fields, index_name, type_name,id_name,id_increment)
        id_increment = id_increment + 1
        #var = transform_row_for_indexing(row,fields, index_name, type_name)
        batch_chunks.append(var)
        if iterator % 500 == 0:
            helpers.bulk(es,batch_chunks)
            del batch_chunks[:]
            print "ispucalo batch"
        iterator = iterator + 1
    # indexing of last batch_chunk
    if len(batch_chunks) != 0:
        helpers.bulk(es,batch_chunks)

【问题讨论】:

【参考方案1】:

所以流式批量返回一个交互器。这意味着在您开始迭代之前什么都不会发生。 'bulk' 函数的代码如下所示:

success, failed = 0, 0

# list of errors to be collected is not stats_only
errors = []

for ok, item in streaming_bulk(client, actions, **kwargs):
    # go through request-reponse pairs and detect failures
    if not ok:
        if not stats_only:
            errors.append(item)
        failed += 1
    else:
        success += 1

return success, failed if stats_only else errors

所以基本上只调用 streaming_bulk(client, actions, **kwargs) 实际上不会做任何事情。直到你像在这个 for 循环中那样迭代它,索引才真正开始发生。

所以在你的代码中。欢迎您将 'bulk' 更改为 'streaming_bulk' 但您需要迭代流式处理的结果才能真正将任何内容编入索引。

【讨论】:

【参考方案2】:

streaming_bulk 使用 actions 的迭代器并为每个操作生成响应。 因此,您首先需要在文档上编写一个简单的迭代器,如下所示:

def document_stream(file_to_index):
    with open(file_to_index, "rb") as csvfile:
        for row in csv.reader(csvfile):
            yield "_index": index_name,
                   "_type": type_name,
                   "_source": transform_row(row)
                   

然后进行流式批量插入

stream = document_stream(file_to_index)
for ok, response in streaming_bulk(es, actions = stream):
    if not ok:
        # failure inserting
        print response

【讨论】:

以上是关于如何使用 elasticsearch.helpers.streaming_bulk的主要内容,如果未能解决你的问题,请参考以下文章

如果加入条件,我该如何解决。如果使用字符串连接,我如何使用

如何使用本机反应创建登录以及如何验证会话

如何在自动布局中使用约束标识符以及如何使用标识符更改约束? [迅速]

如何使用 AngularJS 的 ng-model 创建一个数组以及如何使用 jquery 提交?

如何使用laravel保存所有行数据每个行名或相等

如何使用 Math.Net 连接矩阵。如何使用 Math.Net 调用特定的行或列?