新版 API 中加入了 Connection,HBaseAdmin 成了 Admin,HTable 成了 Table,而 Admin 和 Table 只能通过 Connection 获得。Connection 的创建是个重量级的操作,由于 Connection 是线程安全的,所以推荐使用单例,其工厂方法需要一个 HBaseConfiguration。
// Build the client configuration: point the HBase client at the
// ZooKeeper quorum that coordinates the cluster.
val conf = {
  val c = HBaseConfiguration.create()
  c.set("hbase.zookeeper.property.clientPort", "2181")
  c.set("hbase.zookeeper.quorum", "master")
  c
}

// Creating a Connection is heavyweight, but the instance is thread-safe,
// so a single shared Connection is the recommended entry point to HBase.
val conn = ConnectionFactory.createConnection(conf)
创建表
使用Admin创建和删除表
val userTable = TableName.valueOf("user")

// Describe the `user` table with a single column family "basic".
val tableDescr = new HTableDescriptor(userTable)
tableDescr.addFamily(new HColumnDescriptor("basic".getBytes))
println("Creating table `user`. ")
// An HBase table must be disabled before it can be deleted. The collapsed
// one-liner left deleteTable OUTSIDE the if, so it ran unconditionally and
// threw TableNotFoundException on a fresh cluster; both calls now sit
// inside the existence check.
if (admin.tableExists(userTable)) {
  admin.disableTable(userTable)
  admin.deleteTable(userTable)
}
// The snippet announced table creation but the createTable call itself was
// lost in the paste; restore it so the table actually exists afterwards.
admin.createTable(tableDescr)
// Obtain the `user` table handle from the shared Connection.
val table = conn.getTable(userTable)
try {
  // Insert one row keyed "id001"; addColumn replaces the deprecated put.add.
  val p = new Put("id001".getBytes)
  p.addColumn("basic".getBytes, "name".getBytes, "wuchong".getBytes)
  table.put(p)

  // Read the row back and print the basic:name cell.
  val g = new Get("id001".getBytes)
  val result = table.get(g)
  val value = Bytes.toString(result.getValue("basic".getBytes, "name".getBytes))
  println("GET id001 :" + value)

  // Scan only the basic:name column of every row.
  val s = new Scan()
  s.addColumn("basic".getBytes, "name".getBytes)
  val scanner = table.getScanner(s)
  // NOTE(review): the scanner should be iterated and then closed with
  // scanner.close() to release server-side resources — the iteration code
  // was lost in the paste; confirm against the original article.
} finally {
  // The dangling `try` keywords in the source indicate cleanup was intended:
  // a Table handle is not thread-safe and must be closed when done.
  table.close()
}
def saveAsHadoopDataset(conf: JobConf): Unit — Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for that storage system.
// Configure the Hadoop job: records are written through TableOutputFormat
// into the HBase table named "user".
val jobConf = new JobConf(conf, this.getClass)
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "user")
jobConf.setOutputFormat(classOf[TableOutputFormat])
// Turn an (id, name, age) triple into the (key, Put) pair expected by the
// Hadoop output format: row key = id, cells = basic:name and basic:age.
def convert(triple: (Int, String, Int)) = {
  val (id, name, age) = triple
  val put = new Put(Bytes.toBytes(id))
  put.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes(name))
  put.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("age"), Bytes.toBytes(age))
  (new ImmutableBytesWritable, put)
}
Step 3: 读取RDD并转换
// Sample (id, name, age) rows, parallelized into an RDD and mapped to
// (ImmutableBytesWritable, Put) pairs via convert, ready for HBase output.
val rawData = List((1, "lilei", 14), (2, "hanmei", 18), (3, "someone", 38))
val localData = sc.parallelize(rawData).map(row => convert(row))
// Read the HBase table as an RDD of (row key, Result) pairs through the
// new Hadoop input API.
// NOTE(review): TableInputFormat.INPUT_TABLE must already be set on `conf`
// for this to know which table to read — confirm it happens earlier.
val usersRDD = sc.newAPIHadoopRDD(
  conf,
  classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

val count = usersRDD.count()
println("Users RDD Count:" + count)
usersRDD.cache()

// Traverse and print every row: decode the int row key plus the
// basic:name and basic:age cells.
usersRDD.foreach { case (_, result) =>
  val key  = Bytes.toInt(result.getRow)
  val name = Bytes.toString(result.getValue("basic".getBytes, "name".getBytes))
  val age  = Bytes.toInt(result.getValue("basic".getBytes, "age".getBytes))
  println("Row key:" + key + " Name:" + name + " Age:" + age)
}