python解析ES数据到表

Posted 道法自然

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python解析ES数据到表相关的知识,希望对你有一定的参考价值。

Elasticsearch 通过 Elasticsearch,您能够执行及合并多种类型的搜索(结构化数据、非结构化数据、地理位置、指标),搜索方式随心而变。
参考:
https://github.com/elastic/elasticsearch-py/blob/7.10/docs/sphinx/index.rst
使用的版本:
      "version" : { "number" : "7.10.2"}

es_analy.py               进行单个索引解析数据
es_exec_circle.py      控制每个小时循环执行

es_analy.py 脚本

#!/usr/local/python/bin/python
# coding=utf-8
import sys
import os
import json
import ast
from elasticsearch import Elasticsearch
from operator import itemgetter


#获取并检查外部传入参数
if (len(sys.argv)==5):
    index_name = sys.argv[1]
    bizDate = sys.argv[2]
    create_time_start = sys.argv[3]
    create_time_end = sys.argv[4]
    if len(bizDate) !=8 :
       print( "业务日期传参非法*********")
       sys.exit(1)
else:
    print( "脚本传参错误,python es_analy.py index_name YYYYMMDD \\"开始时间\\" \\"结束时间\\"  请检查!")
    print( "脚本传参错误,python es_analy.py index_data_info \\"2021-04-26 09:00:00\\" \\"2021-04-26 09:59:59\\"  请检查!")
    sys.exit(1)

# 地址链接
es = Elasticsearch(
    [\'xx.xx.xx.xx\'],
    http_auth=(\'username\', \'password\'),
    scheme="http",
    port=9200,
)
print es

# 表字段列表
tab_fld = []
# 表字段字符串
tab_fld_str = \'\'
tab_fld_cloumn = \'\'
# 文件路径
file_path = \'/home/es/tmp/\' + index_name + bizDate + ".txt"
print "文件路径:" + file_path

# 全部数据获取
#res = es.search(index=index_name, body={"query": {"match_all": {}}})
# 带条件查询获取
res = es.search(index=index_name, body={"query": {"bool":{"must": [{"range": {"CREATE_TIME":{"from":create_time_start,"to":create_time_end}}}]}}})
print("Got %d Hits:" % res[\'hits\'][\'total\'][\'value\'])
# 获取的值
hists = res[\'hits\'][\'hits\']

# 获取总的字段值
def table_fld_info():
    global tab_fld_str
    global tab_fld_cloumn
    # 通过解析每一行获取字段值
    #for hit in hists:
    #    hit_dic = hit["_source"]
    #    hit_list = json.dumps(hit_dic)
    #    hit_dic = ast.literal_eval(hit_list)
    #    print hit_dic
    #    hit_dic_key = list(hit_dic.keys())
    #    for i in hit_dic_key:
    #        if i in tab_fld:
    #           pass
    #        else:
    #           tab_fld.append(i)
    #tab_fld_cloumn = [ i for i in tab_fld]
    
    # 通过get_mapping获取字段值
    mapping_values = es.indices.get_mapping(index=index_name)
    mapping_val = json.dumps(mapping_values)
    mapping_dic = ast.literal_eval(mapping_val)
    tab_fld_cloumn = list(mapping_dic[index_name][\'mappings\'][\'properties\'].keys())
    # 加工表字段
    print tab_fld_cloumn
    for fld in tab_fld_cloumn:
        tab_fld_str = tab_fld_str + fld + \' string,\'
    tab_fld_str = tab_fld_str[:-1]
    return tab_fld_cloumn
    
# 解析数据写入临时文件
def table_fld_values():
    # 设置文件名
    fo = open(file_path, "w")
    # 字段列表
    fld_list = table_fld_info()
    fld_list = list(fld_list)
    for hit in hists:
        # 每一列的数据写入文件
        line_data = \'\'
        hit_dic = hit["_source"]
        hit_list = json.dumps(hit_dic,encoding=\'utf-8\',ensure_ascii=False)
        # 字符串转成字典
        hit_dic = ast.literal_eval(hit_list)
        print hit_dic
        for i in fld_list:
            print i + ":  " + hit_dic.get(i,\'\')
            line_data = line_data + \',\' + hit_dic.get(i,\'\')
        line_data = line_data[1:] + "," + bizDate
        print line_data
        fo.write(line_data + "\\n")
    fo.close()

# 创建表
def table_data():
    global tab_fld_str
    global index_name
    # 表名中有\'-\'转换成\'_\'
    index_name = "es." + index_name.replace(\'-\',\'_\')
    #创建正式表,不存在则创建
    sql = "create table if not exists " + index_name + "(" + tab_fld_str + ") partitioned by (  `dt`  varchar(8) ) row format delimited fields terminated by \'\\\\001\' lines terminated by \'\\\\n\' stored as textfile;"
    # impala 执行
    exec_Sql(sql)

    #如果存在临时表,则删除临时表
    sql = "drop table if exists " + index_name + "_temp"
    # impala 执行
    exec_Sql(sql)
    
    #创建临时表,用于加载数据
    sql = "create table if not exists " + index_name + "_temp(" + tab_fld_str + ",dt varchar(8) ) row format delimited fields terminated by \',\' lines terminated by \'\\\\n\' stored as textfile;"
    # impala 执行
    exec_Sql(sql)

    # hive 加载数据
    sql = "load data local inpath \'" + file_path + "\' into table " + index_name + "_temp" 
    # hive 执行
    exec_hive_Sql(sql)
        
    # impala 刷新数据
    sql = "invalidate metadata " + index_name + "_temp;" 
    # impala 执行
    exec_Sql(sql)

    # 临时表数据插入到正式表
    sql = "insert into " + index_name + " partition(dt)  select * from " + index_name + "_temp" 
    # impala 执行
    exec_Sql(sql)
        
if __name__ == "__main__":
   if len(hists) == 0:
      print "该时间段未能解析出来数据,直接成功通过"
      pass
   else:      
      table_fld_values()
      table_data()

 

es_exec_circle.py 脚本

#!/usr/local/python/bin/python
# coding=utf-8
import time
import datetime
import sys
import os

#获取并检查外部传入参数
if (len(sys.argv)==5):
    index_name = sys.argv[1]
    bizDate = sys.argv[2]
    create_time_start = sys.argv[3]
    create_time_end = sys.argv[4]
    if len(bizDate) !=8 :
       print( "业务日期传参非法*********")
       sys.exit(1)
else:
    print( "脚本传参错误,python es_analy.py index_name  YYYYMMDD \\"开始时间\\" \\"结束时间\\"  请检查!")
    print( "脚本传参错误,python es_analy.py index_data_info \\"2021-04-26 09:00:00\\" \\"2021-04-26 09:59:59\\"  请检查!")
    sys.exit(1)

#开始时间
start_time = create_time_start
start_time_array = timeArray = time.strptime(start_time, "%Y-%m-%d %H:%M:%S")
start_time_Stamp = int(time.mktime(start_time_array))
print "开始时间:" + start_time + " 时间戳" + str(start_time_Stamp)

#结束时间
end_time = create_time_end
end_time_array = timeArray = time.strptime(end_time, "%Y-%m-%d %H:%M:%S")
end_time_Stamp = int(time.mktime(end_time_array))
print "结束时间:" + start_time + " 时间戳:" + str(end_time_Stamp)

#获取开始时间的小时
start_time_h = int(start_time[14:16])
if start_time_h > 29:
   print "开始时间在小时的后半段"
   time_level = start_time[0:14] + "59:59"
   print time_level
else:
   print "开始时间在小时的前半段"
   time_level = start_time[0:14] + "29:59"
   print time_level

print "第一次时间段:" + start_time + "        " + time_level
time_level_array = timeArray = time.strptime(time_level, "%Y-%m-%d %H:%M:%S")
time_level_Stamp = int(time.mktime(time_level_array))
#处理第一次脚本处理
cmd = "python es_analy.py " + index_name +" " + bizDate + " \\"" + start_time + "\\"" + " \\"" + time_level + "\\""
print cmd
val = os.system(cmd)
if val == 0:
   pass
else:
   print "cmd: " + cmd + " 命令执行出错!请检查..."
   sys.exit(1)
   
#每隔半小时进行循环解析
while 1==1: 
   if time_level_Stamp < end_time_Stamp:
      dateArray_start = datetime.datetime.fromtimestamp(time_level_Stamp + 1)
      startTime = dateArray_start.strftime("%Y-%m-%d %H:%M:%S")
      time_level_Stamp = time_level_Stamp + 1800
      dateArray_end = datetime.datetime.fromtimestamp(time_level_Stamp)
      endTime = dateArray_end.strftime("%Y-%m-%d %H:%M:%S")
      print startTime + "    " + endTime
      # 循环处理
      cmd = "python es_source_to_data.py " + index_name +" " + bizDate + " \\"" + startTime + "\\"" + " \\"" + endTime + "\\""
      print cmd
      val = os.system(cmd)
      if val == 0:
         pass
      else:
         print "cmd: " + cmd + " 命令执行出错!请检查..."
         sys.exit(1)
   else:
      break

 

脚本执行示例:

python es_exec_circle.py index_data_info 20210623 "2021-04-26 09:00:00" "2021-04-26 09:59:59"

 

当前目录会生成一个.txt 文件,是解析出来的数据,以\',\'分隔的。

 

以上是关于python解析ES数据到表的主要内容,如果未能解决你的问题,请参考以下文章

ES7-Es8 js代码片段

PLSQL:将数据从 XML 解析并插入到表中

Python中verbaim标签使用详解

oracle plsql:如何解析 XML 并插入到表中

Relay.js 没有正确解析组合片段

python 用于数据探索的Python代码片段(例如,在数据科学项目中)