golang Goroutine安全模式使用通道来抽象使用频道。

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang Goroutine安全模式使用通道来抽象使用频道。相关的知识,希望对你有一定的参考价值。

/*
  A Goroutine safe pattern using channels that abstracts away the channels

  This is a concept of creating a goroutine safe object that uses channels under the covers to communicate
  with the internal map[string]string structure.  I know that typically this kind of solution may done
  with mutexes but the excercise was in using channels on purpose although they are heavier.

  Note a couple of points:

  - When using channels, you can still build a public-facing api that nicely abstracts them away, therefore
    somemone using this api for example, doesn't have to understand the paradigm of communicating over channels
  - This example is just a prototype, as an example
  - Notice that all state is mutated internal to the Db's f function
  - Notice that in the Fetch method there is bi-directional communication a send/receive
*/

package main

import "fmt"
import "time"

type KeyValue struct {
	Key   string
	Value string
	Reply chan KeyValue
}

type Db struct {
	db           map[string]string
	storeChannel chan KeyValue
	fetchChannel chan KeyValue
}

func NewDb() *Db {
	d := &Db{}

	d.db = make(map[string]string)

	d.storeChannel = make(chan KeyValue)
	d.fetchChannel = make(chan KeyValue)

	go func() {

		for {
			select {
			case storeValue := <-d.storeChannel:
				d.internalStore(storeValue)

			case fetchKey := <-d.fetchChannel:
				fetchKey.Reply <- d.internalFetch(fetchKey)
			}
		}
	}()

	return d
}

func (d *Db) internalFetch(kv KeyValue) KeyValue {
	v, ok := d.db[kv.Key]
	if ok {
		return KeyValue{Key: kv.Key, Value: v}
	}
	return KeyValue{Key: kv.Key}
}

func (d *Db) internalStore(kv KeyValue) {
	d.db[kv.Key] = kv.Value
	fmt.Println("Just stored: ", kv)
}

func (d *Db) Fetch(key string) KeyValue {

	ch := make(chan KeyValue)
	d.fetchChannel <- KeyValue{Key: key, Reply: ch}

	return <-ch
}

func (d *Db) Store(key string, value string) {

	d.storeChannel <- KeyValue{Key: key, Value: value}
}

func main() {

	myDb := NewDb()

	//myDb can safely be used by many goroutines although in this example it's only used by the main goroutine.
	myDb.Store("id-3383", "John")
	myDb.Store("id-2218", "Ralph")
	myDb.Store("id-7741", "Amy")

	//simulate some time has gone by
	time.Sleep(time.Second * 1)

	fmt.Println(myDb.Fetch("id-3383"))
	fmt.Println(myDb.Fetch("id-7741"))

	//not found returns a KeyValue with an empty value
	fmt.Println(myDb.Fetch("id-9965"))


	var s string
	fmt.Scanln(&s)
}

选择中的Golang频道未接收

【中文标题】选择中的Golang频道未接收【英文标题】:Golang channel in select not receiving 【发布时间】:2018-03-15 23:48:50 【问题描述】:

我目前正在编写一个使用通道、选择和 goroutine 的小脚本,但我真的不明白为什么它没有像我想的那样运行。

我有 2 个频道供我所有的 goroutine 收听。

我将通道传递给每个 goroutine,其中有一个 select,必须根据数据首先出现的位置在 2 个之间进行选择。

问题是没有goroutine属于第二种情况。我可以一个接一个地收到 100 个工作,我在日志中看到了所有内容。它在第一种情况下的要求很好,之后它在第二个频道中发送了工作(如果它做得很好......)我没有更多的日志了。 我只是不明白为什么......

如果有人可以启发我:)

package main

func main() 

    wg := new(sync.WaitGroup)
    in := make(chan *Job)
    out := make(chan *Job)
    results := make(chan *Job)

    for i := 0; i < 50; i++ 
        go work(wg, in, out, results)
    

    wg.Wait()

    // Finally we collect all the results of the work.
    for elem := range results 
            fmt.Println(elem)
        


func Work(wg *sync.WaitGroup, in chan *Job, out chan *Job, results chan *Job) 
    wg.Add(1)
    defer wg.Done()
    for 
        select 
        case job := <-in:
            ticker := time.Tick(10 * time.Second)

            select 
            case <-ticker:
                // DO stuff
            if condition is true 
                out <- job
            
            case <-time.After(5 * time.Minute):
                fmt.Println("Timeout")
            
        case job := <-out:
            ticker := time.Tick(1 * time.Minute)

            select 
            case <-ticker:
                // DO stuff
            if condition is true 
                results <- job
            

            case <-quitOut:
                fmt.Println("Job completed")
            
        
    

我创建了一些工人,他们听 2 个频道并将最终结果发送到第 3 个。

它对接收到的作业做一些事情,如果它验证给定的条件,它会将这个作业传递到下一个通道,如果它验证一个条件,它会将作业传递到结果通道。

所以,在我的脑海中,我有一个这样的管道,用于 5 个工作人员,例如:通道 IN 中的 3 个工作,直接 3 个工作人员接受它们,如果 3 个工作验证条件,它们将在通道 OUT 中发送。直接由 2 名工人接手,第 3 份工作由前 3 名工人中的一名接手……

现在我希望您对我的第一个代码有更好的理解。但在我的代码中,我从来没有遇到过第二种情况。

【问题讨论】:

Job 类型的定义在哪里?此外,您的工作定义不应该在 goroutine 中。您正在启动多个侦听器。 Job 是在另一个 go 文件中定义的,我没有把他的,因为他包含很多信息......你的意思是我的工作定义不应该在 goroutine 中?跨度> @JimB 我的拼写错误我有一个 channeliIn 的案例和一个 channel out 的案例 您的嵌套选择可能不像您认为的那样工作;内部选择只会执行一次,所以没有办法例如&lt;-ticker 案例发送到 quitIn 并在 &lt;-quitIn 案例中执行;不可能。只有一个案件将被执行。 quitOut 也一样。 【参考方案1】:

因为您的功能是“工作”,而您正在调用“工作”。

【讨论】:

【参考方案2】:

我认为您的解决方案可能有点过于复杂。这是一个简化版本。请记住,有许多实现。一篇好文章,值得一看

https://medium.com/smsjunk/handling-1-million-requests-per-minute-with-golang-f70ac505fcaa

Go 手册甚至更好

https://gobyexample.com/worker-pools(我想这可能是你的目标)

无论如何,下面是一个不同类型的示例。有几种方法可以解决这个问题。

package main

import (
    "context"
    "log"
    "os"
    "sync"
    "time"
)

type worker struct 
    wg   *sync.WaitGroup
    in   chan job
    quit context.Context


type job struct 
    message int


func main() 
    numberOfJobs := 50

    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    w := worker
        wg:   &sync.WaitGroup,
        in:   make(chan job),
        quit: ctx,
    

    for i := 0; i < numberOfJobs; i++ 
        go func(i int) 
            w.in <- jobmessage: i
        (i)
    

    counter := 0
    for 
        select 
        case j := <-w.in:
            counter++
            log.Printf("Received job %+v\n", j)
            // DO SOMETHING WITH THE RECEIVED JOB
            // WORKING ON IT
            x := j.message * j.message
            log.Printf("job processed, result %d", x)
        case <-w.quit.Done():
            log.Printf("Recieved quit, timeout reached.  Number of jobs queued: %d, Number of jobs complete: %d\n", numberOfJobs, counter)
            os.Exit(0)
        default:
            // TODO
        
    


【讨论】:

【参考方案3】:

您的quitInquitOut 频道基本上是无用的:您创建它们并尝试从它们接收。您不能这样做,因为没有人可以写信给这些频道,因为甚至没有人知道它们的存在。我不能说更多,因为我不明白代码应该做什么。

【讨论】:

这是一个完全相关的评论,但不是问题的答案。

以上是关于golang Goroutine安全模式使用通道来抽象使用频道。的主要内容,如果未能解决你的问题,请参考以下文章

Golang入门到项目实战 golang并发变成之通道channel

如果通过 Golang 通道发送,结构是不是实际上在 goroutine 之间复制?

Golang:为啥增加缓冲通道的大小会消除我的 goroutine 的输出?

[golang] channel通道

选择中的Golang频道未接收

golang-----golang sync.WaitGroup解决goroutine同步