spark将数据加载到hbase--bulkload方式
Posted 一加六
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark将数据加载到hbase--bulkload方式相关的知识,希望对你有一定的参考价值。
通过bulkload方式加载数据优点:与put方式相比
1.导入过程不占用Region资源
2.能快速导入海量的数据
3.节省内存
应该是业界将数据载入hbase常用方式之一,因此有必要学习掌握
实现步骤
步骤一 读取数据生成rdd
读入数据是面向行的表,一行有多个字段,需要转换成面向列的数据,构造keyValue对象,一定要注意key们要排序,比如user:age列要在user:gender列之前
需要设计行键保证行键唯一和避免数据都涌入一个region,如我的是按时间设计的,好几个月的数据,因此将数据按月预分区。
val rdd = sc.textFile("file:///"+filePath)
.flatMap(x=>getLineData(x,rowKeyBase,HBaseUtils.LOG_FIELD_NAMES))
.sortByKey()
//处理每一条记录生成keyvalue对象
def getLineData(line:String,rowkey:String,fieldNames: TreeMap[String, Int]): List[(ImmutableBytesWritable, KeyValue)] =
val length = fieldNames.size
val values:Array[String] = line.split("\\\\\\t")
if (null == values || values.length!=length) return Nil
//println(rowkey+values(1)+Random.nextInt(100000).toString)
val rowKey = Bytes.toBytes(rowkey+values(1)+Random.nextInt(1000).toString)
val writable = new ImmutableBytesWritable(rowKey)
val columnFamily = Bytes.toBytes("detail")
fieldNames.toList.map
case (fieldName, fieldIndex) =>
// KeyValue实例对象
val keyValue = new KeyValue(
rowKey, //
columnFamily, //
Bytes.toBytes(fieldName), //
Bytes.toBytes(values(fieldIndex)) //
)
// 返回
(writable, keyValue)
步骤二 配置输出HFile文件
输出前检查
检查HFile输出目录是否存在
// TODO:构建Job,设置相关配置信息,主要为输出格式
// a. 读取配置信息
val hbaseConfig: Configuration = HBaseUtils.getHBaseConfiguration("hbase","2181")
// Configuration parameter hbase.mapreduce.hfileoutputformat.table.name cannot be empty
hbaseConfig.set("hbase.mapreduce.hfileoutputformat.table.name", "log")
// b. 如果输出目录存在,删除
val dfs = FileSystem.get(hbaseConfig)
val outputPath: Path = new Path("hdfs://hbase:9000/hbase/log/"+rowKeyBase)
if (dfs.exists(outputPath))
dfs.delete(outputPath, true)
dfs.close()
配置HFileOutputFormat2
// TODO: 配置HFileOutputFormat2输出
val conn = ConnectionFactory.createConnection(hbaseConfig)
val htableName = TableName.valueOf("log")
val table: Table = conn.getTable(htableName)
HFileOutputFormat2.configureIncrementalLoad(
Job.getInstance(hbaseConfig), //
table, //
conn.getRegionLocator(htableName) //
)
输出HFile文件
// TODO: 3. 保存数据为HFile文件//先排序
rdd.sortBy(x=>(x._1, x._2.getKeyString), ascending = true)
.saveAsNewAPIHadoopFile(
"hdfs://hbase:9000/hbase/log/"+rowKeyBase,
classOf[ImmutableBytesWritable], //
classOf[KeyValue], //
classOf[HFileOutputFormat2], //
hbaseConfig)
将HFile文件bulkload到hbase表分区当中
// TODO:4. 将输出HFile加载到HBase表中
val load = new LoadIncrementalHFiles(hbaseConfig)
load.doBulkLoad(outputPath, conn.getAdmin, table,
conn.getRegionLocator(htableName))
出现的问题
写入权限
可以将HFile要输出的文件位置chmod 777 /outputDir
以上是关于spark将数据加载到hbase--bulkload方式的主要内容,如果未能解决你的问题,请参考以下文章
Spark在处理数据的时候,会将数据都加载到内存再做处理吗?
使用 spark 将 parquet 文件加载到 vertica 数据库中
使用 spark-sql cli 将 csv 数据直接加载到 parquet 表中