V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
haicoderibai
V2EX  ›  推广

谈谈 Golang 缓冲与无缓冲 channel

  •  
  •   haicoderibai · 2020-11-24 13:16:21 +08:00 · 847 次点击
    这是一个创建于 1257 天前的主题,其中的信息可能已经有所发展或是发生改变。

    原文来自:嗨客网

    Golang 缓冲与无缓冲 channel

    Golang 缓冲与无缓冲 channel 教程

    golang 在语言层面支持并发编程,也就是 goroutine,可以看做一种轻量级的线程。程序启动时,其主函数即在一个单独的 goruntine 中运行,叫做 main goruntine,在程序中通过关键字 go 跟上函数(支持匿名函数)就可以启动一个新的 goroutine,可以叫做 sub goruntine 。

    在基于多线程设计的并发编程模型中,线程间的通信往往通过共享数据来实现,而保证共享数据的一致性非常关键。如果线程间有竞争条件,那么对共享数据的访问往往需要加锁来保证一致性,而针对不同的访问竞争,比如读 /读、读 /写、写 /写,需要用不同的锁机制,要想兼顾性能和一致性保证需要煞费苦心,尤其是线程间共享数据比较多的时候。

    为了更简单的并发编程,go 语言提出了自己的信仰:用通信来共享内存,而不要用共享内存来通信。对于 goroutine 之间的通信,channel 是最好的选择,铭记这句原则:用通信来共享内存,而不要用共享内存来通信,可以帮助我们更好的理解 channel 。

    channel 状态

    channel 作为 go 的一种基本数据类型,它有 3 种基本状态:nil 、open 、closed:

    /* nil channel */
    var ch = chan string // A channel is in a nil state when it is declared to its zero value
    ch = nil // A channel can be placed in a nil state
     
    /* open channel */
    ch := make(chan string) // A channel is in a open state when it’s made using the built-in function make.
     
    /* closed channel */
    close(ch) // A channel is in a closed state when it’s closed using the built-in function close.
    

    当 channel 处于这 3 种不同的状态时,对于 channel 上的操作也会有不同的行为,理解这些行为对于正确的使用 channel 非常重要。

    www.haicoder.net

    上面这张图总结了这些行为,需要注意的是处于 closed 状态的 channel,执行 send 操作( ch <- data )将会触发 panic 异常,而 receive 操作(<- ch )则不会,这表明了在 channel 被 close 之后,goruntine 仍然可以从 channel 取走数据,如果 channel 中没有数据可取时,receive 操作会立刻返回零值( nil )。

    range 循环可以直接在 channel 上迭代,当 channel 被关闭并且没有数据时可以直接跳出循环。另外,对于 nil 和 closed 状态的 channel 执行 close 操作也会触发 panic 异常。

    unbufferd channel 和 bufferd channel

    使用场景

    虽然 channel 最常用于 goroutine 之间的通信,但是 channel 上的 send 和 receive 操作并不一定需要携带数据。根据 channel 是否需要传递数据,可以区分出一些 channel 的使用场景。

    没有数据的 channel 的使用场景:

    • goroutine A 通过 channel 告诉 goroutine B:”请停止正在做的事情“
    • goroutine A 通过 channel 告诉 goroutine B:”我完成了要做的事情,但是没有任何结果需要反馈“

    通知的方式一般是 close 操作,goroutine A 对 channel 执行了 close 操作,而 goruntine B 得到 channel 已经被关闭这个信息后可以执行相应的处理。使用没有数据的 channel 的好处:一个 goroutine 可以同时给多个 goroutine 发送消息,只是这个消息不携带额外的数据,所以常被用于批量 goruntine 的退出。

    对于这种不携带数据,只是作为信号的 channel,一般使用如下:

    ch := make(chan struct{})
    ch <- struct{}{}
    <- ch
    

    带有数据的 channel 的使用场景:

    • goroutine A 通过 channel 告诉 goroutine B:”请根据我传递给你的数据开始做一件事情“
    • goroutine A 通过 channel 告诉 goroutine B:”我完成了要做的事情,请接收我传递的数据(结果)“

    通知的方式就是 goroutine A 执行 send 发送数据,而 goroutine B 执行 receive 接收数据。channel 携带的数据只能被一个 goruntine 得到,一个 goruntine 取走数据后这份数据在 channel 里就不复存在了。

    对于需要携带数据的 channel,一般又可以分成带有 buffer 的 channel ( bufferd channel )和不带 buffer 的 channel ( unbufferd channel )。

    unbufferd channel

    对于 unbufferd channel,不存储任何数据,只负责数据的流通,并且数据的接收一定发生在数据发送完成之前。更详细的解释是,goroutine A 在往 channel 发送数据完成之前,一定有 goroutine B 在等着从这个 channel 接收数据,否则发送就会导致发送的 goruntine 被 block 住,所以发送和接收的 goruntine 是耦合的。

    看下面这个例子,往 ch 发送数据时就使 main gouruntine 被永久 block 住,导致程序死锁。

    func main() {
        var ch = make(chan string)
        ch <- "hello" //fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan send]:
        fmt.Println(<-ch)
    }
    

    有人可能会考虑将接收操作放到前面,不幸的是仍然导致了死锁,因为 channel 里没有数据,当前 goruntine 也会被 block 住,导致程序死锁。

    func main() {
        var ch = make(chan string)
        fmt.Println(<-ch) //fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan receive]:
        ch <- "hello"
    }
    

    这次,我们在另一个 goruntine 中执行 receive,程序就可以正常工作了。因为在 main goruntine 发送完数据之前,sub goroutine 已经在等待接收数据。

    func main() {
        var ch = make(chan string)
        go func() {
            fmt.Println(<-ch) //out: hello
        }()
        ch <- "hello"
    }
    

    再看下面这个例子,我们期望在 sub goruntine 中打印 10 个数,实际上却只有 main goruntine 打印了 hello 。因为在 sub goruntine 打印之前,main goruntine 就已经执行完成并退出了。

    func main() {
        go func() {
            for i := 0; i < 10; i++ {
                fmt.Printf("%d ", i)
            }
        }()
        fmt.Println("hello")
    }
    

    这个时候就可以用一个 unbufferd channel 来让两个 goruntine 之间有一些通信,让 main goruntine 收到 sub goruntine 通知后再退出。在这种场景中,channel 并不携带任何数据,只是起到一个信号的作用。

    func main() {
        var ch = make(chan string)
        go func() {
            for i := 0; i < 10; i++ {
                fmt.Printf("%d ", i)
            }
            ch <- "exit"
        }()
        fmt.Println("hello")
        <-ch
    }
    

    bufferd channel

    对带有缓冲区的 channel 执行 send 和 receive 操作,其行为和不带缓冲区的 channel 不太一样。继续讨论最开始的例子,不过这次的 channel 是一个 size=1 的 bufferd channel,将数据发送给 channel 后,数据被拷贝到 channel 的缓冲区,goruntine 继续往后执行,所以程序可以正常工作。

    func main() {
        var ch = make(chan string, 1)
        ch <- "hello"
        fmt.Println(<-ch) //hello
    }
    

    但是当我们调换发送和接收的顺序时,程序又进入了死锁。因为当 channel 里没有数据时(干涸),执行 receive 的 goroutine 也会被 block 住,最终导致了死锁。

    func main() {
        var ch = make(chan string, 1)
        fmt.Println(<-ch) //fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan receive]:
        ch <- "hello"
    }
    

    此外,buffer size=1 和 buffer size>1 的 channel 对于数据的交付也有一些细微的不同:

    • 对于 buffer size=1 的 channel,第二个数据发送完成之前,之前发送的第一个数据一定被取走了,否则发送也会被 block 住,这其实说明了数据的交付得到了延迟保证。
    • 对于 buffer size>1 的 channel,发送数据时,之前发送的数据不能保证一定被取走了,并且 buffer size 越大,数据的交付得到的保证越少。也正是由于这种无保证交付,减少了 goroutine 之间通信时的阻塞延迟,根据发送数据、接收数据、数据处理的速度合理的设计 buffer size,甚至可以在不浪费空间的情况下做到没有任何延迟。

    如果 channel buffer 已经塞满了数据,继续执行发送会导致当前 goruntine 被 block 住(阻塞),直到 channel 中的数据被取走一部分才可以继续向 channel 发送数据。

    通过 channel buffer,解耦了发送和接收的 goruntine 。需要小心的是,buffered channel 虽然可以看做一个缓存消息的队列,但是其主要用途还是用于多个 goruntine 之间的通信,单个 goruntine 中不要使用 buffered channel 来做缓存队列,send 和 receive 操作很容让 goruntine 被永久 block 住导致整个程序死锁,上面的 demo 也说明了这件事情。

    再看下面这个例子,一个简单的生产消费者模型,manager 每 200ms 有一个新的 work 需要分发给 3 个 worker 来完成,manager 每次都只是将 work 发送到一个 channel 中,work 自动从 channel 中取出 work 并处理,每个 worker 完成一个 work 需要 1s 的时间,manager 累计分发 10 个 work,这个时候我们发现没有阻塞。但是如果 manager 继续不停地分发 work,就会发现 channel 缓冲区被塞满,manager 总是在等待 worker 。所以,根据处理需求,合理的设计 worker ( goruntine )数量和 channel buffer size 非常重要。

    package main
    
    import (
        "fmt"
        "math/rand"
        "strconv"
        "time"
    )
    
    func main() {
        fmt.Println("嗨客网(www.haicoder.net)")
    
        const cap = 3
        ch := make(chan string, cap)
    
        for index := 0; index < cap; index++ {
            go func() {
                for p := range ch {
                    fmt.Println("Worker received: ", p)
                    time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
                }
            }()
        }
    
        worknum := 10
        for index := 0; index < worknum; index++ {
            time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
            work := "work " + strconv.Itoa(index)
            select {
            case ch <- work:
                fmt.Println("Manager: send a work")
            default:
                fmt.Println("Manager: wait worker")
            }
        }
    
        close(ch)
    }
    

    运行后,如下图所示:

    www.haicoder.net

    我们使用了 channel,实现了协程的通信。

    原文地址:嗨客网

    更多文章,可以关注下方公众号:

    嗨客网

    目前尚无回复
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   2337 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 25ms · UTC 06:33 · PVG 14:33 · LAX 23:33 · JFK 02:33
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.