Elasticsearch partial bulk update

【Title】: Elasticsearch partial bulk update 【Posted】: 2018-05-25 08:59:46 【Question】:

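I have 6k documents in Elasticsearch to update, and I have to use PHP. I searched the documentation and found Bulk Indexing, but it does not preserve the existing data.

My documents have this structure:

[
  {
    'name': 'Jonatahn',
    'age' : 21
  }
]

My update code snippet: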

$params =[
    "index" => "customer",
    "type" => "doc",
    "body" => [
        [
            "index" => [
                "_index" => "customer",
                "_type" => "doc",
                "_id" => "09310451939"
            ]
        ],
        [
            "name" => "Jonathan"
        ]
    ]
];

$client->bulk($params);

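When I send ['name' => 'Jonathan'], I expect name to be updated and age to be kept, but age is deleted. I could still update the documents one by one, but that takes a long time. Is there a better way to do this?

【Comments】:

【Answer 1】:

$batch_elastics is the array of result rows. For each row I unset these two values, because I do not need them in the inserted or updated document.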

unset($batch_row['type']);

unset($batch_row['diamonds_id']);

The code starts here...

    if (count($batch_elastics)) {
        // First pass: queue every row marked "create"
        $params = ['body' => []];
        $i = 1;
        foreach ($batch_elastics as $batch_row) {
            $type = $batch_row['type'];
            $id = $batch_row['diamonds_id'];
            unset($batch_row['type']);
            unset($batch_row['diamonds_id']);
            if ($type == "create") {
                $params["body"][] = [
                    "create" => [
                        "_index" => 'diamonds',
                        "_id" => $id,
                    ]
                ];
                $params["body"][] = $batch_row;
                // Send the queued actions once the row counter hits a multiple of 1000
                if ($i % 1000 == 0) {
                    $responses = $client->bulk($params);
                    $params = ['body' => []];
                    unset($responses);
                }
            }
            $i = $i + 1;
        }

        // Send the last batch if it exists
        if (!empty($params['body'])) {
            $responses = $client->bulk($params);
        }

        // Second pass: queue every row marked "update" as a partial document
        $params = ['body' => []];
        $i = 1;
        foreach ($batch_elastics as $batch_row) {
            $type = $batch_row['type'];
            $id = $batch_row['diamonds_id'];
            unset($batch_row['type']);
            unset($batch_row['diamonds_id']);
            if ($type == "update") {
                $params["body"][] = [
                    "update" => [
                        "_index" => 'diamonds',
                        "_id" => $id,
                    ]
                ];
                // Wrapping the fields in "doc" merges them into the stored document
                $params["body"][] = [
                    "doc" => $batch_row
                ];
                if ($i % 1000 == 0) {
                    $responses = $client->bulk($params);
                    $params = ['body' => []];
                    unset($responses);
                }
            }
            $i = $i + 1;
        }

        // Send the last batch if it exists
        if (!empty($params['body'])) {
            $responses = $client->bulk($params);
        }
    }

【Comments】:

【Answer 2】:

This is my final code.

<?php

require_once('../elasticsearch.php');

//initialize elasticsearch
$params = array();

$params['index'] = $elastcsearch_index;
$params['type']  = $elastcsearch_type;

///////////////////////////////////////////////////
//update seeders n leechers in elasticsearch

//get updated records
$get_updated_records = mysqli_query($conn, "SELECT content_id, seeders, leechers FROM content WHERE is_updated = '1' order by seeders DESC");

//create blank array
$results = array();

while ($row = mysqli_fetch_assoc($get_updated_records)) {
    //put all results in array
    $results[] = $row;
}

//from https://www.elastic.co/guide/en/elasticsearch/client/php-api/current/_indexing_documents.html

$params = ['body' => []];

for ($i = 0; $i < count($results); $i++) {

    $params["body"][] = [
            "update" => [
                "_index" => $elastcsearch_index,
                "_type" => $elastcsearch_type,
                "_id" => $results[$i]['content_id']
            ]
        ];

    $params["body"][] = [
            "doc" => [
                "seeders" => intval($results[$i]['seeders']),
                "leechers" => intval($results[$i]['leechers']),
            ]
        ];

    // Every 1000 documents stop and send the bulk request
    // ($i starts at 0, so test $i + 1; otherwise the first document is sent alone)
    if (($i + 1) % 1000 == 0) {
        $responses = $elasticsearch->bulk($params);

        // erase the old bulk request
        $params = ['body' => []];

        // unset the bulk response when you are done to save memory
        unset($responses);
    }
}

// Send the last batch if it exists
if (!empty($params['body'])) {
    $responses = $elasticsearch->bulk($params);
}
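
The counter-and-modulo batching in this answer can also be written with array_chunk, which removes the need for a separate "last batch" step. The following is only a sketch under assumptions: buildUpdateBatches is a hypothetical helper name, the row keys (content_id, seeders, leechers) are taken from the query above, and _type is left out.

```php
<?php
/**
 * Build one bulk "update" request per chunk of rows.
 * Hypothetical helper; the row keys mirror the SELECT in the answer above.
 */
function buildUpdateBatches(array $rows, string $index, int $batchSize = 1000): array
{
    $batches = [];
    foreach (array_chunk($rows, $batchSize) as $chunk) {
        $params = ['body' => []];
        foreach ($chunk as $row) {
            // Action line: which document to update
            $params['body'][] = [
                'update' => ['_index' => $index, '_id' => $row['content_id']],
            ];
            // Partial document: only these fields are changed
            $params['body'][] = [
                'doc' => [
                    'seeders'  => intval($row['seeders']),
                    'leechers' => intval($row['leechers']),
                ],
            ];
        }
        $batches[] = $params;
    }
    return $batches;
}

// Usage (assumes $elasticsearch is the client used in the answer above):
// foreach (buildUpdateBatches($results, $elastcsearch_index) as $params) {
//     $elasticsearch->bulk($params);
// }
```

Each element of the returned array can be passed to bulk() as is, so every request carries at most $batchSize documents.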

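【Comments】:

Actually, _type is now deprecated. You no longer need to add it; the default document type will be used.

【Answer 3】:

According to the docs, the possible bulk API actions are index, create, delete, and update. update expects the partial doc, upsert, and script, along with their options, to be specified on the next line.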

POST _bulk
{ "update" : { "_id" : "1", "_type" : "_doc", "_index" : "test" } }
{ "doc" : { "field2" : "value2" } }
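
For readers using elasticsearch-php, the same two lines might be expressed as PHP arrays roughly as follows. This is a sketch, not the client's own API: bulkUpdateLines is a hypothetical helper, _type is omitted, and the optional doc_as_upsert flag (which inserts the partial document when the id does not exist yet) goes beyond what the request above shows.

```php
<?php
/**
 * Hypothetical helper: build the two bulk lines for one partial update.
 */
function bulkUpdateLines(string $index, string $id, array $doc, bool $docAsUpsert = false): array
{
    // First line: the action and the document it targets
    $action = ['update' => ['_index' => $index, '_id' => $id]];

    // Second line: the fields to merge into the stored document
    $payload = ['doc' => $doc];
    if ($docAsUpsert) {
        // Create the document from $doc if $id does not exist yet
        $payload['doc_as_upsert'] = true;
    }
    return [$action, $payload];
}

$params = ['body' => bulkUpdateLines('test', '1', ['field2' => 'value2'])];
// $client->bulk($params);  // $client is assumed to be an elasticsearch-php client
```

For several documents, the pairs returned by the helper would be appended to the same body array one after another.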

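【Comments】:

How can I do this with PHP? I used the code from the answer above, and not all documents are being updated. Any clues? Thanks. Here is my issue: github.com/elastic/elasticsearch-php/issues/785

【Answer 4】:

My mistake was using "index"; the correct action is "update".

The final code is: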

$params =[
"index" => "customer",
"type" => "doc",
"body" => [
    [
        "update" => [
    //   ^^^^^^ Here I change from index to update
            "_index" => "customer",
            "_type" => "doc",
            "_id" => "09310451939"
        ]
    ],
    [
        "doc" => [
            "name" => "Jonathan"
        ]
    ]
]
];

$client->bulk($params);

With the code above, the existing data is preserved and only the fields I pass in the parameters are updated.

Response:

Array
(
    [took] => 7
    [timed_out] =>
    [_shards] => Array
        (
            [total] => 5
            [successful] => 5
            [skipped] => 0
            [failed] => 0
        )

    [hits] => Array
        (
            [total] => 1
            [max_score] => 1
            [hits] => Array
                (
                    [0] => Array
                        (
                            [_index] => customer
                            [_type] => doc
                            [_id] => 09310451939
                            [_score] => 1
                            [_source] => Array
                                (
                                    [name] => Jonathan
                                    [age] => 23
                                )

                        )

                )

        )

)
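
The dump above has the shape of a search result (hits, _source). The return value of a bulk call is structured differently: it carries a top-level errors flag and an items list with one entry per action, and a request can succeed as a whole while individual items fail. A minimal sketch for collecting those failures, assuming the decoded response is a PHP array and using the hypothetical helper name collectBulkFailures:

```php
<?php
/**
 * Collect the items of a bulk response that failed.
 * Returns a map of document _id => error reason (or the status code if no reason is given).
 */
function collectBulkFailures(array $response): array
{
    $failures = [];
    if (empty($response['errors'])) {
        return $failures; // no item reported an error
    }
    foreach ($response['items'] as $item) {
        // Each item has a single key naming its action: index, create, update or delete
        $action = array_key_first($item);
        $result = $item[$action];
        if (isset($result['error'])) {
            $failures[$result['_id']] = $result['error']['reason'] ?? ('status ' . $result['status']);
        }
    }
    return $failures;
}

// Usage (assumes $client and $params from the code above):
// $failures = collectBulkFailures($client->bulk($params));
// if ($failures) { print_r($failures); }
```

A partial update of an id that does not exist would show up here as a failed item, which is one possible reason for documents that appear not to be updated.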

【Comments】:
