Java往hbase写数据

Posted Flink菜鸟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java往hbase写数据相关的知识,希望对你有一定的参考价值。

接上篇读HDFS

 

上面读完了HDFS,当然还有写了。

先上代码:

WriteHBase

public class WriteHBase {

    public static void writeHbase(String content){
     // HDFS 数据是一行一条记录
        String[] lines = content.split("\\n");
        int userSize = 0;
        List<Put> puts = new ArrayList<Put>();
        Put put;
        for(String line : lines){
       //只有两列,以#号分割,一列rowkey,一列value,一个value是很多列数据拼接起来的。
if(line.contains("#")){ String[] arr = line.split("#"); // 添加一行, put = new Put(Bytes.toBytes(arr[0]));
         // 给行添加列 cf column value put.add(Bytes.toBytes(Constant.CF), Bytes.toBytes(Constant.COLUMN), Bytes.toBytes(arr[
1])); puts.add(put); }else{ continue; } lines[userSize] = null; ++userSize; // write when list have 1000 没1000 条提交一次,已经改的 5000 if (userSize % Constant.BATCH ==0){ writeDate(userSize, puts); } } writeDate(userSize, puts); HDFSReadLog.writeLog("analysis " +userSize +" users"); } private static void writeDate(int userSize, List<Put> puts) { try { table.put(puts); HDFSReadLog.writeLog("write "+userSize + " item."); } catch (IOException e) { e.printStackTrace(); HDFSReadLog.writeLog("write "+userSize + " error."); HDFSReadLog.writeLog(e.getMessage()); } } static HTable table = null; // static HTablePool pool = null; static{ try {
        // 创建HTable对象,对应hbase 的table table
= new HTable(HBaseConf.getConf(),Constant.HBASE_TABLE);
        // 如果表不存在就创建一个 fitTable(Constant.HBASE_TABLE); }
catch (IOException e) { e.printStackTrace(); HDFSReadLog.writeLog("create table error."); HDFSReadLog.writeLog(e.getMessage()); } } /** * if table is not exists, create it * @param tab * @throws IOException */ private static void fitTable(String tab) throws IOException { HBaseAdmin admin = new HBaseAdmin(HBaseConf.getConf()); if (admin.tableExists(tab)) { HDFSReadLog.writeLog(tab + " exists"); } else {
         HTableDescriptor tableDesc
= new HTableDescriptor(tab);
        // 建表的使用要指定 column family tableDesc.addFamily(
new HColumnDescriptor("cf")); admin.createTable(tableDesc); HDFSReadLog.writeLog(tab + " create success"); } } }

HBaseConfig(z这个必须,不然会卡在table.put 上面,没有报错,就是卡)

public class HBaseConf {

    public static Configuration conf = null;
    public static Configuration getConf(){
        if (conf == null){
            conf = new Configuration();
            String path  = Constant.getSysEnv("HBASE_HOME") +"/conf/";
            HDFSReadLog.writeLog("Get HBase home : " + path);

            // hbase conf
            conf.setClassLoader(HBaseConf.class.getClassLoader());
            conf.addResource(path + "hbase-default.xml");
            conf.addResource(path + "hbase-site.xml");
            conf = HBaseConfiguration.create(conf);
            HDFSReadLog.writeLog("hbase.zookeeper.quorum : " + conf.get("hbase.zookeeper.quorum"));
        }
    // 如果配置文件读不到,set这两个参数,也可以读
        /*conf.set("hbase.zookeeper.quorum", "ip,ip,ip");
        conf.set("hbase.zookeeper.property.clientPort", "port");*/
        return conf;
    }

}  

 

注: hbase的配置文件很重要,如果读不到 “hbase.zookeeper.quorum” 会默认到 localhost,然后在table.put 的时候,卡住。

table.put(),不止可以put 一个Put,也可以put 一个Put的list,这样算是到底批量提交了。

一个一个写,太慢了。这边的结果:334403 条数据,写了112秒

 

 

 

 

以上是关于Java往hbase写数据的主要内容,如果未能解决你的问题,请参考以下文章

HBase int类型字段存储乱码

hbase预分区

hbase预分区

结合Hbase实时数据分析

基于hbase-spark实现hive到hbase的数据传输中间件

Hbase中对数据 增删改查 工作流程