即使与 Python 客户端 Elasticsearch 的连接丢失,如何恢复流数据?

Posted

技术标签:

【中文标题】即使与 Python 客户端 Elasticsearch 的连接丢失,如何恢复流数据?【英文标题】:How to recover streaming data even if the connection lost with Python client Elasticsearch? 【发布时间】:2020-06-16 07:57:18 【问题描述】:

我从https://www.n2yo.com/api/ 流式传输 RESTful API 数据,用于跟踪卫星位置。 我使用带有 Elasticsearch 的 python 客户端。我每 10 秒将流式数据保存到 ES,并由 Kibana 进行可视化。我的 ES 版本是 6.4.3

我的代码是:

URL = "https://www.n2yo.com/rest/v1/satellite/positions/25544/41.702/-76.014/0/2/&apiKey= your key"



es = Elasticsearch('http://ip:port',timeout=600)

settings =  "settings": 
                 "number_of_shards":1,
                  'number_of_replicas':0
                 ,
      "mappings" :  
           "document" : 
                "properties":
                    "geo": 
                       "type": "geo_point"
                            
                          
                         
                      
                  
try:
 es.indices.create(index = "spacestation", body=settings)
except RequestError as es1:
 print('Index already exists!!')
 sys.exit(1)

def collect_data():
  data = requests.get(url = URL).json() 
  del data['positions'][1]
  new_data = 'geo':'lat':data['positions'][0]['satlatitude'],
               'lon':data['positions'][0]['satlongitude'], 
                'satname': data['info']['satname'], 'satid': data['info']['satid'], 
                  'timestamp':datetime.fromtimestamp(data['positions'][0]['timestamp']).isoformat()        
              

  es.index(index='spacestation', doc_type='document', body=new_data)

schedule.every(10).seconds.do(collect_data)

while True:
  schedule.run_pending()
  time.sleep(1) 

我的问题是:昨天我失去了连接。错误如下,

requests.exceptions.ConnectionError: HTTPSConnectionPool(host='www.n2yo.com', port=443): 最大重试次数超出 url: /rest/v1/satellite/positions/25544/41.702/-76.014/0/ 2/&apiKey= (由NewConnectionError(': 无法建立新连接: [Errno -3] 名称解析临时失败',))

当我重新运行我的代码时,我不能,因为索引已经存在。如果我删除索引,我将丢失已经在 ES 中的数据。我可以做什么?我需要保留我保存的数据,并且我需要从现在开始运行该作业。请问有什么解决办法吗?

【问题讨论】:

【参考方案1】:

仅当您从 n2yo.com 收到数据时才创建索引。你应该使用函数es.indices.exists。然后你让你的函数collect_data在失败的情况下递归。试试:

 URL = "https://www.n2yo.com/rest/v1/satellite/positions/25544/41.702/-76.014/0/2/&apiKey= your key"



es = Elasticsearch('http://ip:port',timeout=600)

def create_index()

    if not es.indices.exists(index = "spacestation"):

        settings =  "settings": 
                 "number_of_shards":1,
                  'number_of_replicas':0
                 ,
      "mappings" :  
           "document" : 
                "properties":
                    "geo": 
                       "type": "geo_point"
                            
                          
                         
                      
                  
          es.indices.create(index = "spacestation", body=settings)
    else:
        print('Index already exists!!')


def collect_data():
  try:
      data = requests.get(url = URL).json()
      create_index() 
      del data['positions'][1]
      new_data = 'geo':'lat':data['positions'][0]['satlatitude'],
               'lon':data['positions'][0]['satlongitude'], 
                'satname': data['info']['satname'], 'satid': data['info']['satid'], 
                  'timestamp':datetime.fromtimestamp(data['positions'][0]['timestamp']).isoformat()        
              

      es.index(index='spacestation', doc_type='document', body=new_data)
    except:
        collect_data()

schedule.every(10).seconds.do(collect_data)

while True:
  schedule.run_pending()
  time.sleep(1) 

【讨论】:

感谢您的回答,我会努力的。但是你能告诉我“除了”会发生什么吗?我知道“尝试”是将数据插入到我的索引中。 如果函数 collect_dat 因错误而结束,except 语句会捕获其中的任何错误并重新启动相同的函数。调用自身的函数称为递归函数。因此,对于这个用例,函数 collect_data 唯一可能的错误可能是连接错误 - 到 ny2yo 或到 es - 您将重新运行相同的函数,因为您希望连接错误是暂时的,并且不依赖于您的配置,即好的 如果在 collect_data 函数中可能是其他错误 - 例如在这一行:del data['positions'][1] - 你应该替换 except: 与 except ConnectionError: 我按照你的方法试过了,但是当我把“设置”放在“如果不是”下时给了我一个错误。所以我把设置放在函数之后。但是“如果不是”出现错误。 if not es.indices.exists(index = "spacestation"): TabError: 在缩进中不一致使用制表符和空格 缩进应该有 4 个空格,而不是制表符。解释器发现您在代码中使用了 4 个空格和制表符来进行缩进。请仅使用 4 个空格。如果你使用 IDE 作为 Pycharm,你可以很快解决它

以上是关于即使与 Python 客户端 Elasticsearch 的连接丢失,如何恢复流数据?的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot-spring-data-elasticsearch7.12.0

Elasticsearch:为 Elasticsearch 8.x 引入新的 PHP 客户端

python使用es随机查询

SSL 客户端身份验证 - 即使我的客户端证书与“证书颁发机构”中的列表匹配,也找不到合适的证书

Python Day78 django-Ajax

Elasticsearch之插件介绍及安装