database/sql rows.scan 在 350K 行后挂起

Posted

技术标签:

【中文标题】database/sql rows.scan 在 350K 行后挂起【英文标题】:database/sql rows.scan hangs after 350K rows 【发布时间】:2020-08-20 21:55:37 【问题描述】:

我的任务是从 Oracle 数据库中提取数据。我正在尝试提取并处理大量数据:超过 600 万(6MM)条记录,每条记录包含 100 列。

需要将数据转换为 map(键值映射)。我成功地在不到 35 秒内处理了 35 万条记录,之后服务器挂起,不再继续。有没有办法按行大小对数据分批处理,或者通过分批处理来释放内存空间?

func FetchUsingGenericResult(ctx context.Context, dsConnection *string, sqlStatement string) (*entity.GenericResultCollector, error) 
    columnTypes := make(map[string]string)
    var resultCollection entity.GenericResultCollector
    db, err := sql.Open("godror", *dsConnection)
    if err != nil 
        return &resultCollection, errors.Wrap(err, "error connecting to Oracle")
    
    log := logger.FromContext(ctx).Sugar()
    log.Infof("start querying from Oracle at :%v", time.Now())
    rows, err := db.Query(sqlStatement, godror.FetchRowCount(defaultFetchCount))
    if err != nil 
        return &resultCollection, errors.Wrap(err, "error querying")
    
    objects, err := rows2Strings(ctx, rows)
    log.Infof("total Rows converted are :%v by %v", len(*objects), time.Now())
    resultCollection = entity.GenericResultCollector
        Columns: columnTypes,
        Rows:    objects,
    
    return &resultCollection, nil


func rows2Strings(ctx context.Context, rows *sql.Rows) (*[]map[string]string, error) 
    result := make(map[string]string)
    resultsSlice := []map[string]string
    fields, err := rows.Columns()
    if err != nil 
        return nil, err
    
    log := logger.FromContext(ctx).Sugar()
    waitGroup, ctx := errgroup.WithContext(ctx)
    counter := 0
    for rows.Next() 
        counter++
        if counter%defaultFetchCount == 0 
            log.Infof("finished converting %v rows by %v", counter, time.Now())
        
        waitGroup.Go(func() error 
            result, err = row2mapStr(rows, fields)
            if err != nil 
                return err
            
            resultsSlice = append(resultsSlice, result)
            return nil
        )
        if err := waitGroup.Wait(); err != nil 
            return nil, err
        
    
    return &resultsSlice, nil


func row2mapStr(rows *sql.Rows, fields []string) (resultsMap map[string]string, err error) 
    result := make(map[string]string)
    scanResultContainers := make([]interface, len(fields))
    for i := 0; i < len(fields); i++ 
        var scanResultContainer interface
        scanResultContainers[i] = &scanResultContainer
    
    if err := rows.Scan(scanResultContainers...); err != nil 
        return nil, err
    
    for ii, key := range fields 
        rawValue := reflect.Indirect(reflect.ValueOf(scanResultContainers[ii]))
        // if row is null then as empty string
        if rawValue.Interface() == nil 
            result[key] = ""
            continue
        

        if data, err := value2String(&rawValue); err == nil 
            result[key] = data
         else 
            return nil, err
        
    
    return result, nil


// value2String renders the value wrapped by rawValue as a string.
//
// Supported kinds: signed and unsigned integers (base 10), floats (shortest
// decimal form for the value's own bit size), strings, bools, complex
// numbers, byte slices and byte arrays (a single NUL byte is rendered as
// "0"), and structs convertible to time.Time (formatted as RFC 3339 with
// nanoseconds). A nil pointer, an invalid Value or a nil interface yields
// "" with no error. Every other type yields an error naming the type.
func value2String(rawValue *reflect.Value) (str string, err error) {
	if rawValue == nil || !rawValue.IsValid() {
		return "", nil
	}
	iface := rawValue.Interface()
	if iface == nil {
		// Same convention the row scanner uses for SQL NULL.
		return "", nil
	}
	vv := reflect.ValueOf(iface)
	aa := vv.Type()

	switch aa.Kind() {
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
		str = strconv.FormatInt(vv.Int(), 10)
	case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
		str = strconv.FormatUint(vv.Uint(), 10)
	case reflect.Float32, reflect.Float64:
		// Format with the value's own bit size; a float32 formatted as a
		// 64-bit value prints widening noise (0.1 -> 0.10000000149011612).
		str = strconv.FormatFloat(vv.Float(), 'f', -1, aa.Bits())
	case reflect.String:
		str = vv.String()
	case reflect.Array, reflect.Slice:
		if aa.Elem().Kind() != reflect.Uint8 {
			err = fmt.Errorf("unsupported type %v", aa)
			break
		}
		var data []byte
		if aa.Kind() == reflect.Slice {
			// Bytes works for named byte-slice types as well, where a
			// plain .([]byte) assertion would panic.
			data = vv.Bytes()
		} else {
			data = make([]byte, vv.Len())
			for i := range data {
				data[i] = byte(vv.Index(i).Uint())
			}
		}
		str = string(data)
		if str == "\x00" {
			str = "0"
		}
	// time type
	case reflect.Struct:
		// The result is asserted to time.Time below, so time.Time is the
		// only conversion target that can work here.
		tt := reflect.TypeOf(time.Time{})
		if aa.ConvertibleTo(tt) {
			str = vv.Convert(tt).Interface().(time.Time).Format(time.RFC3339Nano)
		} else {
			err = fmt.Errorf("unsupported struct type %v", aa)
		}
	case reflect.Bool:
		str = strconv.FormatBool(vv.Bool())
	case reflect.Complex128, reflect.Complex64:
		str = fmt.Sprintf("%v", vv.Complex())
	default:
		err = fmt.Errorf("unsupported type %v", aa)
	}
	return str, err
}

有没有人遇到过类似的问题?

修改代码如下:

func FetchUsingGenericResult(ctx context.Context, dsConnection *string, sqlStatement string) (*entity.GenericResultCollector, error) 
    columnTypes := make(map[string]string)
    var resultCollection entity.GenericResultCollector
    db, err := sql.Open("godror", *dsConnection)
    if err != nil 
        return &resultCollection, errors.Wrap(err, "error connecting to Oracle")
    
    log := logger.FromContext(ctx).Sugar()
    log.Infof("start querying from Oracle at :%v", time.Now())
    rows, err := db.Query(sqlStatement, godror.FetchRowCount(defaultFetchCount))
    if err != nil 
        return &resultCollection, errors.Wrap(err, "error querying")
    
    objects, err := rows2Strings(ctx, rows)
    log.Infof("total Rows converted are :%v by %v", len(*objects), time.Now())
    resultCollection = entity.GenericResultCollector
        Columns: columnTypes,
        Rows:    objects,
    
    return &resultCollection, nil


func rows2Strings(ctx context.Context, rows *sql.Rows) (*[]map[string]string, error) 
    result := make(map[string]string)
    resultsSlice := []map[string]string
    fields, err := rows.Columns()
    if err != nil 
        return nil, err
    
    log := logger.FromContext(ctx).Sugar()
    counter := 0
    for rows.Next() 
        counter++
        if counter%defaultFetchCount == 0 
            log.Infof("finished converting %v rows by %v", counter, time.Now())
        
        result, err = row2mapStr(rows, fields)
        if err != nil 
            return nil, err
        
        resultsSlice = append(resultsSlice, result)
    
    return &resultsSlice, nil


func row2mapStr(rows *sql.Rows, fields []string) (resultsMap map[string]string, err error) 
    result := make(map[string]string)
    scanResultContainers := make([]interface, len(fields))
    for i := 0; i < len(fields); i++ 
        var scanResultContainer interface
        scanResultContainers[i] = &scanResultContainer
    
    if err := rows.Scan(scanResultContainers...); err != nil 
        return nil, err
    
    for ii, key := range fields 
        rawValue := reflect.Indirect(reflect.ValueOf(scanResultContainers[ii]))
        // if row is null then as empty string
        if rawValue.Interface() == nil 
            result[key] = ""
            continue
        

        if data, err := value2String(&rawValue); err == nil 
            result[key] = data
         else 
            return nil, err
        
    
    return result, nil


// value2String renders the value wrapped by rawValue as a string.
//
// Supported kinds: signed and unsigned integers (base 10), floats (shortest
// decimal form for the value's own bit size), strings, bools, complex
// numbers, byte slices and byte arrays (a single NUL byte is rendered as
// "0"), and structs convertible to time.Time (formatted as RFC 3339 with
// nanoseconds). A nil pointer, an invalid Value or a nil interface yields
// "" with no error. Every other type yields an error naming the type.
func value2String(rawValue *reflect.Value) (str string, err error) {
	if rawValue == nil || !rawValue.IsValid() {
		return "", nil
	}
	iface := rawValue.Interface()
	if iface == nil {
		// Same convention the row scanner uses for SQL NULL.
		return "", nil
	}
	vv := reflect.ValueOf(iface)
	aa := vv.Type()

	switch aa.Kind() {
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
		str = strconv.FormatInt(vv.Int(), 10)
	case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
		str = strconv.FormatUint(vv.Uint(), 10)
	case reflect.Float32, reflect.Float64:
		// Format with the value's own bit size; a float32 formatted as a
		// 64-bit value prints widening noise (0.1 -> 0.10000000149011612).
		str = strconv.FormatFloat(vv.Float(), 'f', -1, aa.Bits())
	case reflect.String:
		str = vv.String()
	case reflect.Array, reflect.Slice:
		if aa.Elem().Kind() != reflect.Uint8 {
			err = fmt.Errorf("unsupported type %v", aa)
			break
		}
		var data []byte
		if aa.Kind() == reflect.Slice {
			// Bytes works for named byte-slice types as well, where a
			// plain .([]byte) assertion would panic.
			data = vv.Bytes()
		} else {
			data = make([]byte, vv.Len())
			for i := range data {
				data[i] = byte(vv.Index(i).Uint())
			}
		}
		str = string(data)
		if str == "\x00" {
			str = "0"
		}
	// time type
	case reflect.Struct:
		// The result is asserted to time.Time below, so time.Time is the
		// only conversion target that can work here.
		tt := reflect.TypeOf(time.Time{})
		if aa.ConvertibleTo(tt) {
			str = vv.Convert(tt).Interface().(time.Time).Format(time.RFC3339Nano)
		} else {
			err = fmt.Errorf("unsupported struct type %v", aa)
		}
	case reflect.Bool:
		str = strconv.FormatBool(vv.Bool())
	case reflect.Complex128, reflect.Complex64:
		str = fmt.Sprintf("%v", vv.Complex())
	default:
		err = fmt.Errorf("unsupported type %v", aa)
	}
	return str, err
}
【问题讨论】:

waitGroup 是做什么用的?这段代码是逐行顺序处理的,为什么需要它?也许这就是问题所在? @BurakSerdar 从 waitGroup 的写法来看,它实际上强制了顺序处理:他在调用 Go 之后立即调用 Wait,这意味着每次只会等待一个 goroutine。在我看来,更大的问题是代码中的泛化程度过高。先确定各列的类型,再针对这些类型编写较小的查询,可以显著减少代码量。 @praveent 除了 waitGroup 相关的代码,或许还有某些服务器端问题之外,我在代码中看不到任何可能导致所述行为的内容。我的第一直觉是删除与 waitGroup 相关的代码。在 for 循环中从数据库读取行不需要单独的 goroutine。 @BurakSerdar 我同意。waitGroup 没有给代码带来任何好处,这种写法只是在绕着弯子编写顺序代码。 这是服务器端的空间(内存)问题。我把逻辑改成了在扫描的同时处理数据。谢谢 @MaximKosov 【参考方案1】:

由于服务器内存有限,保存结果的数组(切片)不断增长,可能导致程序卡住而无法继续执行。我改为在扫描的同时处理数据(而不是先把所有行都保存在内存中),这解决了我的问题。

【讨论】:

以上是关于database/sql rows.scan 在 350K 行后挂起的主要内容,如果未能解决你的问题,请参考以下文章

Go MySQL 中的错误:func (*Rows) Scan 返回恐慌:(func() string)

我必须在调用* sql.Tx.Rollback()之前调用* sql.Rows.Close()吗?

go postgresql array

golang 的数据库操作感觉不怎么好,哎,PHP 直接 mysql

Golang database/sql源码分析

我应该总是将 database.sql 导入到 phpmyadmin 以连接 java