golang实现大数据量文件的排序
Posted jfcat
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang实现大数据量文件的排序相关的知识,希望对你有一定的参考价值。
今天的主题围绕一段go代码展开,这段代码主要实现大数据量的文件排序功能,逻辑并不复杂但是实现过程有些问题做下记录
- 随机数
- 类型转换
- 文件操作
- priorityqueue优先级队列
随机数的实现每种语言都有,go语言自然也不例外,官方提供了 math/rand 和 crypto/rand 。从官方介绍来说,crypto/rand 更适合安全敏感的场景,这里用 math/rand 就足够了。
随机数
随机数还要满足两个要求
-
产生的结果每个都不一样
-
产生的数能满足一定的大小
少量重复其实没有什么影响,这里只是提出更高要求,满足一定大小,是希望差异不要太大,这点倒不重要,但是可以满足第一条要求
由于golang没有java的stream方式那样流式的处理方式,这里使用map去重处理
seen := make(map[int]bool, BIG_FILE_LINE_NUM)
i := 1
for {
num := rand.Intn(BIG_FILE_LINE_NUM * 10)
if err != nil {
panic(RAND_NUM_ERR)
}
if seen[num] {
continue
}
seen[num] = true
num += BIG_FILE_LINE_NUM
f.WriteString(strconv.Itoa(num) + SEPARATOR)
if i == BIG_FILE_LINE_NUM {
break
}
i++
}
另外,注意rand.Seed让每次产生的rand都不一样,如果不设置默认是rand.Seed(1)
rand.Seed(time.Now().UnixNano())
类型转换
-
整型和字符串互相转换
系统自带了工具包可以解决
var num int = 100
strconv.Itoa(num) //整形转字符串
strconv工具包里面还有很多工具可以使用的,具体看实现能否满足需求
strconv.ParseInt(s string, base int, bitSize int)
-
字节slice和整型
line, err := reader.ReadBytes('\\n')
if err != nil {
break
}
b := string(line)
a, _ := strconv.Atoi(strings.TrimRight(b, "\\n"))
这里使用[]byte转字符串后再转整型,其中还处理了行尾换行符的问题。暂时没有发现更好的实现,应该有更合理的实现方式,先做个记录,有好的实现再改。
文件操作
-
路径操作
filepath := FILE_PATH + BIG_FILENAME
if _, err := os.Stat(filepath); err != nil {
if os.IsNotExist(err) {
os.Mkdir(FILE_PATH, os.ModePerm)
}
}
f, err := os.OpenFile(filepath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.ModePerm)
路径操作比较直接,由于我自定义了一层目录,所以直接os.Mkdir()就可以了,如果是多级目录可以使用os.MkdirAll()
这里有些特别的是 golang 对路径错误的处理:通过判断返回的 err 来区分不同的路径状态,这个需要注意。可以判断以下几种情况:
os.IsExist(err)
os.IsNotExist(err)
os.IsPermission(err)
os.IsTimeout(err)
-
文件创建
本来使用os.Create来创建文件,后来发现OpenFile更方便
f, err := os.OpenFile(filepath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.ModePerm)
if err != nil {
fmt.Println(err.Error())
os.Exit(OS_OPEN_EXIT)
}
defer f.Close()
-
文件读写
文件读写就麻烦多了,一般现代语言都提供基于buffer的io操作,golang中使用bufio包来处理。
文件读的实现
f, err := os.Open(filepath)
if err != nil {
panic(FILE_OPEN_ERR)
}
defer f.Close()
reader := bufio.NewReader(f)
//使用make要设置len为0
var lines []int
i := 0
j := 0
for {
line, err := reader.ReadBytes('\\n')
if err != nil {
break
}
b := string(line)
a, _ := strconv.Atoi(strings.TrimRight(b, "\\n"))
lines = append(lines, a)
}
对于写的实现留着其他文章再补上吧。
priorityqueue优先级队列
优先级队列golang已经提供了container/heap实现,但是由于不支持泛型这样的操作,需要每个type对对应的interface实现,官方文档有很好的示例,我这直接借用,能弄明白就行。
container/heap 默认实现的是小顶堆;若希望每次 pop 返回最高优先级的节点,需要在 Less 比较中使用大于符号(即反转比较方向)。如果对 heap 不熟悉,最好先看下数据结构的相关资料。
// This example demonstrates a priority queue built using the heap interface.
package main
import (
"container/heap"
"fmt"
)
// An Item is something we manage in a priority queue.
type Item struct {
value string // The value of the item; arbitrary.
priority int // The priority of the item in the queue.
// The index is needed by update and is maintained by the heap.Interface methods.
index int // The index of the item in the heap.
}
// A PriorityQueue implements heap.Interface and holds Items.
type PriorityQueue []*Item
func (pq PriorityQueue) Len() int { return len(pq) }
func (pq PriorityQueue) Less(i, j int) bool {
// We want Pop to give us the highest, not lowest, priority so we use greater than here.
return pq[i].priority > pq[j].priority
}
func (pq PriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *PriorityQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*Item)
item.index = n
*pq = append(*pq, item)
}
func (pq *PriorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1 // for safety
*pq = old[0 : n-1]
return item
}
// update modifies the priority and value of an Item in the queue.
func (pq *PriorityQueue) update(item *Item, value string, priority int) {
item.value = value
item.priority = priority
heap.Fix(pq, item.index)
}
// This example creates a PriorityQueue with some items, adds and manipulates an item,
// and then removes the items in priority order.
func main() {
// Some items and their priorities.
items := map[string]int{
"banana": 3, "apple": 2, "pear": 4,
}
// Create a priority queue, put the items in it, and
// establish the priority queue (heap) invariants.
pq := make(PriorityQueue, len(items))
i := 0
for value, priority := range items {
pq[i] = &Item{
value: value,
priority: priority,
index: i,
}
i++
}
heap.Init(&pq)
// Insert a new item and then modify its priority.
item := &Item{
value: "orange",
priority: 1,
}
heap.Push(&pq, item)
pq.update(item, item.value, 5)
// Take the items out; they arrive in decreasing priority order.
for pq.Len() > 0 {
item := heap.Pop(&pq).(*Item)
fmt.Printf("%.2d:%s ", item.priority, item.value)
}
}
完整代码如下:
package main
import (
"bufio"
"bytes"
"container/heap"
"encoding/binary"
"fmt"
"math/rand"
"os"
"sort"
"strconv"
"strings"
"time"
)
const (
BIG_FILE_LINE_NUM = 100000
FILE_LINE_NUM = 10000
SEPARATOR string = "\\n"
FILE_PATH string = "./tmp/"
PRE_FILENAME string = "tmp_"
BIG_FILENAME string = "bigfile"
MERGE_FILENAME string = "mergefile"
FILE_OPEN_ERR string = "FILE CREATE ERROR"
OS_OPEN_EXIT = 1
RAND_NUM_ERR string = "rand number creat failed"
RAND_NUM_EXIT = 2
OS_READ_ERR string = "file read err"
OS_READ_EXIT = 3
OS_CREATESMALLFILE_EXIT = 4
)
// An Item is something we manage in a priority queue.
type Item struct {
value *bufio.Reader // The value of the item; arbitrary.
priority int // The priority of the item in the queue.
// The index is needed by update and is maintained by the heap.Interface methods.
index int // The index of the item in the heap.
}
// A PriorityQueue implements heap.Interface and holds Items.
type PriorityQueue []*Item
func (pq PriorityQueue) Len() int { return len(pq) }
func (pq PriorityQueue) Less(i, j int) bool {
// We want Pop to give us the highest, not lowest, priority so we use greater than here.
return pq[i].priority > pq[j].priority
}
func (pq PriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *PriorityQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*Item)
item.index = n
*pq = append(*pq, item)
}
func (pq *PriorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1 // for safety
*pq = old[0 : n-1]
return item
}
// update modifies the priority and value of an Item in the queue.
func (pq *PriorityQueue) update(item *Item, value *bufio.Reader, priority int) {
item.value = value
item.priority = priority
heap.Fix(pq, item.index)
}
//整形转换成字节
func IntToBytes(n int) []byte {
x := int32(n)
bytesBuffer := bytes.NewBuffer([]byte{})
binary.Write(bytesBuffer, binary.BigEndian, x)
return bytesBuffer.Bytes()
}
//字节转换成整形
func BytesToInt(b []byte) int {
bytesBuffer := bytes.NewBuffer(b)
var x int
binary.Read(bytesBuffer, binary.BigEndian, &x)
return int(x)
}
type FileSort struct {
files []string
}
func (fs *FileSort) generated() {
filepath := FILE_PATH + BIG_FILENAME
if _, err := os.Stat(filepath); err != nil {
if os.IsNotExist(err) {
os.Mkdir(FILE_PATH, os.ModePerm)
}
}
f, err := os.OpenFile(filepath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.ModePerm)
if err != nil {
fmt.Println(err.Error())
os.Exit(OS_OPEN_EXIT)
}
defer f.Close()
rand.Seed(time.Now().UnixNano())
// find repeatly number
seen := make(map[int]bool, BIG_FILE_LINE_NUM)
i := 1
for {
num := rand.Intn(BIG_FILE_LINE_NUM * 10)
if err != nil {
panic(RAND_NUM_ERR)
}
if seen[num] {
continue
}
seen[num] = true
num += BIG_FILE_LINE_NUM
f.WriteString(strconv.Itoa(num) + SEPARATOR)
if i == BIG_FILE_LINE_NUM {
break
}
i++
}
}
func (fs *FileSort) generatedSmallFiles() {
filepath := FILE_PATH + BIG_FILENAME
f, err := os.Open(filepath)
if err != nil {
panic(FILE_OPEN_ERR)
}
defer f.Close()
reader := bufio.NewReader(f)
//使用make要设置len为0
var lines []int
i := 0
j := 0
for {
line, err := reader.ReadBytes('\\n')
if err != nil {
break
}
// fmt.Printf("read big file line %s", line)
b := string(line)
a, _ := strconv.Atoi(strings.TrimRight(b, "\\n"))
lines = append(lines, a)
j++
if len(lines) == FILE_LINE_NUM {
smallfilepath := fmt.Sprintf("%s%s%d", FILE_PATH, PRE_FILENAME, i)
fmt.Printf("small file name %s\\n", smallfilepath)
i++
sf, err := os.OpenFile(smallfilepath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.ModePerm)
// 理想的错误方法
if err != nil {
fmt.Println(err.Error())
os.Exit(OS_CREATESMALLFILE_EXIT)
}
fs.files = append(fs.files, smallfilepath)
sort.Slice(lines, func(i, j int) bool {
return lines[i] > lines[j]
})
for _, data := range lines {
sf.WriteString(fmt.Sprintf("%d", data) + SEPARATOR)
}
sf.Close()
// slice清空问题
lines = lines[0:0]
}
}
}
func (fs *FileSort) mergeFiles() {
pq := make(PriorityQueue, len(fs.files))
i := 0
for _, filename := range fs.files {
file, _ := os.OpenFile(filename, os.O_RDONLY, os.ModePerm)
defer file.Close()
in := bufio.NewReader(file)
v, _, _ := in.ReadLine()
intv, _ := strconv.Atoi(string(v))
pq[i] = &Item{
value: in,
priority: intv,
index: i,
}
i++
}
heap.Init(&pq)
filepath := FILE_PATH + MERGE_FILENAME
f, err := os.OpenFile(filepath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
if err != nil {
fmt.Errorf("error when open file %s, return err %s", filepath, err)
os.Exit(OS_OPEN_EXIT)
}
defer f.Close()
for pq.Len() > 0 {
old := heap.Pop(&pq).(*Item)
f.WriteString(strconv.Itoa(old.priority) + SEPARATOR)
fmt.Printf("sort num %d\\n", old.priority)
v, _, err := old.value.ReadLine()
if err != nil {
continue
}
intv, _ := strconv.Atoi(string(v))
item := &Item{
value: old.value,
priority: intv,
}
heap.Push(&pq, item)
pq.update(item, item.value, intv)
}
}
func main() {
fs := new(FileSort)
fmt.Println("Big File starts generating ")
fs.generated()
fmt.Println("Small File starts generating")
fs.generatedSmallFiles()
fmt.Println("Merge Files start")
fs.mergeFiles()
}
引用:
https://www.cnblogs.com/wangqianlove/p/12579994.html
https://ieevee.com/tech/2018/01/29/go-heap.html
https://pkg.go.dev/math/rand#Int63n
以上是关于golang实现大数据量文件的排序的主要内容,如果未能解决你的问题,请参考以下文章