如何使用Flume准实时建立Solr的全文索引

Posted Hadoop实操

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用Flume准实时建立Solr的全文索引相关的知识,希望对你有一定的参考价值。

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。


Fayson的github:https://github.com/fayson/cdhproject


提示:代码块部分可以左右滑动查看噢


1.文档编写目的



Fayson在上篇文章《》简单介绍了Solr,然后利用Cloudera提供的Morphline工具通过创建MapReduce可以实现对HDFS中的半/非结构化数据的批量建立全文索引。本文主要介绍如何使用Morphline工具通过Flume实时的对数据建立全文索引。


  • 内容概述

1.索引建立流程

2.准备数据

3.在Solr中建立collection

4.编辑Morphline配置文件

5.启动Flume监听并实时建立索引

6.查询验证


  • 测试环境

1.RedHat7.4

2.CM5.14.3

3.CDH5.14.2

4.Solr4.10.3

5.集群未启用Kerberos


  • 前置条件

1.Solr服务已经安装并运行正常

2.Hue中已经配置集成Solr服务


2.索引建立流程



见下图为本文档将要讲述的使用Solr建立全文索引的过程:


1.先将准备好的半/非结构化数据放置在本地。


2.在Solr中建立collection,这里需要定义一个schema文件对应到本文要使用的json数据,需要注意格式对应。


3.修改Morphline的配置文件,使用Morphline解析json的功能。


4.配置flume的conf文件,使用MorphlineSolrSink,并配置指向到Morphline配置文件


5.启动flume agent的监听任务


6.启动flume的avro-client开始发送之前准备好的数据文件



Cloudera Search提供了一个比较方便的工具可以基于HDFS中的数据批量建立索引。见上图称作MapReduce Indexing Job,是属于Morphlines的一部分。


Morphline Commands是Cloudera Search项目的一部分,实现了Flume、MapReduce、HBase、Spark到Apache Solr的数据ETL。Morphline可以让你很方便的只通过使用配置文件,较为方便的解析如csv,json,avro等数据文件,并进行ETL入库到HDFS,并同时建立Solr的全文索引。从而避免了需要编写一些复杂的代码。


3.准备数据



1.准备生成一些json格式的样例数据,生成数据的Java代码如下。


package com.cloudera;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
public class GenerateSolrTestData {
   public static long getId() {
       return (long) (Math.random() * 1000000000000l);
   }
   public static String getRadomCOLLECTIONDATE() {
       String year[] = { "2018" };
       String month[] = { "01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12" };
       String day[] = { "01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12", "13", "14", "15", "16",
               "17", "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28" };
       String hour[] = { "00", "01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12", "13", "14",
               "15", "16", "17", "18", "19", "20", "21", "22", "23" };
       String minute[] = { "00", "01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12", "13", "14",
               "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29", "30", "31",
               "32", "33", "34", "35", "36", "37", "38", "39", "40", "41", "42", "43", "44", "45", "46", "47", "48",
               "49", "50", "51", "52", "53", "54", "55", "56", "57", "58", "59" };
       String second[] = { "00", "01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12", "13", "14",
               "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29", "30", "31",
               "32", "33", "34", "35", "36", "37", "38", "39", "40", "41", "42", "43", "44", "45", "46", "47", "48",
               "49", "50", "51", "52", "53", "54", "55", "56", "57", "58", "59" };
       int index1 = (int) (Math.random() * year.length);
       int index2 = (int) (Math.random() * month.length);
       int index3 = (int) (Math.random() * day.length);
       int index4 = (int) (Math.random() * hour.length);
       int index5 = (int) (Math.random() * minute.length);
       int index6 = (int) (Math.random() * second.length);
       String coliectiondate = year[index1] + "-" + month[index2] + "-" + day[index3] + "T" + hour[index4] + ":"
               + minute[index5] + ":" + second[index6] + "Z";
       return coliectiondate;
   }
   public static String getRandomText() {
       String test[] = { "accumulo-core-1.6.0.jar", "accumulo-fate-1.6.0.jar", "accumulo-start-1.6.0.jar",
               "accumulo-trace-1.6.0.jar", "activation-1.1.jar", "activemq-client-5.10.2.jar",
               "akka-actor_2.10-2.2.3-shaded-protobuf.jar", "akka-remote_2.10-2.2.3-shaded-protobuf.jar",
               "akka-slf4j_2.10-2.2.3-shaded-protobuf.jar", "akuma-1.9.jar", "algebird-core_2.10-0.6.0.jar"};
       int index1 = (int) (Math.random() * test.length);
       return test[index1];
   }
   public static String getRandomTextCh() {
       String test[] = {
               "贾玲,原名贾裕玲。1982年4月29日出生于湖北襄阳,毕业于中央戏剧学院。喜剧女演员,师从冯巩,发起并创立酷口相声。2003年获《全国相声小品邀请赛》相声一等奖。2006年《中央电视台》第三届相声大赛专业组二等奖。2009年7月,由贾玲、邹僧等人创办的新笑声客栈开张,成为酷口相声的大本营。2010年2月14日,贾玲首次登上央视春晚的舞台表演相声《大话捧逗》,并获“我最喜爱的春晚节目”曲艺组三等奖。2011年2月2日,再次登上央视春晚舞台,表演相声《芝麻开门》。",
               "要实现近实时搜索,就必须有一种机制来实时的处理数据然后生成到solr的索引中去,flume-ng刚好提供了这样一种机>制,它可以实时收集数据,然后通过MorphlineSolrSink对数据进行ETL,最后写入到solr的索引中,这样就能在solr搜索引擎中近实时的查询到新进来的数据了由贾玲人。",
               "如上图,每个缓冲区以四个字节开头,中间是多个字节的缓冲数据,最后以一个空缓冲区结尾。",
               "实际还是围绕着Agent的三个组件Source、Channel、Sink来看它能够支持哪些技术或协议。我们不再对各个组件支持的协议详细配置进行说明,通过列表的方式分别对三个组件进行概要说明",
               "下面写一个最简单的Hello World例子,以便对RESTful WebService有个感性认识。因为非常专业理论化的描述RESTful WebService是一件理解起来很痛苦的事情。看看例子就知道个大概了,再看理论就容易理解多了。",
               "据香港经济日报报道,传小米可能在下周向港交所提交上市申请。经济日报此前还报道,小米最近数月不乏上市前股东售股活动,售股价格显示公司估值介乎650亿至700亿美元。此前,曾有多个小米估值的版本出现,比如1000亿美元,甚至2000亿美元,小米方面都未进行置评",
               "最近,中超新晋土豪苏宁可谓是频出大手笔。夏窗尚未开启,苏宁就早早开始谋划了。", "尽管距离泰达与恒大的比赛还有2天的时间,但比赛的硝烟已经开始弥漫。",
               "据美国媒体报道,美国当地时间21日上午,流行音乐传奇人物王子(Prince)被发现死于位于明尼苏达的住所内,医务人员进行了紧急抢救,但最终回天无力,享年57岁。",
               "016年4月19日,周杰伦召开记者会,正式宣布:与杰藝文創合作,收购S2冠军战队台北暗杀星TPA,并正式更名为",
               "上周五,麦格希金融在一项声明中说,这笔交易预计在今年第三季度完成,目前正在等待监管部门的审批" };
       int index1 = (int) (Math.random() * test.length);
       return test[index1];
   }
   public static String getData() {
       StringBuffer sbf = new StringBuffer();
       sbf.append("{\"id\": \"" + getId() + "\",\"created_at\": \"" + getRadomCOLLECTIONDATE() + "\", \"text\": \""
               + getRandomText() + "\",\"text_cn\":\"" + getRandomTextCh() + "\"}");
       return sbf.toString();
   }
   public static void write(int n, String file) {
       BufferedWriter bw = null;
       try {
           bw = new BufferedWriter(new FileWriter(file, true), 4194304);
           for (int i = 0; i < n; i++) {
               bw.write(getData() + "\r\n");
           }
           System.out.println("数据生成完毕!" + file);
       } catch (IOException e) {
           e.printStackTrace();
           System.out.println("数据生成异常!");
       } finally {
           try {
               bw.close();
           } catch (IOException e) {
               // TODO Auto-generated catch block
               e.printStackTrace();
           }
       }
   }
   public static void main(String[] args) {
       write(3000000, "/root/data1.txt");
   }
}

(可左右滑动)


因为Fayson的AWS环境配置较低,这里只作为实验生成300W行数据,大约100MB。


Fayson为了方便后面观察Flume实时入库并建立索引的效果,这里比上一篇文章的数据放大了10倍,否则数据太少,一下入库完毕,看不出实时的效果。


2.将Java代码打包成jar并上传到服务器执行,生成数据。


[root@ip-172-31-8-230 solr-hdfs]# java -cp GenerateSolrTestData.jar com.cloudera.GenerateSolrTestData
数据生成完毕!/root/data1.txt

(可左右滑动)


如何使用Flume准实时建立Solr的全文索引


查看该数据,为300W行,1GB。


[root@ip-172-31-8-230 ~]# head data1.txt 
[root@ip-172-31-8-230 ~]# cat data1.txt |wc -l
[root@ip-172-31-8-230 ~]# du -sh data1.txt

(可左右滑动)


如何使用Flume准实时建立Solr的全文索引


4.在Solr中建立collection



1.根据json文件准备schema文件。根据第三章的json格式数据内容可以看到一共有id,username,created_at,text,text_cn几个属性项。在Solr的collection的schema文件中都要有相应的对应,如下所示:


<?xml version="1.0" encoding="UTF-8" ?>
<schema name="example" version="1.5">
<fields>
 <field name="uuuid" type="string" indexed="true" stored="true" required="true" multiValued="false" />
<field name="id" type="string" indexed="true" stored="true" />
 <field name="created_at" type="string" indexed="true" stored="true" />
 <field name="text" type="text_en" indexed="true" stored="true" />
 <field name="text_cn" type="text_ch" indexed="true" stored="true" />
 <field name="_version_" type="long" indexed="true" stored="true"/>
</fields>
<uniqueKey>uuuid</uniqueKey>
<types>
<!-- The StrField type is not analyzed, but indexed/stored verbatim.
         It supports doc values but in that case the field needs to be
    single-valued and either required or have a default value.
    -->

<fieldType name="string" class="solr.StrField" sortMissingLast="true"/>
<!--
         Default numeric field types. For faster range queries, consider the tint/tfloat/tlong/tdouble types.
These fields support doc values, but they require the field to be
single-valued and either be required or have a default value.
-->

<fieldType name="int" class="solr.TrieIntField" precisionStep="0" positionIncrementGap="0"/>
<fieldType name="float" class="solr.TrieFloatField" precisionStep="0" positionIncrementGap="0"/>
<fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0"/>
<fieldType name="double" class="solr.TrieDoubleField" precisionStep="0" positionIncrementGap="0"/>
<fieldType name="text_en" class="solr.TextField" positionIncrementGap="100" />
<fieldType name="text_ch" class="solr.TextField" positionIncrementGap="100">
       <analyzer type="index">
        <tokenizer class="solr.SmartChineseSentenceTokenizerFactory"/>
        <filter class="solr.SmartChineseWordTokenFilterFactory"/>
       </analyzer>  
</fieldType>
</types>
</schema>

(可左右滑动)


如何使用Flume准实时建立Solr的全文索引


注意Solr在建立全文索引的过程中,必须指定唯一键(uniqueKey),类似主键,唯一确定一行数据,跟上篇文章不一样,我们没有使用json中的id属性项,而选择了让solr自动生成,因为Fayson的造数代码是使用的Java,id那一列是随机数函数Math,数据量大了,没办法保持id的唯一性,所以采用了Solr来自动生成,更好的保持唯一性,该uuid会在Morphline配置文件中配置,大家可以继续往后查看Morphline配置文件章节进行对比。


https://repository.cloudera.com/artifactory/cdh-releases-rcs/org/apache/lucene/lucene-analyzers-smartcn/4.10.3-cdh5.14.2/


2.准备建立collection的脚本


ZK="ip-172-31-5-171.ap-southeast-1.compute.internal"
COLLECTION="collection1"
BASE=`pwd`
SHARD=3
REPLICA=1
echo "create solr collection"
rm -rf tmp/*
solrctl --zk $ZK:2181/solr instancedir --generate tmp/${COLLECTION}_configs
cp conf/schema.xml tmp/${COLLECTION}_configs/conf/
solrctl --zk $ZK:2181/solr instancedir --create $COLLECTION tmp/${COLLECTION}_configs
solrctl --zk $ZK:2181/solr collection --create $COLLECTION -s $SHARD -r $REPLICA
solrctl --zk $ZK:2181/solr collection --list

(可左右滑动)


如何使用Flume准实时建立Solr的全文索引


ZK:Zookeeper的某台机器的hostname

COLLECTION:需要建立的collection名字

SHARD:需要建立的shard的数量

REPLICA:副本数


3.执行create.sh脚本建立collection


[root@ip-172-31-8-230 solr-hdfs]# sh create.sh 
create solr collection
Uploading configs from tmp/collection1_configs/conf to ip-172-31-5-171.ap-southeast-1.compute.internal:2181/solr. This may take up to a minute.
collection1 (2)

(可左右滑动)


如何使用Flume准实时建立Solr的全文索引


5.编辑Morphline配置文件



1.准备Morphline的配置文件


# Specify server locations in a SOLR_LOCATOR variable; used later in   
# variable substitutions:  
SOLR_LOCATOR : {  
 # Name of solr collection  
 collection : collection1  
 # ZooKeeper ensemble  
 zkHost : "ip-172-31-5-171.ap-southeast-1.compute.internal:2181/solr"    
}  
# Specify an array of one or more morphlines, each of which defines an ETL  
# transformation chain. A morphline consists of one or more potentially  
# nested commands. A morphline is a way to consume records such as Flume events,  
# HDFS files or blocks, turn them into a stream of records, and pipe the stream  
# of records through a set of easily configurable transformations on its way to  
# Solr.  
morphlines : [  
 {  
   # Name used to identify a morphline. For example, used if there are multiple  
   # morphlines in a morphline config file.  
   id : morphline1  
   # Import all morphline commands in these java packages and their subpackages.  
   # Other commands that may be present on the classpath are not visible to this  
   # morphline.  
   importCommands : ["org.kitesdk.**", "org.apache.solr.**","com.cloudera.example.**"]  
   commands : [                      
     {  
       readJson {}  
     }  
     {  
       extractJsonPaths {  
         flatten : false  
         paths : {  
           id : /id              
           created_at : /created
_at  
           text : /text        
           text_cn:/text
_cn
         }  
       }  
     }  
{
       generateUUID {
           field:uuuid
       }
   }      
     # Consume the output record of the previous command and pipe another  
     # record downstream.  
     #  
     # convert timestamp field to native Solr timestamp format  
     # such as 2012-09-06T07:14:34Z to 2012-09-06T07:14:34.000Z  
     {  
       convertTimestamp {  
         field : created_at  
         inputFormats : ["yyyy-MM-dd'T'HH:mm:ss'Z'", "yyyy-MM-dd"]  
         inputTimezone : America/Los_Angeles  
         outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"                                  
         outputTimezone : UTC  
       }  
     }  
     # Consume the output record of the previous command and pipe another  
     # record downstream.  
     #  
     # This command deletes record fields that are unknown to Solr  
     # schema.xml.  
     #  
     # Recall that Solr throws an exception on any attempt to load a document  
     # that contains a field that is not specified in schema.xml.  
     {  
       sanitizeUnknownSolrFields {  
         # Location from which to fetch Solr schema  
         solrLocator : ${SOLR_LOCATOR}  
       }  
     }    
     # log the record at DEBUG level to SLF4J  
     { logDebug { format : "output record: {}", args : ["@{}"] } }      
     # load the record into a Solr server or MapReduce Reducer  
     {  
       loadSolr {  
         solrLocator : ${SOLR_LOCATOR}  
       }  
     }  
   ]  
 }  
]  

(可左右滑动)


如何使用Flume准实时建立Solr的全文索引


根据上图可以看到配置项里包括:

注意我们使用了readjson方法,然后对应到我们之前定义的schema文件里的json属性项

比上篇文章不一样的地方,我们引入了uuid,对应到第四章schema文件中的uuuid


6.下载分发中文分词jar包



1.将中文分词包拷贝到指定的目录,首先到以下网址下载中文分词的jar包


https://repository.cloudera.com/artifactory/cdh-releases-rcs/org/apache/lucene/lucene-analyzers-smartcn/4.10.3-cdh5.14.2/

(可左右滑动)


将中文分词jar包分发到所有机器的Solr和YARN服务相关的目录


[root@ip-172-31-8-230 solr-hdfs]# cp lucene-analyzers-smartcn-4.10.3-cdh5.14.2.jar /opt/cloudera/parcels/CDH/lib/hadoop-yarn
[root@ip-172-31-8-230 solr-hdfs]# cp lucene-analyzers-smartcn-4.10.3-cdh5.14.2.jar /opt/cloudera/parcels/CDH/lib/solr/webapps/solr/WEB-INF/lib

(可左右滑动)


如何使用Flume准实时建立Solr的全文索引


2.分发到集群所有机器


[root@ip-172-31-8-230 shell]# sh bk_cp.sh node.list  /opt/cloudera/parcels/CDH/lib/hadoop-yarn/lucene-analyzers-smartcn-4.10.3-cdh5.14.2.jar  /opt/cloudera/parcels/CDH/lib/hadoop-yarn
lucene-analyzers-smartcn-4.10.3-cdh5.14.2.jar          
[root@ip-172-31-8-230 shell]# sh bk_cp.sh node.list /opt/cloudera/parcels/CDH/lib/solr/webapps/solr/WEB-INF/lib/lucene-analyzers-smartcn-4.10.3-cdh5.14.2.jar /opt/cloudera/parcels/CDH/lib/solr/webapps/solr/WEB-INF/lib

(可左右滑动)


如何使用Flume准实时建立Solr的全文索引


7.启动Flume监听并实时建立索引



1.配置Flume监听启动时需要使用的配置文件


[root@ip-172-31-8-230 conf]# cat flume-solr.conf 
tier1.sources=source1
tier1.channels=channel1
tier1.sinks=sink1
tier1.sources.source1.type = avro
tier1.sources.source1.bind = 0.0.0.0
tier1.sources.source1.port = 44444
tier1.sources.source1.channels=channel1
tier1.channels.channel1.type=memory
tier1.channels.channel1.capacity=10000000
tier1.channels.channel1.transactionCapacity=10000
tier1.channels.channel1.keep-alive=60
tier1.sinks.sink1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.morphlineFile =/root/solr-hdfs-flume/conf/morphlines.conf
tier1.sinks.sink1.morphlineId = morphline1

(可左右滑动)    


如何使用Flume准实时建立Solr的全文索引


2.启动Flume监听


[root@ip-172-31-8-230 conf]# flume-ng agent --conf conf --conf-file flume-solr.conf --name tier1 -Xms1024m -Xmx2048m -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/bin/hbase) for HBASE access
Info: Including Hive libraries found via () for Hive access

(可左右滑动)


如何使用Flume准实时建立Solr的全文索引

如何使用Flume准实时建立Solr的全文索引


3.启动flume的avro client开始发送数据


[root@ip-172-31-8-230 ~]# flume-ng avro-client -H localhost -p 44444 -F data1.txt

(可左右滑动)


如何使用Flume准实时建立Solr的全文索引

如何使用Flume准实时建立Solr的全文索引


4.使用Solr的查询界面可以实时的看到数据在慢慢进入Solr,并建立索引。


如何使用Flume准实时建立Solr的全文索引

如何使用Flume准实时建立Solr的全文索引

如何使用Flume准实时建立Solr的全文索引


Flume的avro client发送数据完毕,已经关闭客户端。


如何使用Flume准实时建立Solr的全文索引


Flume监听这边显示连接断开


如何使用Flume准实时建立Solr的全文索引


入库数据约为176W,说明还在继续入库


如何使用Flume准实时建立Solr的全文索引


再次等待一会,数据最终入库完毕,300W条。


如何使用Flume准实时建立Solr的全文索引


8.Solr自带界面全文索引查询验证



1.从Cloudera Manger中选择Solr并进入其中一台Solr Server的界面


如何使用Flume准实时建立Solr的全文索引

如何使用Flume准实时建立Solr的全文索引


2.选择一个collection的shard并进入collection的查询界面


如何使用Flume准实时建立Solr的全文索引


3.点击query按钮,准备开始查询数据


如何使用Flume准实时建立Solr的全文索引


4.查询全部数据


如何使用Flume准实时建立Solr的全文索引


5.得到查看结果是300W条,符合预期,表明所有数据都已经入库成功


如何使用Flume准实时建立Solr的全文索引


6.查询关键字“实际”,发现查询结果会显示所有带有“实际”词语的条目,一共273250条。


如何使用Flume准实时建立Solr的全文索引


9.使用Hue进行全文索引查询验证



1.进入Hue并选择“Indexes”页面


如何使用Flume准实时建立Solr的全文索引

如何使用Flume准实时建立Solr的全文索引


2.选择collection1


如何使用Flume准实时建立Solr的全文索引

如何使用Flume准实时建立Solr的全文索引


3.点击右上角放大镜查询图标


如何使用Flume准实时建立Solr的全文索引


4.总数为300W条,符合预期


如何使用Flume准实时建立Solr的全文索引


5.同样查询“实际”关键字,发现“实际”会被高亮,并且27453条符合预期,与第8节使用Solr自带界面查询的结果也是相符合的。



10.总结



1.使用Cloudera提供的Morphline工具,可以让你不需要编写一行代码,只需要通过使用一些配置文件就可以快速的对半/非机构化数据进行全文索引。而且还可以实现Flume的准实时建立索引


2.本文demo提供的中文分词是比较弱的,要想真正上生产使用,可以考虑使用更好的开源中文分词包或者其他第三方的。


3.注意如果全文索引的字段有需要做中文分词的,需要将中文分词的jar包上传到所有机器的Solr和YARN服务相关的目录。否则Solr会无法创建collection,YARN也无法启动创建索引的MapReduce任务。


4.本文只是以json格式的数据进行举例验证,实际Morphline还支持很多其他的格式,包括结构化数据csv,HBase中的数据等等。具体请参考:


5.如果数据文件没有唯一确定的id字段,类似主键,可以使用morphline的uuid功能,保证所有数据都能入库成功,否则可能导致数据丢失。

http://kitesdk.org/docs/1.1.0/morphlines/

https://www.cloudera.com/documentation/enterprise/latest/topics/search.html

 

本文所有代码或脚本源码已上传到github,参考:

https://github.com/fayson/cdhproject/tree/master/generatedata/solr-hdfs-flume

https://github.com/fayson/cdhproject/blob/master/generatedata/src/main/java/com/cloudera/solr/GenerateSolrTestData.java

 


提示:代码块部分可以左右滑动查看噢


为天地立心,为生民立命,为往圣继绝学,为万世开太平。

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。



推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。


以上是关于如何使用Flume准实时建立Solr的全文索引的主要内容,如果未能解决你的问题,请参考以下文章

solr全文检索,建立的文件索引,检索不出正确结果

Kafka+Flume+Morphline+Solr+Hue数据组合索引

全文搜索服务器solr

在HBase之上的solr中创建索引

和我一起打造个简单搜索之Logstash实时同步建立索引

Solr---有趣的全文检索(原理篇)