golang: A PostgreSQL demo using array types with Golang


package main

import (
	"database/sql"
	"errors"
	"fmt"
	_ "github.com/bmizerany/pq"
	"os"
	"regexp"
	"strings"
)

func main() {
	db := dbConnect()

	makeTestTables(db)

	defer db.Close()
	defer cleanup(db)

	// Insert Some Data
	db.Exec(`INSERT INTO array_test VALUES ('{"String1", "String2"}')`)

	// arrays can be selected as strings...
	dataString := selectAsString(db)
	fmt.Println("SELECT as String:", dataString)

	// Or by using array functions...
	dataUnnest := selectUsingUnnest(db)
	fmt.Println("SELECT using Unnest:", dataUnnest)

	// Or by defining a scan type and parsing the return value
	dataSlice := selectAsSlice(db)
	fmt.Println("SELECT by parsing:", dataSlice)

	// Arrays can be updated by replacing the entire array:
	newArray := []interface{}{"String1", "String3", "String4", "String5"}
	updateArray(db, newArray)
	dataSlice = selectAsSlice(db)
	fmt.Println("UPDATE entire array", dataSlice)

	// or by appending / prepending value(s):
	AppendToArray(db, "String6")
	dataSlice = selectAsSlice(db)
	fmt.Println("UPDATE with append:", dataSlice)

	// or by replacing individual values:
	ReplaceInArray(db, 2, "NULL")
	dataSlice = selectAsSlice(db)
	fmt.Println("UPDATE with replace:", dataSlice)

	// Deleting by index requires slicing and is inefficient:
	DeleteFromArray(db, 3)
	dataSlice = selectAsSlice(db)
	fmt.Println("UPDATE deleting index:", dataSlice)

}

// Arrays are serialized to strings {value, value...} by the database.
// These strings can be selected, updated and inserted like any other string.
func selectAsString(db *sql.DB) string {
	row := db.QueryRow("SELECT data FROM array_test")
	var asString string
	err := row.Scan(&asString)
	if err != nil {
		panic(err)
	}
	return asString
}

// The UNNEST function expands an array into multiple rows.  Each row
// can then be scanned individually.
func selectUsingUnnest(db *sql.DB) []string {
	results := make([]string, 0)
	rows, err := db.Query("SELECT UNNEST(data) FROM array_test")
	if err != nil {
		panic(err)
	}
	defer rows.Close()
	var scanString string
	for rows.Next() {
		if err := rows.Scan(&scanString); err != nil {
			panic(err)
		}
		results = append(results, scanString)
	}
	return results
}

// By defining a wrapper type around a slice which implements
// sql.Scanner, we can scan the array directly into the type.
func selectAsSlice(db *sql.DB) StringSlice {
	row := db.QueryRow("SELECT data FROM array_test")
	var asSlice StringSlice
	err := row.Scan(&asSlice)
	if err != nil {
		panic(err)
	}
	return asSlice
}

// Update an array by replacing the whole array with new values.
// This _could_ be done by serializing the StringSlice type using
// sql.driver.Valuer, but then we would have to validate the type
// of each value manually and format it for insert by hand.  Instead,
// the ARRAY[...] format allows us to use query parameters to construct
// the array, ie ARRAY[$1, $2, $3], which then allows the database
// driver to coerce the variables into the right format for us.
func updateArray(db *sql.DB, array []interface{}) {
	params := make([]string, 0, len(array))
	for i := range array {
		params = append(params, fmt.Sprintf("$%v", i+1))
	}
	query := fmt.Sprintf("UPDATE array_test SET data = ARRAY[%s]", strings.Join(params, ", "))
	if _, err := db.Exec(query, array...); err != nil {
		panic(err)
	}
}

// The ARRAY_APPEND and ARRAY_PREPEND functions can be used to add single
// values to arrays.  ARRAY_CAT combines two arrays.  The || operator can
// do the same thing:
//   SET data = data || <value>
//   SET data = data || ARRAY[<value1>, <value2>]

func AppendToArray(db *sql.DB, value string) {
	_, err := db.Exec("UPDATE array_test SET data = ARRAY_APPEND(data, $1)", value)
	if err != nil {
		panic(err)
	}
}
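
// Not part of the original demo: a hedged sketch of the symmetric prepend
// helper hinted at in the comment above.  ARRAY_PREPEND is a standard
// PostgreSQL function and takes the value first and the array second.
func PrependToArray(db *sql.DB, value string) {
	_, err := db.Exec("UPDATE array_test SET data = ARRAY_PREPEND($1, data)", value)
	if err != nil {
		panic(err)
	}
}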

// Arrays are 1-indexed. Individual elements can be used in expressions,
// updated, or selected by indexing the array.
func ReplaceInArray(db *sql.DB, index int, newValue string) {
	_, err := db.Exec("UPDATE array_test SET data[$1] = $2", index, newValue)
	if err != nil {
		panic(err)
	}
}
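
// For illustration only (also not in the original demo): a single element can
// be read back by its 1-based index the same way it is updated above.
func SelectFromArray(db *sql.DB, index int) string {
	var value string
	err := db.QueryRow("SELECT data[$1] FROM array_test", index).Scan(&value)
	if err != nil {
		panic(err)
	}
	return value
}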

// Arrays support slice indexing:
//    ARRAY['a', 'b', 'c'][1:2] == ARRAY['a', 'b']
// The ARRAY_UPPER function gets the length of an array for a specified dimension
// Deleting a value from an array amounts to slicing the array into two parts
// and combining them back together.
func DeleteFromArray(db *sql.DB, i int) {
	_, err := db.Exec("UPDATE array_test SET data = array_cat(data[0:$1], data[$2:ARRAY_UPPER(data, 1) + 1])", i-1, i+1)
	if err != nil {
		panic(err)
	}
}

type StringSlice []string

// Scan implements sql.Scanner for the StringSlice type.
// A Scanner takes the database value (in this case a byte slice)
// and sets the value of the type.  Here we convert it to a string and
// do a regexp-based parse.
func (s *StringSlice) Scan(src interface{}) error {
	asBytes, ok := src.([]byte)
	if !ok {
		return errors.New("scan source was not []byte")
	}

	asString := string(asBytes)
	parsed := parseArray(asString)
	(*s) = StringSlice(parsed)

	return nil
}

// PARSING ARRAYS
// SEE http://www.postgresql.org/docs/9.1/static/arrays.html#ARRAYS-IO
// Arrays are output within {} and a delimiter, which is a comma for most
// postgres types (; for box)
//
// Individual values are surrounded by quotes:
// The array output routine will put double quotes around element values if
// they are empty strings, contain curly braces, delimiter characters,
// double quotes, backslashes, or white space, or match the word NULL.
// Double quotes and backslashes embedded in element values will be
// backslash-escaped. For numeric data types it is safe to assume that double
// quotes will never appear, but for textual data types one should be prepared
// to cope with either the presence or absence of quotes.

// construct a regexp to extract values:
var (
	// unquoted array values must not contain " , \ { } whitespace or the word NULL,
	// and must be at least one char.  (Note: inside a character class, the (NULL)
	// below only excludes the individual characters '(', 'N', 'U', 'L' and ')',
	// which is good enough for this demo rather than a true word-level NULL check.)
	unquotedChar  = `[^",\\{}\s(NULL)]`
	unquotedValue = fmt.Sprintf("(%s)+", unquotedChar)

	// quoted array values are surrounded by double quotes, can be any
	// character except " or \, which must be backslash escaped:
	quotedChar  = `[^"\\]|\\"|\\\\`
	quotedValue = fmt.Sprintf("\"(%s)*\"", quotedChar)

	// an array value may be either quoted or unquoted:
	arrayValue = fmt.Sprintf("(?P<value>(%s|%s))", unquotedValue, quotedValue)

	// Array values are separated with a comma IF there is more than one value:
	arrayExp = regexp.MustCompile(fmt.Sprintf("((%s)(,)?)", arrayValue))

	valueIndex int
)

// Find the index of the 'value' named expression
func init() {
	for i, subexp := range arrayExp.SubexpNames() {
		if subexp == "value" {
			valueIndex = i
			break
		}
	}
}

// Parse the output string from the array type.
// Regex used: (((?P<value>(([^",\\{}\s(NULL)])+|"([^"\\]|\\"|\\\\)*")))(,)?)
func parseArray(array string) []string {
	results := make([]string, 0)
	matches := arrayExp.FindAllStringSubmatch(array, -1)
	for _, match := range matches {
		s := match[valueIndex]
		// the string _might_ be wrapped in quotes, so trim them:
		s = strings.Trim(s, "\"")
		results = append(results, s)
	}
	return results
}

// DB HELPERs

func dbConnect() *sql.DB {
	datname := os.Getenv("PGDATABASE")
	sslmode := os.Getenv("PGSSLMODE")

	if datname == "" {
		os.Setenv("PGDATABASE", "pqgotest")
	}

	if sslmode == "" {
		os.Setenv("PGSSLMODE", "disable")
	}

	conn, err := sql.Open("postgres", "")
	if err != nil {
		panic(err)
	}

	return conn
}

// Create a table with an array type
// Can also use the syntax CREATE TABLE array_test (data varchar ARRAY)
func makeTestTables(db *sql.DB) {
	_, err := db.Exec("CREATE TABLE array_test (data varchar[])")
	if err != nil {
		panic(err)
	}
}

func cleanup(db *sql.DB) {
	db.Exec("DROP TABLE array_test")
}
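
A side note on driver support: the hand-rolled parser above predates array support in the Go Postgres drivers. If you use the maintained github.com/lib/pq driver (the successor to bmizerany/pq), its pq.Array helper implements sql.Scanner and driver.Valuer for slices, so a varchar[] column can be scanned without custom parsing. A minimal sketch under that assumption, reusing the same array_test table and environment:

package main

import (
	"database/sql"
	"fmt"

	"github.com/lib/pq"
)

func main() {
	// relies on the same PGDATABASE / PGSSLMODE environment as the demo above
	db, err := sql.Open("postgres", "")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	var data []string
	// pq.Array wraps the slice so the driver decodes the {value,value} format for us
	err = db.QueryRow("SELECT data FROM array_test").Scan(pq.Array(&data))
	if err != nil {
		panic(err)
	}
	fmt.Println(data)
}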

First Impressions of Golang Backend Development

Reposted from: http://blog.csdn.net/cszhouwei/article/details/37740277

Follow-up Feedback

slice

 

Since we're talking about slices, we have to mention their close relative, the array. I don't really want to bring up the notions of value types vs. reference types here (personally I think they are all value types). A Go array can be thought of as something like a C struct: a struct accesses its members by name (e.g. xxx.yyy), while an array accesses its members by index (e.g. xxx[3]). The memory layout is shown below:

Figure 1: Memory layout of a Go array

Clearly, Go arrays are not very flexible: their length is fixed. That is why slices exist. Conceptually a slice is somewhat like an STL vector, but the actual implementation differs; its memory layout is shown below:

Figure 2: Memory layout of a Go slice

A slice has the notions of len and cap (similar to a vector). One point needs clarifying: unlike a vector, a slice's len cannot grow without bound over the same backing array; cap is its ceiling. For example, given s := make([]byte, 3, 5), no amount of re-slicing can push its len beyond 5 (growing further requires a new backing array, as discussed below).

s := make([]byte, 5)
s = s[2:4]

Figure 3: Memory layout after s = s[2:4]

As the figure shows, the slicing operation does not copy any memory; it merely produces a new slice header (len=2, cap=3). At this point, executing s = s[:cap(s)] stretches s's len back out to its maximum, as shown below:

Figure 4: Memory layout after s = s[:cap(s)]

In Go, if a slice needs to grow beyond its cap, the only option is to allocate a new slice, copy the existing data over, and point to the new one; this is usually done with the built-in append function.

As an aside, because a slice keeps a pointer into the real data block, in some scenarios a small slice can prevent a large block of memory from being reclaimed by the GC.
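
To make the len/cap discussion concrete, here is a minimal runnable sketch of the article's own example: re-slicing only creates a new slice header over the same backing array, while appending past cap allocates a new one (the appended values are arbitrary):

package main

import "fmt"

func main() {
	s := make([]byte, 5)
	fmt.Println(len(s), cap(s)) // 5 5

	s = s[2:4]
	fmt.Println(len(s), cap(s)) // 2 3: no copy, just a new slice header

	s = s[:cap(s)]
	fmt.Println(len(s), cap(s)) // 3 3: len stretched back out to cap

	s = append(s, 1, 2)         // exceeds cap: a new backing array is allocated
	fmt.Println(len(s), cap(s)) // 5 and a runtime-chosen cap >= 5
}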

performance

Quite a few folks have voiced deep concerns about performance. I don't normally like to obsess over performance questions, since chasing extreme single-machine performance is often not very meaningful, but since it came up, I found some numbers online for interested readers:

Figure 5: Go vs C (x64 quad-core)

Figure 6: Go vs Java (x64 quad-core)

Figure 7: Go vs PHP (x64 quad-core)

Only simple comparisons between Go and C, Java, and PHP are listed here; the full code and data can be found at http://benchmarksgame.alioth.debian.org/.

garbage collection

(to be continued)

 

—————————————————————————————————————————————————————————————————

Preface

I still remember how, thanks to heavy promotion by the WeChat backend team last year, coroutines became quite the fad in our company's C/C++ backend circles. I took the opportunity to retrofit our center's backend network framework with coroutines (see my earlier article 《当C/C++后台开发遇上Coroutine》). At the end of that article I also mentioned some limitations of that implementation, including but not limited to:

1. All coroutines run within a single thread, so the CPU's multiple cores cannot really be exploited.

2. Scheduling is non-preemptive, so a single blocked coroutine makes the whole server unresponsive.

Meanwhile Golang, born of a famous pedigree, had already made a name for itself in China's tech circles, with well-known figures such as 许世伟 praising the elegance and simplicity of its design. This article will not go into the details of the language itself; interested readers can consult the official documentation (http://www.golang.org), though you may need a proxy to reach it, you know the drill!

Concurrency and Distribution

The words backend developers probably repeat most often are "concurrency", "distributed" and the like. So how does Golang, which positions itself as "the C language of the Internet era", handle them?

1. The "execution units" of concurrency: processes, threads, coroutines ...

Most languages do not support coroutines directly at the language level but through libraries. As mentioned above, if such a coroutine performs a synchronous IO operation such as network communication or file reading/writing, it blocks the other concurrently executing coroutines. Golang supports coroutines (goroutines) at the language level, and all system calls provided by the standard library (including synchronous IO operations) voluntarily yield the CPU to other goroutines. Cool!

2. "Communication" between execution units: synchronization/mutual exclusion, message passing ...

There are two main schools of concurrent programming models: "shared memory" and "message passing". Our company, needless to say, is a die-hard fan of the shared-memory model. Erlang is a representative of the message-passing model, where "messages are the only way for processes to communicate". Golang supports both models but recommends the latter, i.e. goroutines interacting through channels. A popular saying in the Golang community: "Don't communicate by sharing memory; share memory by communicating."

In summary, concurrent programming in Golang can be summed up as: concurrency = goroutine + channel. One more note on concurrency: "concurrency" is not the same as "parallelism", which is fairly important for understanding how goroutines execute; the talk "Concurrency is not parallelism" is recommended reading.
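
As a minimal illustration of the concurrency = goroutine + channel formula (the squaring "work" here is purely a placeholder):

package main

import "fmt"

func worker(jobs <-chan int, results chan<- int) {
	for j := range jobs {
		results <- j * j // stand-in for real work
	}
}

func main() {
	jobs := make(chan int, 10)
	results := make(chan int, 10)

	// three goroutines drain one job channel and feed one result channel
	for w := 0; w < 3; w++ {
		go worker(jobs, results)
	}
	for i := 1; i <= 10; i++ {
		jobs <- i
	}
	close(jobs)

	for i := 0; i < 10; i++ {
		fmt.Println(<-results)
	}
}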

Business Scenario

The business scenario for the vast majority of backend servers is actually very simple and can basically be described as follows: a logic-layer server receives a request REQ from the frontend and needs to combine information from N other servers. At that point the server has two options:

1. Serial processing

1) Send request to ServerX and wait for response from ServerX

2) Send request to ServerY and wait for response from ServerY

3) Send request to ServerZ and wait for response from ServerZ

4) Send response to Client

2. Parallel processing

1) Send request to ServerX, ServerY, ServerZ all at once

2) Wait for all responses and send response to Client


Note: to simplify the discussion that follows, we assume here that all frontend requests are independent of one another (Per-Request-Per-Goroutine) and ignore complex interactions between goroutines.
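
A hedged sketch of the parallel option described above (the backend names come from the scenario, while queryBackend is a placeholder for the real request/response exchange): fan out one goroutine per backend, wait for all of them, then answer the client.

package main

import (
	"fmt"
	"sync"
	"time"
)

// queryBackend stands in for the real RPC to a backend server.
func queryBackend(name string) string {
	time.Sleep(10 * time.Millisecond)
	return "response from " + name
}

func main() {
	backends := []string{"ServerX", "ServerY", "ServerZ"}
	replies := make([]string, len(backends))

	var wg sync.WaitGroup
	for i, name := range backends {
		wg.Add(1)
		go func(i int, name string) {
			defer wg.Done()
			replies[i] = queryBackend(name) // each goroutine writes only its own slot
		}(i, name)
	}
	wg.Wait() // wait for all responses, then send the combined response to the client

	fmt.Println(replies)
}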

 

Code Examples

The Naive Approach

Per-Request-Per-Goroutine: for developers who have suffered through writing asynchronous servers, the mere thought is exciting. The brain no longer has to keep switching between contexts or agonize over complicated state-machine transitions; everything feels completely natural.

 

package main  
  
import (  
    "fmt"  
    "net"  
    "os"  
)  
  
func main() {  
    addr, err := net.ResolveUDPAddr("udp", ":6000")  
    if err != nil {  
        fmt.Println("net.ResolveUDPAddr fail.", err)  
        os.Exit(1)  
    }  
  
    conn, err := net.ListenUDP("udp", addr)  
    if err != nil {  
        fmt.Println("net.ListenUDP fail.", err)  
        os.Exit(1)  
    }  
    defer conn.Close()  
  
    for {  
        buf := make([]byte, 65535)  
        rlen, remote, err := conn.ReadFromUDP(buf)  
        if err != nil {  
            fmt.Println("conn.ReadFromUDP fail.", err)  
            continue  
        }  
        go handleConnection(conn, remote, buf[:rlen])  
    }  
}  
  
func handleConnection(conn *net.UDPConn, remote *net.UDPAddr, msg []byte) {  
    service_addr, err := net.ResolveUDPAddr("udp", ":6001")  
    if err != nil {  
        fmt.Println("net.ResolveUDPAddr fail.", err)  
        return  
    }  
  
    service_conn, err := net.DialUDP("udp", nil, service_addr)  
    if err != nil {  
        fmt.Println("net.DialUDP fail.", err)  
        return  
    }  
    defer service_conn.Close()  
  
    _, err = service_conn.Write([]byte("request service x"))  
    if err != nil {  
        fmt.Println("service_conn.Write fail.", err)  
        return  
    }  
  
    buf := make([]byte, 65535)  
    rlen, err := service_conn.Read(buf)  
    if err != nil {  
        fmt.Println("service_conn.Read fail.", err)  
        return  
    }  
  
    conn.WriteToUDP(buf[:rlen], remote)  
}

A server built on this naive approach actually works fine in the vast majority of cases and runs well, but it is easy to see the following problems:

1. Latency: the server talks to the backend service over short-lived connections. For connectionless protocols like UDP the impact is small, but for connection-oriented protocols like TCP the overhead is considerable and adds to the request's response latency.

2. Concurrency: 16-bit port numbers are a limited resource. If every backend interaction needs a new connection, then in theory the number of goroutines concurrently talking to the backend service cannot exceed the hard limit of 65535. In an era where "hundreds of thousands" or "millions" of concurrent connections are routinely expected, a ceiling of roughly 60k is not much to show off.

An Improved Approach

Those who have used the multithreaded concurrency model will have noticed that both problems exist there too, just less prominently than in Golang: the number of threads created is usually bounded and never reaches the port limit. Gophers, however, clearly cannot be satisfied with that order of concurrency.

The fix is simple: since short-lived connections have so many drawbacks, use long-lived ones. So how do we implement that with the language facilities Golang gives us? Since the communication connections are the tricky part, we extract an independent communication proxy (conn-proxy). The proxy itself handles all of the network communication details (connection management, sending and receiving data, etc.), and each process-goroutine interacts with the communication-proxy through channels (submitting requests, waiting for responses, etc.), as shown below:


 

package main  
  
import (  
    "fmt"  
    "net"  
    "os"  
    "strconv"  
    "time"  
)  
  
type Request struct {  
    isCancel bool  
    reqSeq   int  
    reqPkg   []byte  
    rspChan  chan<- []byte  
}  
  
func main() {  
    addr, err := net.ResolveUDPAddr("udp", ":6000")  
    if err != nil {  
        fmt.Println("net.ResolveUDPAddr fail.", err)  
        os.Exit(1)  
    }  
  
    conn, err := net.ListenUDP("udp", addr)  
    if err != nil {  
        fmt.Println("net.ListenUDP fail.", err)  
        os.Exit(1)  
    }  
    defer conn.Close()  
  
    reqChan := make(chan *Request, 1000)  
    go connHandler(reqChan)  
  
    var seq int = 0  
    for {  
        buf := make([]byte, 1024)  
        rlen, remote, err := conn.ReadFromUDP(buf)  
        if err != nil {  
            fmt.Println("conn.ReadFromUDP fail.", err)  
            continue  
        }  
        seq++  
        go processHandler(conn, remote, buf[:rlen], reqChan, seq)  
    }  
}  
  
func processHandler(conn *net.UDPConn, remote *net.UDPAddr, msg []byte, reqChan chan<- *Request, seq int) {  
    rspChan := make(chan []byte, 1)  
    reqChan <- &Request{false, seq, []byte(strconv.Itoa(seq)), rspChan}  
    select {  
    case rsp := <-rspChan:  
        fmt.Println("recv rsp. rsp=%v", string(rsp))  
    case <-time.After(2 * time.Second):  
        fmt.Println("wait for rsp timeout.")  
        reqChan <- &Request{isCancel: true, reqSeq: seq}  
        conn.WriteToUDP([]byte("wait for rsp timeout."), remote)  
        return  
    }  
  
    conn.WriteToUDP([]byte("all process succ."), remote)  
}  
  
func connHandler(reqChan <-chan *Request) {  
    addr, err := net.ResolveUDPAddr("udp", ":6001")  
    if err != nil {  
        fmt.Println("net.ResolveUDPAddr fail.", err)  
        os.Exit(1)  
    }  
  
    conn, err := net.DialUDP("udp", nil, addr)  
    if err != nil {  
        fmt.Println("net.DialUDP fail.", err)  
        os.Exit(1)  
    }  
    defer conn.Close()  
  
    sendChan := make(chan []byte, 1000)  
    go sendHandler(conn, sendChan)  
  
    recvChan := make(chan []byte, 1000)  
    go recvHandler(conn, recvChan)  
  
    reqMap := make(map[int]*Request)  
    for {  
        select {  
        case req := <-reqChan:  
            if req.isCancel {  
                delete(reqMap, req.reqSeq)  
                fmt.Println("CancelRequest recv. reqSeq=%v", req.reqSeq)  
                continue  
            }  
            reqMap[req.reqSeq] = req  
            sendChan <- req.reqPkg  
            fmt.Println("NormalRequest recv. reqSeq=%d reqPkg=%s", req.reqSeq, string(req.reqPkg))  
        case rsp := <-recvChan:  
            seq, err := strconv.Atoi(string(rsp))  
            if err != nil {  
                fmt.Println("strconv.Atoi fail. err=%v", err)  
                continue  
            }  
            req, ok := reqMap[seq]  
            if !ok {  
                fmt.Println("seq not found. seq=%v", seq)  
                continue  
            }  
            req.rspChan <- rsp  
            fmt.Println("send rsp to client. rsp=%v", string(rsp))  
            delete(reqMap, req.reqSeq)  
        }  
    }  
}  
  
func sendHandler(conn *net.UDPConn, sendChan <-chan []byte) {  
    for data := range sendChan {  
        wlen, err := conn.Write(data)  
        if err != nil || wlen != len(data) {  
            fmt.Println("conn.Write fail.", err)  
            continue  
        }  
        fmt.Println("conn.Write succ. data=%v", string(data))  
    }  
}  
  
func recvHandler(conn *net.UDPConn, recvChan chan<- []byte) {  
    for {  
        buf := make([]byte, 1024)  
        rlen, err := conn.Read(buf)  
        if err != nil || rlen <= 0 {  
            fmt.Println(err)  
            continue  
        }  
        fmt.Println("conn.Read succ. data=%v", string(buf))  
        recvChan <- buf[:rlen]  
    }  
}  

Evolving Further

The communication-proxy above is only a toy implementation. In a real production environment, backend services often come with their own access mechanisms (such as our company's CMLB, L5, multi-IP, etc.), and the communication-proxy then needs features like load balancing and failover. Since these depend on the specific access scenario, I won't go through them one by one; given the example above, it should be easy to model them with goroutines and channels.
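
For example (a hedged sketch only; the pool layout and the two-backend setup are illustrative, not the article's production design), round-robin load balancing can be modeled by keeping one send channel per long-lived backend connection, in the spirit of the connHandler/sendHandler split above:

package main

import "fmt"

// backendPool picks a per-backend send channel for each outgoing request.
type backendPool struct {
	sendChans []chan []byte
	next      int
}

func (p *backendPool) pick() chan []byte {
	ch := p.sendChans[p.next%len(p.sendChans)]
	p.next++
	return ch
}

func main() {
	// two buffered channels standing in for two long-lived backend connections
	pool := &backendPool{sendChans: []chan []byte{
		make(chan []byte, 1000),
		make(chan []byte, 1000),
	}}
	for i := 0; i < 4; i++ {
		pool.pick() <- []byte(fmt.Sprintf("req-%d", i))
	}
	fmt.Println("queued:", len(pool.sendChans[0]), "+", len(pool.sendChans[1]))
}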

Summary

This article has given a brief introduction to implementing a typical backend business server in Golang, building a toy_server on top of goroutines and channels. It is positioned as a toy mainly because many points that look tedious but cannot be ignored are not covered here: configuration loading, signal handling, logging, and so on. These are left for the diligent reader to explore!
