即使与 Python 客户端 Elasticsearch 的连接丢失,如何恢复流数据?
Posted
技术标签:
【中文标题】即使与 Python 客户端 Elasticsearch 的连接丢失,如何恢复流数据?【英文标题】:How to recover streaming data even if the connection lost with Python client Elasticsearch? 【发布时间】:2020-06-16 07:57:18 【问题描述】:我从https://www.n2yo.com/api/ 流式传输 RESTful API 数据,用于跟踪卫星位置。 我使用带有 Elasticsearch 的 python 客户端。我每 10 秒将流式数据保存到 ES,并由 Kibana 进行可视化。我的 ES 版本是 6.4.3
我的代码是:
URL = "https://www.n2yo.com/rest/v1/satellite/positions/25544/41.702/-76.014/0/2/&apiKey= your key"
es = Elasticsearch('http://ip:port',timeout=600)
settings = "settings":
"number_of_shards":1,
'number_of_replicas':0
,
"mappings" :
"document" :
"properties":
"geo":
"type": "geo_point"
try:
es.indices.create(index = "spacestation", body=settings)
except RequestError as es1:
print('Index already exists!!')
sys.exit(1)
def collect_data():
data = requests.get(url = URL).json()
del data['positions'][1]
new_data = 'geo':'lat':data['positions'][0]['satlatitude'],
'lon':data['positions'][0]['satlongitude'],
'satname': data['info']['satname'], 'satid': data['info']['satid'],
'timestamp':datetime.fromtimestamp(data['positions'][0]['timestamp']).isoformat()
es.index(index='spacestation', doc_type='document', body=new_data)
schedule.every(10).seconds.do(collect_data)
while True:
schedule.run_pending()
time.sleep(1)
我的问题是:昨天我失去了连接。错误如下,
requests.exceptions.ConnectionError: HTTPSConnectionPool(host='www.n2yo.com', port=443): 最大重试次数超出 url: /rest/v1/satellite/positions/25544/41.702/-76.014/0/ 2/&apiKey= (由NewConnectionError(': 无法建立新连接: [Errno -3] 名称解析临时失败',))
当我重新运行我的代码时,我不能,因为索引已经存在。如果我删除索引,我将丢失已经在 ES 中的数据。我可以做什么?我需要保留我保存的数据,并且我需要从现在开始运行该作业。请问有什么解决办法吗?
【问题讨论】:
【参考方案1】:仅当您从 n2yo.com 收到数据时才创建索引。你应该使用函数es.indices.exists
。然后你让你的函数collect_data
在失败的情况下递归。试试:
URL = "https://www.n2yo.com/rest/v1/satellite/positions/25544/41.702/-76.014/0/2/&apiKey= your key"
es = Elasticsearch('http://ip:port',timeout=600)
def create_index()
if not es.indices.exists(index = "spacestation"):
settings = "settings":
"number_of_shards":1,
'number_of_replicas':0
,
"mappings" :
"document" :
"properties":
"geo":
"type": "geo_point"
es.indices.create(index = "spacestation", body=settings)
else:
print('Index already exists!!')
def collect_data():
try:
data = requests.get(url = URL).json()
create_index()
del data['positions'][1]
new_data = 'geo':'lat':data['positions'][0]['satlatitude'],
'lon':data['positions'][0]['satlongitude'],
'satname': data['info']['satname'], 'satid': data['info']['satid'],
'timestamp':datetime.fromtimestamp(data['positions'][0]['timestamp']).isoformat()
es.index(index='spacestation', doc_type='document', body=new_data)
except:
collect_data()
schedule.every(10).seconds.do(collect_data)
while True:
schedule.run_pending()
time.sleep(1)
【讨论】:
感谢您的回答,我会努力的。但是你能告诉我“除了”会发生什么吗?我知道“尝试”是将数据插入到我的索引中。 如果函数 collect_dat 因错误而结束,except 语句会捕获其中的任何错误并重新启动相同的函数。调用自身的函数称为递归函数。因此,对于这个用例,函数 collect_data 唯一可能的错误可能是连接错误 - 到 ny2yo 或到 es - 您将重新运行相同的函数,因为您希望连接错误是暂时的,并且不依赖于您的配置,即好的 如果在 collect_data 函数中可能是其他错误 - 例如在这一行:del data['positions'][1] - 你应该替换 except: 与 except ConnectionError: 我按照你的方法试过了,但是当我把“设置”放在“如果不是”下时给了我一个错误。所以我把设置放在函数之后。但是“如果不是”出现错误。 if not es.indices.exists(index = "spacestation"): TabError: 在缩进中不一致使用制表符和空格 缩进应该有 4 个空格,而不是制表符。解释器发现您在代码中使用了 4 个空格和制表符来进行缩进。请仅使用 4 个空格。如果你使用 IDE 作为 Pycharm,你可以很快解决它以上是关于即使与 Python 客户端 Elasticsearch 的连接丢失,如何恢复流数据?的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot-spring-data-elasticsearch7.12.0
Elasticsearch:为 Elasticsearch 8.x 引入新的 PHP 客户端