Golang+Python Hbase Thrift1 基本使用
Posted Time-Traveler
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Golang+Python Hbase Thrift1 基本使用相关的知识,希望对你有一定的参考价值。
Hbase简介:
HBase是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的Google论文“Bigtable:一个结构化数据的分布式存储系统”。就像Bigtable利用了Google文件系统(File System)所提供的分布式数据存储一样,HBase在Hadoop之上提供了类似于Bigtable的能力。HBase是Apache的Hadoop项目的子项目。HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库。另一个不同的是HBase是基于列的而不是基于行的模式。
Thrift:
Hbase是目前比较火的列存储数据库,由于Hbase是用Java写的,因此它原生地提供了Java接口。对非Java程序人员,怎么办呢?幸好它提供了thrift接口服务器,因此也可以采用其他语言来编写Hbase的客户端。目前的Hbase有两套thrift接口(可以叫thrift和thrift2),它们并不兼容。根据官方文档,thrift很可能被抛弃。目前网上介绍thrift2的比较多,很容易找到阿里的demo(thrift2传送门),所以这里记录一下thrift1的基本使用(thrift1传送门)。
启动thrift服务:
hbase的bin目录下存在一个脚本:hbase-daemon.sh
运行命令即可(启动thrift2则将thrift改成thrift2即可):
hbase-daemon.sh start thrift
接下来就是golang的基本操作。
Go基本使用:
package main
import (
"encoding/json"
"fmt"
"github.com/sdming/goh"
"github.com/sdming/goh/Hbase"
)
// Connection settings and sample identifiers shared by the examples below.
var (
	// address is the host:port passed to goh.NewTcpClient (placeholder value).
	address = "xxx.xxx.xxx.xxx:9090"
	// tableName is the table every example operates on.
	tableName = "student_table"
	// rowKey is the single row key used by the examples.
	rowKey = "person_1"
	// client is the package-wide Thrift client; init assigns it.
	client *goh.HClient
	// columns lists the column requested by the column-restricted reads and scans.
	columns = []string{"info:name"}
	// rows is the key list passed to the multi-row reads; it holds rowKey twice.
	rows = [][]byte{[]byte(rowKey), []byte(rowKey)}
)
// CreateTable creates tableName with the column families "info", "education"
// and "family", each built with goh.NewColumnDescriptorDefault.
//
// On success the first value returned by client.CreateTable is printed as the
// "exist status" and nil is returned. On failure the error is wrapped with the
// table name and returned to the caller.
//
// The earlier version declared a second err inside the if statement; that
// variable shadowed the named result, so the function returned nil even when
// table creation failed.
func CreateTable() (err error) {
	fmt.Println("start to create table:")

	cols := []*goh.ColumnDescriptor{
		goh.NewColumnDescriptorDefault("info"),
		goh.NewColumnDescriptorDefault("education"),
		goh.NewColumnDescriptorDefault("family"),
	}

	// err here is the named result (same scope), not a new variable.
	exist, err := client.CreateTable(tableName, cols)
	if err != nil {
		return fmt.Errorf("creating table %q: %w", tableName, err)
	}
	fmt.Println("exist status is:", exist)

	// Disabled cleanup example:
	// fmt.Print("DisableTable:")
	// fmt.Println(client.DisableTable("test_create"))
	//
	// fmt.Print("DeleteTable:")
	// fmt.Println(client.DeleteTable("test_create"))
	return nil
}
// Get reads the "info:name" cell of rowKey from tableName twice: first with
// client.Get, then with client.GetVer, passing 10 as its extra numeric argument
// (presumably the maximum number of versions — confirm against goh).
// Each successful result is printed with PrintCells; a failed call prints its
// error instead. Nothing is returned.
func Get() {
	fmt.Println("Get:")

	latest, err := client.Get(tableName, []byte(rowKey), "info:name", nil)
	if err == nil {
		PrintCells(latest)
	} else {
		fmt.Println(err)
	}

	versions, err := client.GetVer(tableName, []byte(rowKey), "info:name", 10, nil)
	if err == nil {
		PrintCells(versions)
	} else {
		fmt.Println(err)
	}
}
// DeleteData removes sample data for rowKey from tableName and prints whatever
// each client call returns directly after its label.
// It first calls client.DeleteAll for the "info:name" column and then
// client.DeleteAllRow for the whole row.
// The commented-out lines are further example calls (timestamped deletes and
// increments) that are left disabled; they reference a timestamp variable that
// is not declared in the visible code.
func DeleteData() {
// Delete the single column "info:name" of the row.
fmt.Print("DeleteAll:")
fmt.Println(client.DeleteAll(tableName, []byte(rowKey), "info:name", nil))
//fmt.Print("DeleteAllTs:")
//fmt.Println(client.DeleteAllTs(tableName, []byte(rowKey), "info:name", timestamp, nil))
// Delete the entire row.
fmt.Print("DeleteAllRow:")
fmt.Println(client.DeleteAllRow(tableName, []byte(rowKey), nil))
//fmt.Print("Increment:")
//fmt.Println(client.Increment(goh.NewTIncrement(tableName, []byte(rowKey), "info:age", 64)))
//fmt.Print("IncrementRows:")
//fmt.Println(client.IncrementRows([]*Hbase.TIncrement{goh.NewTIncrement(tableName, []byte(rowKey), "info:age", 64)}))
//fmt.Print("DeleteAllRowTs:")
//fmt.Println(client.DeleteAllRowTs(tableName, []byte(rowKey), timestamp, nil))
}
// ScanData opens four scanners on tableName, one after another, and hands each
// scanner id to GetDataByScan:
//
//  1. ScannerOpenWithScan with a TScan whose StartRow and StopRow are both rowKey
//  2. ScannerOpen starting at rowKey
//  3. ScannerOpenWithStop from rowKey to rowKey
//  4. ScannerOpenWithPrefix with the prefix "per"
//
// The first open call that fails has its error printed and returned, and the
// remaining scanners are not opened. nil is returned when all four succeed.
func ScanData() (err error) {
	var id int32

	scan := &goh.TScan{
		StartRow:     []byte(rowKey),
		StopRow:      []byte(rowKey),
		Timestamp:    0,
		Columns:      []string{"info:name"},
		Caching:      10,
		FilterString: "",
		//FilterString: "substring:value",
	}
	id, err = client.ScannerOpenWithScan(tableName, scan, nil)
	if err != nil {
		fmt.Println(err)
		return err
	}
	fmt.Println("ScannerOpenWithScan scanId is:", id)
	GetDataByScan(id)

	id, err = client.ScannerOpen(tableName, []byte(rowKey), columns, nil)
	if err != nil {
		fmt.Println(err)
		return err
	}
	fmt.Println("ScannerOpen scanId is:", id)
	GetDataByScan(id)

	id, err = client.ScannerOpenWithStop(tableName, []byte(rowKey), []byte(rowKey), columns, nil)
	if err != nil {
		fmt.Println(err)
		return err
	}
	fmt.Println("ScannerOpenWithStop scanId is:", id)
	GetDataByScan(id)

	id, err = client.ScannerOpenWithPrefix(tableName, []byte("per"), columns, nil)
	if err != nil {
		fmt.Println(err)
		return err
	}
	fmt.Println("ScannerOpenWithPrefix scanId is:", id)
	GetDataByScan(id)

	return nil
}
// GetDataByScan prints everything the scanner identified by scanId yields and
// then closes the scanner.
//
// It calls client.ScannerGet repeatedly, printing each non-empty result with
// PrintRows, until a call fails (the error is printed) or returns no rows
// ("scan end" is printed). It then makes one client.ScannerGetList call asking
// for up to 1000 rows and prints that result or its error. Finally it calls
// client.ScannerClose; whatever that call returns is ignored.
//
// NOTE(review): ids <= 0 are skipped entirely, including the close call —
// confirm the server never hands out 0 as a valid scanner id.
func GetDataByScan(scanId int32) {
	if scanId <= 0 {
		return
	}

	fmt.Println("scan start")
	for {
		batch, err := client.ScannerGet(scanId)
		if err != nil {
			fmt.Println(err)
			break
		}
		if len(batch) == 0 {
			fmt.Println("scan end")
			break
		}
		PrintRows(batch)
	}

	fmt.Println("ScannerGetList")
	if rest, err := client.ScannerGetList(scanId, 1000); err == nil {
		PrintRows(rest)
	} else {
		fmt.Println(err)
	}

	client.ScannerClose(scanId)
}
func GetRowData() {
fmt.Println("GetRow:")
if data, err := client.GetRow(tableName, []byte(rowKey), nil); err != nil {
fmt.Println(err)
} else {
PrintRows(data)
}
fmt.Println("GetRowWithColumns:") // 获取指定的column的数据
if data, err := client.GetRowWithColumns(tableName, []byte(rowKey), columns, nil); err != nil {
fmt.Println(err)
} else {
PrintRows(data)
}
fmt.Println("GetRows:")
if data, err := client.GetRows(tableName, rows, nil); err != nil {
fmt.Println(err)
} else {
PrintRows(data)
}
fmt.Println("GetRowsWithColumns:")
if data, err := client.GetRowsWithColumns(tableName, rows, columns, nil); err != nil {
fmt.Println(err)
} else {
PrintRows(data)
}
fmt.Println("GetRowOrBefore:")
if data, err := client.GetRowOrBefore(tableName, rowKey, "info"); err != nil {
fmt.Println(err)
} else {
PrintCells(data)
}
}
// GetTableInfo prints metadata obtained from the server as JSON via Dump: the
// result of GetTableNames, then the results of GetColumnDescriptors and
// GetTableRegions for tableName.
//
// The first call that fails has its error printed and returned, and the later
// lookups are skipped. nil is returned when all three succeed.
func GetTableInfo() (err error) {
	fmt.Println("get table names:")
	names, err := client.GetTableNames()
	if err != nil {
		fmt.Println(err)
		return err
	}
	Dump(names)

	fmt.Println("GetColumnDescriptors:")
	families, err := client.GetColumnDescriptors(tableName)
	if err != nil {
		fmt.Println(err)
		return err
	}
	Dump(families)

	fmt.Println("GetTableRegions:")
	regions, err := client.GetTableRegions(tableName)
	if err != nil {
		fmt.Println(err)
		return err
	}
	Dump(regions)

	return nil
}
// SaveData writes the sample attributes name="zhang san" and age="25" into the
// "info" family of rowKey in tableName.
//
// One mutation is built per attribute (column "info:<attribute>"), the
// mutations are printed with %#v, wrapped in a single batch for rowKey, and
// sent with client.MutateRows. A failure is printed with the prefix
// "save data error:" and returned; nil is returned on success.
func SaveData() (err error) {
	const family = "info"

	attributes := map[string]string{
		"name": "zhang san",
		"age":  "25",
	}

	// Map iteration order is unspecified, so the mutation order may vary between runs.
	var mutations []*Hbase.Mutation
	for qualifier, value := range attributes {
		mutations = append(mutations, goh.NewMutation(family+":"+qualifier, []byte(value)))
	}
	fmt.Printf("mutations are:%#v", mutations)

	rowBatches := []*Hbase.BatchMutation{goh.NewBatchMutation([]byte(rowKey), mutations)}
	if err = client.MutateRows(tableName, rowBatches, nil); err != nil {
		fmt.Println("save data error:", err.Error())
	}
	return err
}
// BasicInfo walks tableName through a disable/enable cycle and prints whatever
// each client call returns directly after its label.
// The sequence is: IsTableEnabled, DisableTable, IsTableEnabled, EnableTable,
// IsTableEnabled — so the enabled state is reported before and after each change.
func BasicInfo() {
// State before any change.
fmt.Print("IsTableEnabled:")
fmt.Println(client.IsTableEnabled(tableName))
fmt.Print("DisableTable:")
fmt.Println(client.DisableTable(tableName))
// State after the disable call.
fmt.Print("IsTableEnabled:")
fmt.Println(client.IsTableEnabled(tableName))
fmt.Print("EnableTable:")
fmt.Println(client.EnableTable(tableName))
// State after the enable call.
fmt.Print("IsTableEnabled:")
fmt.Println(client.IsTableEnabled(tableName))
}
// init creates the package-level client for address using goh.TBinaryProtocol
// (the third argument to goh.NewTcpClient is false) and opens it.
// It panics if either creating or opening the client fails.
func init() {
	var err error
	if client, err = goh.NewTcpClient(address, goh.TBinaryProtocol, false); err != nil {
		panic(err)
	}
	if err = client.Open(); err != nil {
		panic(err)
	}
}
// main runs the currently enabled example, GetTableInfo, and prints "done".
// The client opened in init is closed when main returns.
func main() {
	defer client.Close()

	// Disabled examples; uncomment to create the table, store a row and read it back:
	//
	//	if err := CreateTable(); err != nil {
	//		fmt.Println("error is:", err.Error())
	//		return
	//	}
	//	if err := SaveData(); err != nil {
	//		fmt.Println("error is:", err.Error())
	//		return
	//	}
	//	GetRowData()
	//	ScanData()

	// GetTableInfo prints its own errors, so the returned error is not used here.
	_ = GetTableInfo()
	fmt.Println("done")
}
// Dump prints data as JSON on one line, prefixed with "dump:".
// If data cannot be marshalled, it prints "json marshal error:" followed by the
// error instead.
func Dump(data interface{}) {
	encoded, err := json.Marshal(data)
	switch {
	case err != nil:
		fmt.Println("json marshal error:", err)
	default:
		fmt.Println("dump:", string(encoded))
	}
}
// PrintCells prints the number of cells in data, followed by one line per cell
// showing its index, its Value as a string, and its Timestamp.
// For a nil slice it first prints "<nil>"; there is no early return, so the
// length line (with 0) is still printed afterwards.
func PrintCells(data []*Hbase.TCell) {
	if data == nil {
		fmt.Println("<nil>")
	}
	fmt.Println("[]*Hbase.TCell len:", len(data))
	for idx := range data {
		cell := data[idx]
		fmt.Println(idx, ":", string(cell.Value), ";", cell.Timestamp)
	}
}
// PrintRows prints the number of rows in data and then, for each row, its
// index and row key followed by a bracketed list with one tab-indented line per
// entry of the row's Columns: the entry's key, its Value as a string, and its
// Timestamp.
//
// For a nil slice it first prints "<nil>"; there is no early return, so the
// length line (with 0) is still printed afterwards.
//
// The format strings previously contained double-escaped sequences ("\\n[" and
// "\\t"), which printed a literal backslash followed by 'n' or 't'. They are
// now real newline and tab escapes.
//
// NOTE(review): Columns is presumably a map, in which case the order of the
// column lines is unspecified — confirm against the Hbase package.
func PrintRows(data []*Hbase.TRowResult) {
	if data == nil {
		fmt.Println("<nil>")
	}
	fmt.Println("[]*Hbase.TRowResult len:", len(data))
	for i, row := range data {
		fmt.Println(i, string(row.Row), "\n[")
		for name, cell := range row.Columns {
			fmt.Println("\t", name, ":", string(cell.Value), cell.Timestamp)
		}
		fmt.Println("]")
	}
}
Python 基本操作:
Python 采用了 happybase,需要先安装一下:
pip install happybase
pip install thrift
import happybase
# Shared connection handle; stays None until init_client() assigns it.
client = None
# Host passed to happybase.Connection (placeholder value); no port is given here.
address = 'xxx.xxx.xxx.xxx'
# Name of the table the examples below create and write to.
table_name = 'python_table'
# Row key used when storing the sample data.
row_key = 'row_key_one'
def init_client():
    """Connect to ``address`` and store the connection in the global ``client``.

    When the connection object is truthy, the list returned by
    ``client.tables()`` is printed; otherwise a failure message is printed.
    Returns None in both cases.
    """
    global client
    client = happybase.Connection(address)
    # NOTE(review): happybase.Connection presumably raises on a failed connect
    # rather than returning a falsy object, so the else branch may never run —
    # confirm against the happybase documentation.
    if client:
        print('tables are: {}'.format(client.tables()))
    else:
        print('failed to connect to server')
def create_table():
    """Create ``table_name`` with three column families, then print the table list.

    Families and the options passed for each:
      * ``info``      -- max_versions=10
      * ``education`` -- max_versions=1, block_cache_enabled=False
      * ``family``    -- no options
    """
    families = {
        'info': {'max_versions': 10},
        'education': {'max_versions': 1, 'block_cache_enabled': False},
        'family': {},  # use defaults
    }
    client.create_table(table_name, families)
    print('tables are: {}'.format(client.tables()))
def save_data():
    """Store sample ``info``, ``education`` and ``family`` columns under ``row_key``.

    Each group of columns is written to ``table_name`` with its own
    ``table.put`` call, in the order info, education, family.
    """
    table = client.table(table_name)
    column_groups = (
        {'info:name': 'wang wu', 'info:age': '20'},
        {'education:level': 'primary school', 'education:name': 'hope school'},
        {'family:father': 'wang si', 'family:mother': 'lily'},
    )
    # put() stores a single row per call; if the row key already exists,
    # the call updates that row instead.
    for group in column_groups:
        table.put(row=row_key, data=group)
def save_batch_data():
table = client.table(table_name)
# batch = table.batch()
# batch.put(row_key, {'info:name': 'wang wu', 'info:age': '20'})
# batch.send() # 显式地发送
# batch将数据保存在内存中,直到数据被send。第一种send数据的方法是显式地发送,即batch.send();第二种send数据的方法是到达with上下文管理器的结尾时自动发送。
# 这样就存在一个问题,万一数据量很大……(原文在此处被截断)
以上是关于Golang+Python Hbase Thrift1 基本使用的主要内容,如果未能解决你的问题,请参考以下文章