spark将数据加载到hbase--bulkload方式

Posted 一加六

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark将数据加载到hbase--bulkload方式相关的知识,希望对你有一定的参考价值。

通过bulkload方式加载数据优点:与put方式相比
1.导入过程不占用Region资源
2.能快速导入海量的数据
3.节省内存
应该是业界将数据载入hbase常用方式之一,因此有必要学习掌握

实现步骤

步骤一 读取数据生成rdd

读入数据是面向行的表,一行有多个字段,需要转换成面向列的数据,构造keyValue对象,一定要注意key们要排序,比如user:age列要在user:gender列之前
需要设计行键保证行键唯一和避免数据都涌入一个region,如我的是按时间设计的,好几个月的数据,因此将数据按月预分区。

    val rdd = sc.textFile("file:///"+filePath)
      .flatMap(x=>getLineData(x,rowKeyBase,HBaseUtils.LOG_FIELD_NAMES))
      .sortByKey()
  //处理每一条记录生成keyvalue对象
  def getLineData(line:String,rowkey:String,fieldNames: TreeMap[String, Int]): List[(ImmutableBytesWritable, KeyValue)] =
    val length = fieldNames.size
    val values:Array[String] = line.split("\\\\\\t")
    if (null == values || values.length!=length) return Nil
    //println(rowkey+values(1)+Random.nextInt(100000).toString)
    val rowKey = Bytes.toBytes(rowkey+values(1)+Random.nextInt(1000).toString)
    val writable = new ImmutableBytesWritable(rowKey)
    val columnFamily = Bytes.toBytes("detail")
    fieldNames.toList.map
      case (fieldName, fieldIndex) =>
        // KeyValue实例对象
        val keyValue = new KeyValue(
          rowKey, //
          columnFamily, //
          Bytes.toBytes(fieldName), //
          Bytes.toBytes(values(fieldIndex)) //
        )
        // 返回
        (writable, keyValue)
    
  

步骤二 配置输出HFile文件

输出前检查

检查HFile输出目录是否存在

    // TODO:构建Job,设置相关配置信息,主要为输出格式
    // a. 读取配置信息
    val hbaseConfig: Configuration = HBaseUtils.getHBaseConfiguration("hbase","2181")
    //  Configuration parameter hbase.mapreduce.hfileoutputformat.table.name cannot be empty
    hbaseConfig.set("hbase.mapreduce.hfileoutputformat.table.name", "log")
    // b. 如果输出目录存在,删除
    val dfs = FileSystem.get(hbaseConfig)
    val outputPath: Path = new Path("hdfs://hbase:9000/hbase/log/"+rowKeyBase)
    if (dfs.exists(outputPath)) 
      dfs.delete(outputPath, true)
    
    dfs.close()

配置HFileOutputFormat2

    // TODO: 配置HFileOutputFormat2输出
    val conn = ConnectionFactory.createConnection(hbaseConfig)
    val htableName = TableName.valueOf("log")
    val table: Table = conn.getTable(htableName)
    HFileOutputFormat2.configureIncrementalLoad(
      Job.getInstance(hbaseConfig), //
      table, //
      conn.getRegionLocator(htableName) //
    )

输出HFile文件

    // TODO: 3. 保存数据为HFile文件//先排序
    rdd.sortBy(x=>(x._1, x._2.getKeyString), ascending = true)
      .saveAsNewAPIHadoopFile(
        "hdfs://hbase:9000/hbase/log/"+rowKeyBase,
        classOf[ImmutableBytesWritable], //
        classOf[KeyValue], //
        classOf[HFileOutputFormat2], //
        hbaseConfig)

将HFile文件bulkload到hbase表分区当中

    // TODO:4. 将输出HFile加载到HBase表中
    val load = new LoadIncrementalHFiles(hbaseConfig)
    load.doBulkLoad(outputPath, conn.getAdmin, table,
      conn.getRegionLocator(htableName))

出现的问题

写入权限
可以将HFile要输出的文件位置chmod 777 /outputDir

以上是关于spark将数据加载到hbase--bulkload方式的主要内容,如果未能解决你的问题,请参考以下文章

Spark 广播连接将数据加载到驱动程序

Spark在处理数据的时候,会将数据都加载到内存再做处理吗?

使用 spark 将 parquet 文件加载到 vertica 数据库中

使用 spark-sql cli 将 csv 数据直接加载到 parquet 表中

如何从驱动程序将不适合驱动程序内存的数据加载到 Spark 独立集群中?

如何将最新的 100 行从 Hbase 加载到 Spark