Importing txt or JSON text files into ElasticSearch

Posted ttzsqwq


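This is something I worked on a while ago; now that I have some free time, here are my notes.


 Task: restore data from local files to ES. The local files are large: about 10 GB of data after decompression.


 Approach: I tried three different implementations for this task.

  I. Use bulk: the batch import API built into ES. Files of about 10-15 MB each are recommended. I believe a single file cannot exceed roughly 200 MB, but I have not verified this. (By default ES rejects HTTP request bodies larger than http.max_content_length, which defaults to 100mb.)

  II. Use logstash: another official Elastic product, which reads the data file and feeds it into ES.

  III. Use Java: the Spring Data Elasticsearch approach. This third approach combines a thread pool, a buffer queue, and Spring Data's wrapper around ES; I will write it up separately later.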


I. Using bulk (Win7 + ES 6.6.1 + JSON text)

1. Prepare JSON data in the correct format

ES is very strict about the format of the JSON text. A valid file looks like this:

{"index":{"_index":"demo","_id":0}}
{"id":null,"dev_id":"1","rcv_time":1557303257,"date":null,"dname":null,"logtype":"1","pri":null,"mod":"pf","sa":null,"sport":null,"ttype":null,"da":null,"dport":null,"code":null,"proto":null,"policy":null,"duration":"0","rcvd":null,"sent":null,"fwlog":null,"dsp_msg":"包过滤日志","failmsg":null,"custom":null,"smac":null,"dmac":null,"type":null,"in_traffic":"52","out_traffic":"52","gen_time":"1557303257","src_ip":"710191296","dest_ip":"896426877","src_port":"51411","dest_port":"443","protocol_id":"1","action_id":"2","filter_policy_id":"0","sat_ip":"0","sat_port":"0","i_ip":"0","i_port":"0","insert_time":"0","p_ip":"0","p_port":"0","rulename_id":"3","min_id":"25955054","svm":null,"dvm":null,"repeat_num":null,"event_type_id":216001001,"event_level_id":1,"org_log":"devid=2 date=\"2019/05/08 16:14:17\" dname=venus logtype=1 pri=5 ver=0.3.0 rule_name=网关产品线 mod=pf sa=192.168.84.42 sport=51411 type=NULL da=125.99.110.53 dport=443 code=NULL proto=IPPROTO_TCP policy=允许 duration=0 rcvd=52 sent=52 fwlog=0 dsp_msg=\"包过滤日志\"","stauts":"success","failMsg":null}
{"index":{"_index":"demo","_id":1}}
{"id":null,"dev_id":"1","rcv_time":1557303257,"date":null,"dname":null,"logtype":"1","pri":null,"mod":"pf","sa":null,"sport":null,"ttype":null,"da":null,"dport":null,"code":null,"proto":null,"policy":null,"duration":"0","rcvd":null,"sent":null,"fwlog":null,"dsp_msg":"包过滤日志","failmsg":null,"custom":null,"smac":null,"dmac":null,"type":null,"in_traffic":"52","out_traffic":"52","gen_time":"1557303257","src_ip":"710191296","dest_ip":"896426877","src_port":"51411","dest_port":"443","protocol_id":"1","action_id":"2","filter_policy_id":"0","sat_ip":"0","sat_port":"0","i_ip":"0","i_port":"0","insert_time":"0","p_ip":"0","p_port":"0","rulename_id":"3","min_id":"25955054","svm":null,"dvm":null,"repeat_num":null,"event_type_id":216001001,"event_level_id":1,"org_log":"devid=2 date=\"2019/05/08 16:14:17\" dname=venus logtype=1 pri=5 ver=0.3.0 rule_name=网关产品线 mod=pf sa=192.168.84.42 sport=51411 type=NULL da=125.99.110.53 dport=443 code=NULL proto=IPPROTO_TCP policy=允许 duration=0 rcvd=52 sent=52 fwlog=0 dsp_msg=\"包过滤日志\"","stauts":"success","failMsg":null}
{"index":{"_index":"demo","_id":2}}
{"id":null,"dev_id":"1","rcv_time":1557303257,"date":null,"dname":null,"logtype":"1","pri":null,"mod":"pf","sa":null,"sport":null,"ttype":null,"da":null,"dport":null,"code":null,"proto":null,"policy":null,"duration":"0","rcvd":null,"sent":null,"fwlog":null,"dsp_msg":"包过滤日志","failmsg":null,"custom":null,"smac":null,"dmac":null,"type":null,"in_traffic":"52","out_traffic":"52","gen_time":"1557303257","src_ip":"710191296","dest_ip":"896426877","src_port":"51411","dest_port":"443","protocol_id":"1","action_id":"2","filter_policy_id":"0","sat_ip":"0","sat_port":"0","i_ip":"0","i_port":"0","insert_time":"0","p_ip":"0","p_port":"0","rulename_id":"3","min_id":"25955054","svm":null,"dvm":null,"repeat_num":null,"event_type_id":216001001,"event_level_id":1,"org_log":"devid=2 date=\"2019/05/08 16:14:17\" dname=venus logtype=1 pri=5 ver=0.3.0 rule_name=网关产品线 mod=pf sa=192.168.84.42 sport=51411 type=NULL da=125.99.110.53 dport=443 code=NULL proto=IPPROTO_TCP policy=允许 duration=0 rcvd=52 sent=52 fwlog=0 dsp_msg=\"包过滤日志\"","stauts":"success","failMsg":null}

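This is the standard format the official bulk API requires: every document takes two lines, an action line followed by the document source on a single line, and the file must end with a newline.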
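If the records are already available as Python dictionaries, a file in this two-line format could be generated along the following lines. This is only a minimal sketch: the function name to_bulk_lines and its parameters are hypothetical, and it assumes every record is indexed with a sequential numeric _id as in the sample above.

```python
import json


def to_bulk_lines(docs, index_name, start_id=0):
    """Build a bulk body: one action line plus one source line per document."""
    lines = []
    for offset, doc in enumerate(docs):
        # Action line: tells ES to index the next line into index_name with this _id
        action = {"index": {"_index": index_name, "_id": start_id + offset}}
        # json.dumps never emits raw newlines, so each object stays on one line;
        # ensure_ascii=False keeps non-ASCII text (such as Chinese) readable
        lines.append(json.dumps(action, ensure_ascii=False, separators=(",", ":")))
        lines.append(json.dumps(doc, ensure_ascii=False, separators=(",", ":")))
    if not lines:
        return ""
    # The bulk API requires the body to end with a newline
    return "\n".join(lines) + "\n"
```

Writing the returned string to a UTF-8 file gives a payload that can be passed to curl with --data-binary.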

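2. Run in cmd (if curl is missing or fails, download and install curl first). In the URL, index and mapping stand for the target index name and document type.

curl -H "Content-Type: application/json" -XPOST localhost:9200/index/mapping/_bulk --data-binary @xxx.json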
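Since a 10 GB export is far larger than the 10-15 MB recommended per request, the file has to be cut into pieces before it is posted. The following is a rough sketch of one way to do that, not the method actually used here: split_bulk_file, out_dir and the part_NNNNN.json naming are all hypothetical, and the code assumes the file strictly alternates action lines and source lines (index actions only).

```python
import os


def split_bulk_file(src_path, out_dir, max_bytes=10 * 1024 * 1024):
    """Split a bulk file into parts of at most about max_bytes each,
    never separating an action line from its source line."""
    os.makedirs(out_dir, exist_ok=True)
    parts = []
    out = None
    written = 0
    with open(src_path, "rb") as src:
        while True:
            action = src.readline()
            if not action.strip():
                break  # end of file, or a trailing blank line
            source = src.readline()
            if not source.endswith(b"\n"):
                source += b"\n"  # every part must end with a newline
            pair = action + source
            # Start a new part when this pair would push the current one over the limit
            if out is None or written + len(pair) > max_bytes:
                if out is not None:
                    out.close()
                path = os.path.join(out_dir, "part_%05d.json" % len(parts))
                out = open(path, "wb")
                parts.append(path)
                written = 0
            out.write(pair)
            written += len(pair)
    if out is not None:
        out.close()
    return parts
```

Each returned path can then be sent with the curl command above, one request per part.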

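Note: once the cmd window starts scrolling rapidly with output, the request has gone through. The scrolling text is the bulk response; its "errors" field shows whether any individual document failed.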
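Reading a fast-scrolling response by eye is error-prone, so it can help to save the response to a file and summarize it. The sketch below assumes the response was saved as text (for example by redirecting the curl output to a file); the function name summarize_bulk_response is hypothetical, and it relies only on the "items" and per-item "error" fields of the bulk response.

```python
import json


def summarize_bulk_response(response_text):
    """Return (ok_count, failures) for a bulk response body.

    failures is a list of (_id, error) tuples for the items ES rejected.
    """
    response = json.loads(response_text)
    ok_count = 0
    failures = []
    for item in response.get("items", []):
        # Each item has a single key naming the action (index, create, update, delete)
        for result in item.values():
            if "error" in result:
                failures.append((result.get("_id"), result["error"]))
            else:
                ok_count += 1
    return ok_count, failures
```

An empty failures list means every document in that request was accepted.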


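 II. Using logstash

1. Install logstash (download it from the official site)

2. Go to the bin directory of logstash and create a file named logstash_def.conf (the configuration file that logstash loads at startup)

3. The file contents are as follows: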

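input {
	file {
		# JSON file to read, one document per line
		path => "D:/log/packet.json"
		type => "log"
		# read from the start of the file instead of only new lines
		start_position => "beginning"
		# charset is an option of the json codec, not of the file input
		codec => json { charset => "UTF-8" }
	}
}

output {
	elasticsearch {
		hosts => "http://127.0.0.1:9200"
		# target index
		index => "venus"
		# document type within the index
		document_type => "log_packet"
	}
}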

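4. In cmd, go to the bin directory of logstash (ES must already be running)

Command: logstash -f logstash_def.conf

Note: if startup fails, an error is thrown; otherwise logstash keeps running and loading data. To check progress, use the head plugin to watch the document count grow.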
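As an alternative to the head plugin, the document count can also be polled through the _count API of ES. The following is a minimal sketch under the assumption that ES listens on http://127.0.0.1:9200 and that the index is named venus as in the configuration above; the function names parse_count, fetch_count and watch are hypothetical.

```python
import json
import time
import urllib.request


def parse_count(body):
    """Extract the document count from a _count response body."""
    return int(json.loads(body)["count"])


def fetch_count(base_url, index_name):
    """Ask ES how many documents index_name currently holds."""
    with urllib.request.urlopen("%s/%s/_count" % (base_url, index_name)) as resp:
        return parse_count(resp.read().decode("utf-8"))


def watch(base_url="http://127.0.0.1:9200", index_name="venus", interval=5, rounds=12):
    """Print the document count every interval seconds, with the change since the last poll."""
    previous = None
    for _ in range(rounds):
        current = fetch_count(base_url, index_name)
        delta = 0 if previous is None else current - previous
        print("docs=%d (+%d)" % (current, delta))
        previous = current
        time.sleep(interval)
```

Calling watch() while logstash is running prints one line per poll; a count that stops growing suggests the file has been fully read.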

 
