Neo4j 在谷歌云 pubsub 订阅中写入事务恐慌

Posted

技术标签:

【中文标题】Neo4j 在谷歌云 pubsub 订阅中写入事务恐慌【英文标题】:neo4j write transaction panic in google cloud pubsub subscription 【发布时间】:2020-06-29 07:35:00 【问题描述】:

我尝试使用 Go bolt 客户端将数据从 Google 云 pubsub 写入 neo4j,但 pubsub 接收器中的写入事务出现恐慌。在以下示例中,我删除了解组 pubsub 消息的代码,并在 neo4j 写入事务中硬编码了测试值。

package main

import (
    "context"
    "fmt"
    "github.com/neo4j/neo4j-go-driver/neo4j"
    "log"
    "cloud.google.com/go/pubsub"
)

var (
    driver   neo4j.Driver
    session  neo4j.Session
    result   neo4j.Result
)

func main() 

    ctx := context.Background()
    cctx, _ := context.WithCancel(ctx)

    client, err := pubsub.NewClient(cctx, "projectid")
    if err != nil 
        log.Fatal(err)
    

    var driver neo4j.Driver
    driver, err = neo4j.NewDriver("bolt://localhost:7687", neo4j.BasicAuth("neo4j", "neo4j", ""), func(c *neo4j.Config) 
        c.Encrypted = false
    )
    if err != nil 
        log.Fatal(err)
    
    defer driver.Close()

    session, err = driver.Session(neo4j.AccessModeWrite)
    if err != nil 
        log.Fatal(err)
    
    defer session.Close()


    node, err := session.WriteTransaction(func(transaction neo4j.Transaction) (interface, error) 
        result, err = transaction.Run(
            "CREATE (t:Test) SET t.prop = $prop RETURN id(t)",
            map[string]interface"prop": "Test Value 1")
        if err != nil 
            return nil, err
        

        if result.Next() 
            return result.Record().GetByIndex(0), nil
        

        return nil, result.Err()
    )
    if err != nil 
        log.Fatal(err)
    


    sub := client.Subscription("data")
    err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) 

        node, err := session.WriteTransaction(func(transaction neo4j.Transaction) (interface, error) 
            result, err = transaction.Run(
                "CREATE (t:Test) SET t.prop = $prop RETURN id(t)",
                map[string]interface"prop": "Test Value 2")
            if err != nil 
                return nil, err
            

            if result.Next() 
                return result.Record().GetByIndex(0), nil
            

            return nil, result.Err()
        )
        if err != nil 
            log.Fatal(err)
        
        fmt.Println(node.(int64))

        msg.Ack()
    
    if err != nil 
        log.Println(err)
    



这是 pubsub 订阅中 neo4j 写入事务中的恐慌。第一个写事务运行良好,没有恐慌。


panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x5eed97]

goroutine 125 [running]:
log.(*Logger).Output(0x0, 0x2, 0xc0005e4000, 0x66, 0x0, 0x0)
        /usr/local/go/src/log/log.go:153 +0x47
log.(*Logger).Printf(0x0, 0xa2677e, 0x20, 0xc0005e2070, 0x1, 0x1)
        /usr/local/go/src/log/log.go:179 +0x7e
github.com/neo4j/neo4j-go-driver/neo4j.(*internalLogger).Errorf(0xc0001230b0, 0xa2677e, 0x20, 0xc0005e2070, 0x1, 0x1)
        /home/username/go/src/github.com/neo4j/neo4j-go-driver/neo4j/logging_impl.go:83 +0x61
github.com/neo4j/neo4j-go-driver/neo4j.(*statementRunner).id(0xc00037c380, 0x0, 0x0)
        /home/username/go/src/github.com/neo4j/neo4j-go-driver/neo4j/runner.go:75 +0x106
github.com/neo4j/neo4j-go-driver/neo4j.(*neoSession).id(...)
        /home/username/go/src/github.com/neo4j/neo4j-go-driver/neo4j/session_impl.go:119
github.com/neo4j/neo4j-go-driver/neo4j.runTransaction.func1(0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
        /home/username/go/src/github.com/neo4j/neo4j-go-driver/neo4j/session_impl.go:236 +0x118
github.com/neo4j/neo4j-go-driver/neo4j.(*retryLogic).retry(0xc0007aae28, 0xc0007aae58, 0x9, 0x9, 0x7f405cdaf630, 0xc0003da6c0)
        /home/username/go/src/github.com/neo4j/neo4j-go-driver/neo4j/retry.go:66 +0x123
github.com/neo4j/neo4j-go-driver/neo4j.runTransaction(0xc0000b4cd0, 0x0, 0xc0005e2000, 0x0, 0x0, 0x0, 0x10, 0x97f600, 0x430a01, 0xc0005e2000)
        /home/username/go/src/github.com/neo4j/neo4j-go-driver/neo4j/session_impl.go:233 +0x11f
github.com/neo4j/neo4j-go-driver/neo4j.(*neoSession).WriteTransaction(0xc0000b4cd0, 0xc0005e2000, 0x0, 0x0, 0x0, 0xc0004de060, 0xc0002fe580, 0x0, 0xc0003da760)
        /home/username/go/src/github.com/neo4j/neo4j-go-driver/neo4j/session_impl.go:146 +0x63
main.main.func3(0xada300, 0xc00007d5c0, 0xc00038e750)
        /home/username/go/src/project/neo4jimporter/main.go:64 +0x9b
cloud.google.com/go/pubsub.(*Subscription).receive.func3(0xc00002b330, 0xc0001c3070, 0xada300, 0xc00007d5c0, 0xc00038e750)
        /home/username/go/src/cloud.google.com/go/pubsub/subscription.go:729 +0x6a
created by cloud.google.com/go/pubsub.(*Subscription).receive
        /home/username/go/src/cloud.google.com/go/pubsub/subscription.go:727 +0x1fb
exit status 2

我在 pubsub 订阅中做错了什么?

谢谢!

【问题讨论】:

【参考方案1】:

只需要在 pubsub 接收器中打开会话,因为会话不是线程安全的。

err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) 

    session, err := driver.Session(neo4j.AccessModeWrite)
    if err != nil 
        log.Fatal(err)
    
    defer session.Close()

【讨论】:

以上是关于Neo4j 在谷歌云 pubsub 订阅中写入事务恐慌的主要内容,如果未能解决你的问题,请参考以下文章

如何通过 terraform 使用服务帐户创建谷歌云 pubsub 订阅?

谷歌云 pubsub node.js 客户端与谷歌云功能不兼容

通过 mosquitto 代理发布到谷歌云中的不同 pubsub 主题?

如何以编程方式在谷歌云运行 api 中获取当前项目 ID

从 google pubsub 到 spark 流的数据摄取速度很慢

如何在谷歌云数据流中停止流式传输管道