database/sql rows.scan 在 350K 行后挂起
Posted
技术标签:
【中文标题】database/sql rows.scan 在 350K 行后挂起【英文标题】:database/sql rows.scan hangs after 350K rows 【发布时间】:2020-08-20 21:55:37 【问题描述】:我的任务是从 Oracle 数据库中提取数据:需要提取大量数据(超过 600 万条记录,每条记录 100 列)进行处理。
需要将数据转换为 map。我在不到 35 秒内成功处理了 35 万条记录,但之后服务器就挂起,不再继续。有没有办法按行数对数据分批处理,以释放内存?
func FetchUsingGenericResult(ctx context.Context, dsConnection *string, sqlStatement string) (*entity.GenericResultCollector, error)
columnTypes := make(map[string]string)
var resultCollection entity.GenericResultCollector
db, err := sql.Open("godror", *dsConnection)
if err != nil
return &resultCollection, errors.Wrap(err, "error connecting to Oracle")
log := logger.FromContext(ctx).Sugar()
log.Infof("start querying from Oracle at :%v", time.Now())
rows, err := db.Query(sqlStatement, godror.FetchRowCount(defaultFetchCount))
if err != nil
return &resultCollection, errors.Wrap(err, "error querying")
objects, err := rows2Strings(ctx, rows)
log.Infof("total Rows converted are :%v by %v", len(*objects), time.Now())
resultCollection = entity.GenericResultCollector
Columns: columnTypes,
Rows: objects,
return &resultCollection, nil
func rows2Strings(ctx context.Context, rows *sql.Rows) (*[]map[string]string, error)
result := make(map[string]string)
resultsSlice := []map[string]string
fields, err := rows.Columns()
if err != nil
return nil, err
log := logger.FromContext(ctx).Sugar()
waitGroup, ctx := errgroup.WithContext(ctx)
counter := 0
for rows.Next()
counter++
if counter%defaultFetchCount == 0
log.Infof("finished converting %v rows by %v", counter, time.Now())
waitGroup.Go(func() error
result, err = row2mapStr(rows, fields)
if err != nil
return err
resultsSlice = append(resultsSlice, result)
return nil
)
if err := waitGroup.Wait(); err != nil
return nil, err
return &resultsSlice, nil
func row2mapStr(rows *sql.Rows, fields []string) (resultsMap map[string]string, err error)
result := make(map[string]string)
scanResultContainers := make([]interface, len(fields))
for i := 0; i < len(fields); i++
var scanResultContainer interface
scanResultContainers[i] = &scanResultContainer
if err := rows.Scan(scanResultContainers...); err != nil
return nil, err
for ii, key := range fields
rawValue := reflect.Indirect(reflect.ValueOf(scanResultContainers[ii]))
// if row is null then as empty string
if rawValue.Interface() == nil
result[key] = ""
continue
if data, err := value2String(&rawValue); err == nil
result[key] = data
else
return nil, err
return result, nil
// value2String renders one scanned column value as a string.
//
// rawValue is typically the reflect.Value of a value produced by
// (*sql.Rows).Scan into an interface{}. Pointers and interfaces are followed.
// A nil rawValue, an invalid Value, or a nil pointer/interface renders as "".
//
// Formatting by kind:
//   - signed/unsigned integers: base 10
//   - float32/float64: shortest decimal that round-trips at the value's own
//     precision (float32(0.1) renders as "0.1")
//   - string, bool: the value itself / "true" or "false"
//   - complex64/complex128: fmt %v at the value's own precision
//   - slices or arrays whose element kind is uint8 (e.g. []byte,
//     sql.RawBytes, [16]byte): the raw bytes as a string, except that a
//     single 0x00 byte renders as "0"
//   - structs convertible to time.Time: RFC 3339 with nanoseconds
//
// Any other kind yields an error naming the type.
func value2String(rawValue *reflect.Value) (str string, err error) {
	if rawValue == nil || !rawValue.IsValid() {
		return "", nil
	}
	vv := *rawValue
	// Scan into *interface{} yields Interface-kind values; unwrap those (and
	// any pointers) down to the concrete value.
	for vv.Kind() == reflect.Interface || vv.Kind() == reflect.Ptr {
		if vv.IsNil() {
			return "", nil
		}
		vv = vv.Elem()
	}

	switch vv.Kind() {
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
		return strconv.FormatInt(vv.Int(), 10), nil
	case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr:
		return strconv.FormatUint(vv.Uint(), 10), nil
	case reflect.Float32:
		// bitSize 32 avoids printing float32 rounding noise.
		return strconv.FormatFloat(vv.Float(), 'f', -1, 32), nil
	case reflect.Float64:
		return strconv.FormatFloat(vv.Float(), 'f', -1, 64), nil
	case reflect.String:
		return vv.String(), nil
	case reflect.Bool:
		return strconv.FormatBool(vv.Bool()), nil
	case reflect.Complex64:
		return fmt.Sprintf("%v", complex64(vv.Complex())), nil
	case reflect.Complex128:
		return fmt.Sprintf("%v", vv.Complex()), nil
	case reflect.Slice, reflect.Array:
		if vv.Type().Elem().Kind() == reflect.Uint8 {
			var data []byte
			if vv.Kind() == reflect.Slice {
				// Bytes accepts any slice with uint8 element kind, including
				// named types, where a .([]byte) assertion would panic.
				data = vv.Bytes()
			} else {
				// Arrays from reflect.ValueOf are not addressable; copy out.
				data = make([]byte, vv.Len())
				for i := range data {
					data[i] = byte(vv.Index(i).Uint())
				}
			}
			str = string(data)
			if str == "\x00" {
				str = "0"
			}
			return str, nil
		}
	case reflect.Struct:
		timeT := reflect.TypeOf(time.Time{})
		if vv.Type().ConvertibleTo(timeT) {
			return vv.Convert(timeT).Interface().(time.Time).Format(time.RFC3339Nano), nil
		}
	}
	return "", fmt.Errorf("unsupported type %v (kind %v)", vv.Type(), vv.Kind())
}
有没有人遇到过类似的问题?
修改代码如下:
func FetchUsingGenericResult(ctx context.Context, dsConnection *string, sqlStatement string) (*entity.GenericResultCollector, error)
columnTypes := make(map[string]string)
var resultCollection entity.GenericResultCollector
db, err := sql.Open("godror", *dsConnection)
if err != nil
return &resultCollection, errors.Wrap(err, "error connecting to Oracle")
log := logger.FromContext(ctx).Sugar()
log.Infof("start querying from Oracle at :%v", time.Now())
rows, err := db.Query(sqlStatement, godror.FetchRowCount(defaultFetchCount))
if err != nil
return &resultCollection, errors.Wrap(err, "error querying")
objects, err := rows2Strings(ctx, rows)
log.Infof("total Rows converted are :%v by %v", len(*objects), time.Now())
resultCollection = entity.GenericResultCollector
Columns: columnTypes,
Rows: objects,
return &resultCollection, nil
func rows2Strings(ctx context.Context, rows *sql.Rows) (*[]map[string]string, error)
result := make(map[string]string)
resultsSlice := []map[string]string
fields, err := rows.Columns()
if err != nil
return nil, err
log := logger.FromContext(ctx).Sugar()
counter := 0
for rows.Next()
counter++
if counter%defaultFetchCount == 0
log.Infof("finished converting %v rows by %v", counter, time.Now())
result, err = row2mapStr(rows, fields)
if err != nil
return nil, err
resultsSlice = append(resultsSlice, result)
return &resultsSlice, nil
func row2mapStr(rows *sql.Rows, fields []string) (resultsMap map[string]string, err error)
result := make(map[string]string)
scanResultContainers := make([]interface, len(fields))
for i := 0; i < len(fields); i++
var scanResultContainer interface
scanResultContainers[i] = &scanResultContainer
if err := rows.Scan(scanResultContainers...); err != nil
return nil, err
for ii, key := range fields
rawValue := reflect.Indirect(reflect.ValueOf(scanResultContainers[ii]))
// if row is null then as empty string
if rawValue.Interface() == nil
result[key] = ""
continue
if data, err := value2String(&rawValue); err == nil
result[key] = data
else
return nil, err
return result, nil
// value2String renders one scanned column value as a string.
//
// rawValue is typically the reflect.Value of a value produced by
// (*sql.Rows).Scan into an interface{}. Pointers and interfaces are followed.
// A nil rawValue, an invalid Value, or a nil pointer/interface renders as "".
//
// Formatting by kind:
//   - signed/unsigned integers: base 10
//   - float32/float64: shortest decimal that round-trips at the value's own
//     precision (float32(0.1) renders as "0.1")
//   - string, bool: the value itself / "true" or "false"
//   - complex64/complex128: fmt %v at the value's own precision
//   - slices or arrays whose element kind is uint8 (e.g. []byte,
//     sql.RawBytes, [16]byte): the raw bytes as a string, except that a
//     single 0x00 byte renders as "0"
//   - structs convertible to time.Time: RFC 3339 with nanoseconds
//
// Any other kind yields an error naming the type.
func value2String(rawValue *reflect.Value) (str string, err error) {
	if rawValue == nil || !rawValue.IsValid() {
		return "", nil
	}
	vv := *rawValue
	// Scan into *interface{} yields Interface-kind values; unwrap those (and
	// any pointers) down to the concrete value.
	for vv.Kind() == reflect.Interface || vv.Kind() == reflect.Ptr {
		if vv.IsNil() {
			return "", nil
		}
		vv = vv.Elem()
	}

	switch vv.Kind() {
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
		return strconv.FormatInt(vv.Int(), 10), nil
	case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr:
		return strconv.FormatUint(vv.Uint(), 10), nil
	case reflect.Float32:
		// bitSize 32 avoids printing float32 rounding noise.
		return strconv.FormatFloat(vv.Float(), 'f', -1, 32), nil
	case reflect.Float64:
		return strconv.FormatFloat(vv.Float(), 'f', -1, 64), nil
	case reflect.String:
		return vv.String(), nil
	case reflect.Bool:
		return strconv.FormatBool(vv.Bool()), nil
	case reflect.Complex64:
		return fmt.Sprintf("%v", complex64(vv.Complex())), nil
	case reflect.Complex128:
		return fmt.Sprintf("%v", vv.Complex()), nil
	case reflect.Slice, reflect.Array:
		if vv.Type().Elem().Kind() == reflect.Uint8 {
			var data []byte
			if vv.Kind() == reflect.Slice {
				// Bytes accepts any slice with uint8 element kind, including
				// named types, where a .([]byte) assertion would panic.
				data = vv.Bytes()
			} else {
				// Arrays from reflect.ValueOf are not addressable; copy out.
				data = make([]byte, vv.Len())
				for i := range data {
					data[i] = byte(vv.Index(i).Uint())
				}
			}
			str = string(data)
			if str == "\x00" {
				str = "0"
			}
			return str, nil
		}
	case reflect.Struct:
		timeT := reflect.TypeOf(time.Time{})
		if vv.Type().ConvertibleTo(timeT) {
			return vv.Convert(timeT).Interface().(time.Time).Format(time.RFC3339Nano), nil
		}
	}
	return "", fmt.Errorf("unsupported type %v (kind %v)", vv.Type(), vv.Kind())
}
【问题讨论】:
`waitGroup` 是什么东西?这是顺序处理行的代码,为什么需要它?也许这就是问题所在?
@BurakSerdar 按 waitGroup 现在的写法,它实际上强制了顺序处理:他在调用 Go 之后立即调用 Wait,所以每次只会等待一个 goroutine。在我看来,更大的问题是代码中过度的泛化。先识别列类型,再针对这些类型编写简单的查询,可以显著减少代码量。
@praveent 我在代码中看不到任何可能导致所描述行为的内容,除了 waitGroup 的东西,也许还有一些服务器端问题。我的第一直觉是删除与等待组相关的代码。在 for 循环中从数据库中读取行不需要单独的 goroutine。
@BurakSerdar 我同意。 waitGroup 不会向代码添加任何内容。它的编写方式是一种迂回的方式来编写顺序代码。
这是服务器端的内存问题。我改为在扫描时直接处理数据。谢谢 @MaximKosov
【参考方案1】:
由于服务器内存有限,不断增长的数组(切片)导致程序卡住、无法继续。我改为在扫描时直接处理每一行数据,问题就解决了。
【讨论】:
以上是关于database/sql rows.scan 在 350K 行后挂起的主要内容,如果未能解决你的问题,请参考以下文章
Go MySQL 中的错误:func (*Rows) Scan 返回恐慌:(func() string)
我必须在调用* sql.Tx.Rollback()之前调用* sql.Rows.Close()吗?