消息消费轨迹存储效率优化

Posted 五道口纳什

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息消费轨迹存储效率优化相关的知识,希望对你有一定的参考价值。

此文原作者为便利蜂基础架构组的一名大三实习生。在短短的几周实习时间里,作者拿到问题后调研并尝试不同的方案,最后取得了不错的改进效果。
如果你对相关的技术感兴趣,致力于研发效率提升,欢迎加入我们。
可投递简历至:tech-hiring@bianlifeng.com 邮件标题:产研平台基础组件部

1. 背景

消息队列是业务中常用的中间件,除了消息收发等核心流程以外,对历史消息轨迹的跟踪查询也非常重要。如果没有历史消息查询,那么一旦出了问题将很难进行定位。

公司使用的消息队列中间件为qmq,和其它消息队列中间件类似,qmq的主要组件包括meta server(提供集群管理和集群发现的作用),server(提供实时消息服务),producer(消息生产者),consumer(消息消费者)。在消息的收发过程中,消息都存储在Server端,为了提供高吞吐的消息收发服务,server是以顺序日志的形式存储消息的,这种格式并不利于消息的检索。为了提供历史消息的查询服务,qmq专门有一个backup模块进行历史消息的备份和查询,为了减少历史消息备份的数据大小,backup仅仅从server slave中同步消息的索引而非完整的消息内容;然后backup将索引保存到hbase中。这样,backup便可从HBase中查询出对应的消息索引并据此读到具体的消息内容。具体如下图:

2. 现状

现在公司的backup将消息索引写入HBase采用的是调用HBase Client的PUT API的方式,每次批量写入若干条消息(默认是1000条消息索引),然而这种方式写入效率比较低下。当需要写入HBase的消息索引数据量特别大时,写入一次批量的消息索引需要2-3秒(公司HBase的服务器配置较低),消息备份的延迟可能有几个小时甚至是几天。导致无法及时地从HBase中查询到对应的历史消息,无法满足现有业务的需求。有鉴于此,我们需要尝试提升backup将消息索引写入HBase的速度。

3. 寻找数据写入HBase效率低下的原因

为了了解数据是如何写入HBase的,我们首先大致了解一下HBase的架构:

在HBase集群中,真正负责读写数据的是一个个的Region Server,而每个Region Server又管理着多个region。

一个region中存储的是同一张HBase表的数据,每个region包含一个或多个Store,每个Store对应HBase表的一个列簇(Column Family)。

Store由MemStore 和 StoreFile 组成,MemStore是写缓存,在内存中存储着还未被持久化到硬盘的数据,当MemStore满之后,会被flush到StoreFile中,StoreFile对应一个实际的HFile 格式的文件。

HLog即Write Ahead Log(WAL),记录着每个Region Server的数据操作日志,用来做故障恢复。

在了解了HBase的基本架构之后,我们可以继续了解一下向HBase中写入数据的普通方式,即我们目前使用的方式。一般向HBase中写入数据最直接的方式是调用HBase的API用put方法插入数据,其流程大致如下:
Client调用API写入数据到HBase实际上都是RPC请求,HMaster会将Client写入数据的请求分发到对应的Region Server。

写入的数据传输到Region Server后,会先写入到HLog即Write Ahead Log(WAL)中,然后再将数据写入到对应region的MemStore中,当MemStore满之后,才会被flush到StoreFile中,flush会耗费较多的I/O资源。

HBase还可能会触发split和compaction操作。当存在较多的小HFile文件时,会触发compaction操作,将多个小文件合并成大文件,以减少HFile的文件数量。当region过大时会触发split操作,将其分裂成两个子region。

在有大数据量写入时这种写入数据的方式效率会比较低下,因为会频繁的进行写WAL、flush操作,耗费较多的磁盘I/O。

4. 如何提升HBase写入效率

调用HBase Client的PUT API的方式写数据效率低下,那我们能不能找到一种更加高效的写数据的方式呢?正如上面所介绍的,HBase的底层存储是使用的HFile文件格式。当有大量数据需要写入HBase时,如果我们能够批量将数据直接写成HFile文件,然后直接导入到HBase是不是就可以提高写入速度呢?经过学习,我们发现HBase提供了一种Bulk Load的API。

Bulk Load 直接将数据输出成 HBase table 内部的存储格式,即HFile文件,然后将生成的 HFile 加载到集群的相应节点。这种方式无需进行写WAL、flush等过程,不会产生大量的写入 I/O,所以需要较少的 CPU 和网络资源。使用Bulk Load批量加载数据,能极大的提高写入效率,并降低对 Region Server 节点的写入压力。

5. Bulk Load批量导入数据的具体实现

下面我们便通过简单示例介绍一下如何通过Bulk Load 将数据导入HBase中。

5.1 准备

首先需要提前部署好HBase以及HDFS服务,在HBase中建好相应的表,例如:

create 'bltable','cf'

5.2 实现

在HBase中,Bulk Load主要的过程包括:从其它数据源导出数据,比如简单的文本文件或者是其它数据库;将数据转换成HFile形式;将生成的HFile导入到HBase特定的region中。实现Bulk Load可以借助ImportTsv和CompleteBulkLoad工具或者是通过编程的方式,这里我们主要讲述通过编程实现的方式。通过编程实现Bulk Load将数据导入HBase具体也有两种方式,分别为MapReduce方式和非MapReduce方式。

5.3 MapReduce方式实现BulkLoad

这种方式具体来说包括3个步骤:

  1. 准备好数据源文件(比如文本文件)并上传至HDFS中,可通过Hadoop的fs命令将数据源文件从本地文件系统中上传至HDFS中,或者也可以借助其他的工具。
  2. 利用一个MapReduce的job将HDFS中的数据转换成HFile。这一步需要一个MapReduce作业,大多数情况下需要我们自己编写Map函数,而Reduce函数不需要我们考虑,由HBase提供。该作业需要使用rowkey(行键)作为输出Key;KeyValue、Put或者Delete作为输出Value。MapReduce作业需要使用HFileOutputFormat2来生成HBase的数据文件,即HFile。
  3. 将生成的HFile导入到HBase中,这步比较简单,只需要调用LoadIncrementalHFiles类的doBulkLoad()方法(旧版本,1.x.y版本的hbase-server依赖)或者用BulkLoadHFilesTool类的doBulkLoad()方法(新版本,2.x.y版本的hbase-server依赖)。

下面是一个简单的示例,使用的数据源文件是一个txt文本文件,里面的内容类似以下,每行由一个MessageID与Offset组成:

messageid00001498 1498 
messageid00001499 1499 
messageid00001500 1500 
messageid00001501 1501 
messageid00001502 1502

首先,可通过以下命令或其它工具将该数据源文件上传至HDFS中:

hadoop fs -put /path/on/localdisk /path/on/hdfs
#例如
hadoop fs -put /Users/bianlifeng/Documents/bulkloadtest/sourcedata.txt hdfs://127.0.0.1/tmp/sourcedata.txt

然后编写程序将已上传至HDFS中的数据源文件转换成HFile文件并导入到HBase中。

正如上面所说的,我们需要自己定义一个Mapper类,重写其中的map()方法,如下:

static class HFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] datas = line.split(" ");
            //datas的形式:[messageid offset]
	    //datas[0]即messageid是每条记录的rowkey
            ImmutableBytesWritable rowKey = new ImmutableBytesWritable(Bytes.toBytes(datas[0]));
	    //创建一条记录对应的KeyValue,其中"cf"为列簇名,"offset"为列名,datas[1]为该列的值
            KeyValue kv = new KeyValue(Bytes.toBytes(datas[0]), "cf".getBytes(), "offset".getBytes(),datas[1].getBytes());
            context.write(rowKey, kv);
        }

接着我们需要定义一个MapReduce作业并执行,最后调用LoadIncrementalHFiles类的doBulkLoad()方法将生成的HFile导入到指定的HBase表中,如下:

final String INPUT_PATH="hdfs://127.0.0.1/tmp/sourcedata.txt";
final String OUTPUT_PATH="hdfs://127.0.0.1/tmp/outputhfile";
final String TABLE_NAME="bltable";
Configuration conf = HBaseConfiguration.create();
Connection conn= ConnectionFactory.createConnection(conf);
Table htable= conn.getTable(TableName.valueOf(TABLE_NAME));
Admin admin= conn.getAdmin();
 
// 设置各个类名
Job job = Job.getInstance(conf, "BulkLoad");//job的名字
job.setJarByClass(HFileGenerator.class);//执行job的class
job.setMapperClass(HFileGenerator.HFileMapper.class);//map类,即上面的HFileMapper
job.setMapOutputKeyClass(ImmutableBytesWritable.class);//key类型
job.setMapOutputValueClass(KeyValue.class);//value类型
// 设置输入输出格式
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(HFileOutputFormat2.class);
FileInputFormat.setInputPaths(job, INPUT_PATH);
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
HFileOutputFormat2.configureIncrementalLoad(job, htable, conn.getRegionLocator(TableName.valueOf(TABLE_NAME)));
if (job.waitForCompletion(true)) {
   //使用bulkload将hfile载入到hbase表
   LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
   //新版本(2.x.y)里该用这个了
   //BulkLoadHFilesTool loader=new BulkLoadHFilesTool(conf);  
   loader.doBulkLoad(new Path(OUTPUT_PATH),admin,htable,conn.getRegionLocator(TableName.valueOf(TABLE_NAME)));
}

5.4 非MapReduce方式实现BulkLoad

由于MapReduce方式依赖较重,实现略微复杂以及执行耗费时间也较多,那是否有其它的实现方式呢?实际上我们使用的是下面要介绍的这种非MapReduce方式。

这种方式主要包括以下两个步骤:

  1. 创建一个HFile.Writer,配置好生成HFile的路径等信息,对应每条数据生成key-value对然后调用write.append()方法将数据写入HFile中,注意这种方式需要保证在写入时key是有序的,如果不是有序的需要先排序再写入,比如利用Treemap进行排序。
  2. 将生成的HFile导入到HBase中,和使用MapReduce方式一样,只需要调用LoadIncrementalHFiles类的doBulkLoad()方法即可。

创建HFile.Writer并将数据写入HFile的核心代码如下:

Configuration conf= HBaseConfiguration.create();
String TABLE_NAME="bltable";
byte[] FAMILY_NAME= Bytes.toBytes("cf");//列簇名
byte[] COLOMU_NAME=Bytes.toBytes("offset");//列名
Path HFILE_PARENT_PARENT_DIR=new Path("/tmp/test");
Path HFILE_PATH=new Path("/tmp/test/"+new String(FAMILY_NAME)+"/hfile");

Configuration tempConf=new Configuration(conf);
tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 1.0f);
HFileContext fileContext = new HFileContext();
HFile.Writer writer=null;
try {
     writer = HFile.getWriterFactory(conf, new CacheConfig(tempConf))
            .withPath(FileSystem.get(conf), HFILE_PATH)
            .withFileContext(fileContext).create();
	//对应每条数据创建KeyValue并写入HFile中
	for(int i=0;i<100;i++){
    	byte[] key=Bytes.toBytes("rowkey"+String.format("%08d",i));
    	byte[] value=Bytes.toBytes("vallue"+String.format("%08d",i));
    	long currentTime=System.currentTimeMillis();
    	KeyValue kv=new KeyValue(key,FAMILY_NAME,COLOMU_NAME,currentTime,value);
    	writer.append(kv);
	}
} catch (IOException e) {
    e.printStackTrace();
} finally{
	writer.close();
}

然后我们跟上面使用MapReduce方式一样,只需调用LoadIncrementalHFiles类的doBulkLoad()方法即可将生成的HFile导入到指定的HBase表中。

5.5 注意事项

这种方式还有一个需要注意的地方是HFile的路径问题。

在整个过程中需要用到两个路径,一个是使用HFile.Writer将数据写入HFile时,需要指定具体的HFile文件的路径,在这里是HFILE_PATH,如下:

writer = HFile.getWriterFactory(conf, new CacheConfig(tempConf))
                    .withPath(FileSystem.get(conf), HFILE_PATH)
                    .withFileContext(fileContext).create();

另一个是使用LoadIncrementalHFiles的doBulkLoad()方法时,需要指定HFile文件的上上层文件夹的路径,在这里是HFILE_PARENT_PARENT_DIR,如下:

loader.doBulkLoad(HFILE_PARENT_PARENT_DIR,admin,htable,conn.getRegionLocator(TableName.valueOf(TABLE_NAME)));

这两个路径具体的值如下:

Path HFILE_PARENT_PARENT_DIR=new Path("/tmp/test");
Path HFILE_PATH=new Path("/tmp/test/"+new String(FAMILY_NAME)+"/hfile");

这里FAMILY_NAME是HBase对应表的列簇的名字,因为调用doBulkLoad()时指定的是HFile文件的上上层文件夹的路径,然后它会去寻找是否存在对应列簇的子文件夹,然后到该子文件夹里面去读取HFile文件,所以HFile文件的路径中直接上层文件夹的名字必须为对应的列簇名。如果不是这样的话,在执行doBulkLoad()时无法找到需要上传的文件。

5.6 验证

运行程序之后,我们可以通过类似以下命令查看中间生成的HFile文件:

hadoop fs -ls 存放HFile文件的文件夹的路径
#例如
hadoop fs -ls hdfs://127.0.0.1/tmp

也可连接HBase并查看对应的表中是否成功导入了数据。

6. Bulk Load源码分析

从上面使用Bulk Load实现快速写入数据至HBase的介绍中,我们可以看到中间生成的HFile文件需要暂时存放在HDFS中。可能有同学会问,为什么不直接将HFile文件暂存在本地呢?其实一开始我们的确是打算将中间的HFile文件暂存在backup server的本地磁盘上,即生成HFile的文件路径写成backup server的本地路径。然而比较遗憾的是,这样使用本地路径的话无法实现完整的Bulk Load过程。因为在生成HFile文件之后,在将HFile文件导入HBase时实际上是通过RPC让HBase中对应的Region Server去执行真正的导入过程。如果我们将生成HFile的文件路径写成backup server的本地路径,将HFile文件暂存在backup server上,Region Server是无法读取到的,所以就无法执行后续的将HFile导入HBase的过程。为了较好地理解这一点,以及更为深入地了解一下Bulk Load的机制,我们可以稍微分析一下Bulk Load的源码。

使用Bulk Load将HFile导入HBase调用的方法是doBulkLoad(),为了方便介绍,删减一些非关键代码后,其核心源码如下:

public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
     RegionLocator regionLocator) throws TableNotFoundException, IOException  {
      //创建用于BulkLoad的线程池
     ExecutorService pool = createExecutorService();

      //在后面会为每个HFile生成一个LoadQueueItem对象,并添加到queue中,一般该队列称为LQI队列,里面的元素为LQI
    Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
     //核心方法1,主要是遍历,检查有效的HFile并加入到LQI队列中
     prepareHFileQueue(hfofDir, table, queue, validateHFile);

     int count = 0;
     //队列不空则一直循环
     if (queue.isEmpty()) return;
     while (!queue.isEmpty()) {
     //得到所有region的的startKey,endKey
       final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
     //最大的循环次数,默认为10,可以在配置文件中通过hbase.bulkload.retries.number设置
       int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
       maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
       if (maxRetries != 0 && count >= maxRetries) {
         throw new IOException("Retry attempted " + count + " times without completing, bailing out");
       }
       count++;

     //核心方法2,将LQI队列中的每个HFile根据HBase table的region metedata进行分组分割,将每个LQI都划分到所属的region中
       Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table, pool, queue, startEndKeys);

     //核心方法3,将划分好的LQI加载到对应的region
       bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups);
     }
}

6.1 核心方法1:prepareHFileQueue

public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
    boolean validateHFile) throws IOException {
  discoverLoadQueue(queue, hfilesDir, validateHFile);
  validateFamiliesInHFiles(table, queue);
}

其中discoverLoadQueue()方法中会调用visitBulkHFiles()方法,到指定路径hfilesDir下检查所有的文件是否符合格式规范,为每个符合格式的HFile创建一个LoadQueueItem对象并添加到LQI队列中。

由于需要加载的HFile在HDFS中是按照列簇(column family)放置在不同的子目录下,所以validateFamiliesInHFiles()方法会检查每个LQI的列簇是否属于对应的HBase table。

6.2 LQI队列不空循环

doBulkLoad()方法中while循环终止的条件为:1.LQI队列为空,即所有HFile已经上传完成;2.循环次数超过maxRetries,抛出IOException。

其中maxRetries默认为10,可以在配置文件中通过hbase.bulkload.retries.number设置。

在该while循环中有两个重要的方法,如下:

6.2.1 核心方法2:groupOrSplitPhase

这个方法主要是遍历上述所提到的LQI队列,对于每一个LQI,可以获取到其对应的HFile,然后根据HFile的[firstkey, lastkey]以及之前获取到的每个region的[starkey, endkey]来判断每个HFile是否需要拆分。如果一个HFile的[firstkey, lastkey]不在任何region的[starkey, endkey]范围内,则该HFile需要拆分,拆分后的文件后缀为.top和.bottom,然后新拆分出来的两个HFile可以得到对应的两个LQI,并加入到上述所提的LQI队列中。groupOrSplitPhase()方法的返回值为Multimap<ByteBuffer, LoadQueueItem> regionGroups,代表可以加载上传的LQI,这是一个Multimap,key为region的startkey,value为对应的LQI,一个region可对应多个LQI。

6.2.2 核心方法3:bulkLoadPhase

这个方法主要是将上一步中划分好的LQI加载到对应的region。在执行上述groupOrSplitPhase()方法得到一个regionGroups后,bulkLoadPhase()方法会调用tryAtomicRegionLoad()方法将每个region对应的LQI加载到目标region中。如果加载失败,则会将失败的LQI重新加入到LQI队列中,在下一次while循环中再进行划分和加载。而在tryAtomicRegionLoad()方法中,会创建一个RegionServerCallable来负责将该LQI加载到目标region中。

RegionServerCallable的官方注释为:

Implementations call a RegionServer and implement call(int). Passed to a RpcRetryingCaller so we retry on fail.

所以,RegionServerCallable会传递给RpcRetryingCaller,而RpcRetryingCaller则是通过远程调用让Region Server去执行具体的将HFile导入对应region的过程。

可能遇到的问题与解决方法
在我们尝试Bulk Load的时候,遇到了一些问题,下面简单地说明一下。

ERROR wtracer[] [index] o.a.h.h.m.LoadIncrementalHFiles:? - Unexpected execution exception during splitting
java.util.concurrent.ExecutionException: java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.groupOrSplitPhase(LoadIncrementalHFiles.java:649)
	......

第一个可能遇到的问题是在doBulkLoad()的时候出现以上的错误,看这个描述,说是缺少snappy的本地库。一开始觉得奇怪的是,我们在生成HFile的时候指定的是不使用压缩,为什么还会需要snappy压缩本地库呢?后来根据异常栈仔细查看代码,发现Bulk Load有自动判断是否需要拆分HFile文件的机制,如果需要拆分的话,会通过copyHFileHalf()方法来将原本的HFile文件写成两个子文件,在这方法中会去获取HBase表使用的压缩算法,然后以此作为新生成HFile文件的压缩算法。检查我们的HBase表,的确是使用的snappy压缩算法。所以最后的解决方法无非两种:添加snappy本地库,或者是将HBase表的压缩算法换成其它不需要本地库支持的压缩算法。

ERROR wtracer[] [index] o.a.h.h.m.LoadIncrementalHFiles:? - IOException during splitting
java.util.concurrent.ExecutionException: org.apache.hadoop.hbase.io.hfile.CorruptHFileException: Problem reading HFile Trailer from file hdfs://hbase/tmp/trace/m/hfile
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.groupOrSplitPhase(LoadIncrementalHFiles.java:649)

还有可能遇到的问题是在doBulkLoad()时碰到上面这种读取HFile文件出错或者是HFile文件不存在。这个问题在我们刚开始试验Bulk Load时没有出现,是后来才出现的。通过在网上查找资料,总结出现这种问题的原因是因为有多个线程在同时读写同一个HFile文件。但是在我们程序中执行Bulk Load是单线程的,那问题在哪呢?顺着可能是多个线程在同时读写的想法,后来找到是因为我们有多台机器在执行Bulk Load,而且它们读写HFile文件的路径和文件名都是一样的!原因找到之后,我们只需要避免不同机器读写相同HFile文件即可。我们可以将不同机器生成的HFile写到不同的文件夹中,比如以主机名命名的文件夹。此外,我们也可以为每次生成的HFile文件都使用不同的文件名,比如取其中一个key命名。这样有一个好处是,如果在一次Bulk Load文件上传失败时,上传失败的文件不会被后面的文件覆盖掉,这样可以在下次执行doBulkLoad()时进行重传。

7. 总结

本文我们从qmq历史消息高效备份以及查询的生产需求,展开了如何实现高效导入大批量数据至HBase的讨

以上是关于消息消费轨迹存储效率优化的主要内容,如果未能解决你的问题,请参考以下文章

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段

mq消息存储

RocketMQ集群如何打开消息轨迹的追踪?

42 线上生产环境的RocketMQ集群进行消息轨迹的追踪

RabbitMQ消费者性能优化相关配置说明

消息队列如何利用标签实现消息过滤