V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
volvo007
V2EX  ›  Go 编程语言

都说 go 简单 小白学完 channel 马上就不会了

  •  
  •   volvo007 · 2022-05-05 15:17:44 +08:00 · 4534 次点击
    这是一个创建于 962 天前的主题,其中的信息可能已经有所发展或是发生改变。

    到 v 上请教下各位大佬. 刚学完 channel 的部分, 就想自己出题考考自己. 任务比较简单, 就是将文件夹下的不同类型的文件移动到对应的子文件夹下面. 例如文件夹 A 包含文件 foo.raw, bar.raw, foo.jpg, bar.jpg 执行后会将 .raw 文件都移动到 raw 文件夹下, 而 .jpg 都会移动到 jpg 文件夹下

    我写单线程的还算能搞定, 大体如下

    fs, err := ioutil.ReadDir(filepath)
    // err 处理略
    
    typeDict := make(map[string]int)
    
    for _, fileinfo := range fs{
      slices := strings.Split(fileinfo.Name(), ".")
      type := slice[len(slice) - 1]
      
      if _, ok := typeDict[type]; ok {
        moveFile()
      } else {
        createFolder()
        moveFile()
        typeDict[type] = 1
      }
    

    思路比较直白, 就是先获得所有文件信息, 然后遍历获得每个文件的后缀. 如果有同名文件夹, 就把文件挪到这个文件夹里; 如果没有文件夹, 就先创建文件夹然后再移动并打个标记告知已经创建该文件夹 当然会有些边界没有考虑, 比如 文件、文件夹 混合出现, 多级文件夹等情况, 这里先忽略掉

    现在想并行执行这个任务, 用 goroutine 和 channel 要怎么完成呢

    看完教程, 我的理解是:

    既然要用 channel, 那就会有 senderreceiver 一对收发的东西 于是我就创建了一个 getInfo 和 一个 dealInfo 函数如下

    func getInfo(f []fs.FileInfo, c chan string) {
    	for _, fs := range f {
    		if fs.IsDir() || strings.HasPrefix(fs.Name(), ".") {
    			continue
    		} else {
    			c <- fs.Name()
    		}
    	}
    }
    
    func dealInfo(path string, typeDict map[string]int, c chan string) {
    	// var name string
    	for name := range c {
    		// name = <-c
    		sp := strings.Split(name, ".")
    		suffix := sp[len(sp)-1]
    
    		if _, ok := typeDict[suffix]; ok {
    			MoveFile(name, path, suffix)
    		} else {
    			CreateFolder(path, suffix)
    			MoveFile(name, path, suffix)
    			typeDict[suffix] = 1
    
    			fmt.Println(name)
    		}
    	}
    }
    

    getInfo 不停从切片里面读数据并放到 c 里面, 然后 dealInfo 又不停从 c 里面拿到文件名并作相应的处理. 这看起来 一发一收, 好像没毛病啊, 但是我在 main 里面执行 go getInfo()go dealInfo() 的时候, 怎么文件夹里面什么事情也没发生呢?

    我查了一下, 这种情况是子函数还来不及运行主函数就退出导致的. main() 尾部加上一个 time.Sleep 就可以解决. 但是这肯定不是标准做法, 这种情况一般会怎么做呢, 请大佬指教

    18 条回复    2022-11-07 12:15:59 +08:00
    seers
        1
    seers  
       2022-05-05 15:32:11 +08:00 via Android   ❤️ 1
    sync 包里可以看看,比如加上 waitgroup ,还可以再加个 goroutine 进行监控
    flmwswd
        2
    flmwswd  
       2022-05-05 15:35:47 +08:00
    waitgroup 解君忧愁
    masterclock
        3
    masterclock  
       2022-05-05 15:36:43 +08:00   ❤️ 1
    1. sync.WaitGroup 用于等待 goroutine 完成
    2. 这里用 channel 不还是串行吗?要并行就应该在 getInfo 函数的循环里直接 go dealInfo() 类似的操作
    volvo007
        4
    volvo007  
    OP
       2022-05-05 15:56:15 +08:00
    @masterclock 谢谢大佬指点, 这确实是我另外的一个困惑点…… 因为上面看到了 waitgroup 就去搜了下, 然后发现 wg.Add(1) 是在一个循环里面, 而这个循环又包了一个 go func(), wg.Wait 则放在循环外面, 然后我就又懵逼了. 因为目前的写法主函数里面没有循环, wg.Add 加不起来……


    于是下一个点是, 如果只用 waitgroup 就可以让 goroutine 并发完成的话, 那 channel 一般要在什么场合使用啊
    flmwswd
        5
    flmwswd  
       2022-05-05 17:11:05 +08:00   ❤️ 1
    waitgroup 作用是让 main routine 等待其他两个 routine 运行完后在退出,和 channel 使用不冲突吧;

    你的 getInfo 和 dealInfo 其实更像单生产-单消费者模型(简易流水线?);这里如果要引入并行,并发安全是不是需要考虑一下,毕竟如果多个线程潜在会创建文件夹+移动文件的话,先后顺序很重要。
    volvo007
        6
    volvo007  
    OP
       2022-05-05 17:24:09 +08:00
    @flmwswd 还有这么多讲究, 学习了. 我用 3L 的方法引入了 waitgroup, 并且把 dealInfo 扔到 getInfo 内部的循环里面去, 程序可以跑了.

    不过当我对一个有 2w .txt 和 2w .abc 的文件夹进行操作的时候, 报错 runtime: failed to create new OS thread 了, 这个是因为创建太多携程的关系吗.

    我理想中的携程应该有一个池子或者队列往里塞任务, 然后不断从池子里面拿东西出来处理, 直到池子 /队列清空. 不知道代码后面发生了什么……
    volvo007
        7
    volvo007  
    OP
       2022-05-05 17:52:12 +08:00
    目前情况是, 用 waitgroup, 并通过循环 Add 文件处理操作任务, 在循环外 wait 所有任务结束

    这种写法可以跑, 但是文件数量太多的话就会 runtime: failed to create new OS thread 报错

    如果一个文件夹包含 2w 个 .txt 文件和 2w 个 .abc 文件, 非并发可以跑完, 耗时 1.5 - 2.3 s; waitgroup 方式报错, 感觉是创建了太多携程. 目前还不知道有没有办法创建指定的携程数量去处理这些任务

    另一方面, 如果降低文件数量, 比如只包含 2k 个 .txt 文件 和 2k 个 .abc 文件, 这个时候 waitgroup 也可以跑了. 非并发和并发分别耗时 250+ ms 和 160+ ms. 并发的是会快一些 (都是空文件, 也都是直接移动文件而非复制, 应该没什么 io 压力所以区别还没有那么明显)
    Frankcox
        8
    Frankcox  
       2022-05-05 18:02:21 +08:00   ❤️ 1
    我猜你想做的是这个?
    https://gist.github.com/CirillaQL/eb6068c3e73fe3880afe50ab7fa277e8
    注意并发时用 sync.map
    至于 runtime: failed to create new OS thread ,有两个方法。
    1.使用有缓存 channel ,并再加个 waitGroup 限制同时执行的 goroutine 数量。
    2.借助于 x 包中的 semaphore
    3.使用协程吃
    Frankcox
        9
    Frankcox  
       2022-05-05 18:45:44 +08:00
    @Frankcox 协程池。。
    volvo007
        10
    volvo007  
    OP
       2022-05-05 19:38:15 +08:00
    @Frankcox 谢谢大佬的回复, 我照着实现了一下

    关于 sync.Map 我搜了一下, 主要是为了解决 map 线程不安全的问题而设立了 sync.Map. 这个我在代码里用到了文件夹是否存在的判断上面. 如果没有相关文件夹, 我要建立一个新的文件夹并保存, 并在 map 里放入一个对应的名字. 如果不用 sync.Map 往 map 里面写东西的时候可能会乱掉

    我用大佬的写法确实跑成功了, 但是文件数量一多 (比如 2w), 还是会 runtime: failed to create new OS thread 错误 😂
    seers
        11
    seers  
       2022-05-05 19:39:27 +08:00
    你把思路打开嘛,你需要的是一个可控的阻塞,time.Sleep 为什么可行因为是个阻塞,但是不可控,包括 waitgroup 也是个阻塞,但是可控,你完全可以给 dealInfo 添加返回值例如最后处理完了给个信号,main 函数再来个接受信号的地方,方法有很多,按自己顺手的来
    volvo007
        12
    volvo007  
    OP
       2022-05-05 20:24:23 +08:00
    @seers 大佬说的对…… 我看教程里面就是给了个返回信号 (同时也说了 main 其实也是个协程)…… 但我有点懵不知道咋给
    目前的想法是, 多给一个 done chan bool 类型, 事情做完了就 bool <- true, 但是这玩意怎么给到外面我再想想……
    dcoder
        13
    dcoder  
       2022-05-05 23:13:02 +08:00
    小声说: 其实 channel 没有吹的这么好用, go 吹嘘的 csp 模型在 go 里也不一定好用...
    最后, 你发现还是得用一堆 lock/mutex, 去折腾 sync.Mutex 这些...
    Goat121
        14
    Goat121  
       2022-05-05 23:37:23 +08:00
    @dcoder 其实和语言关系不大,就是省得你再手写一个队列而已,用其他语言该用队列的地方用 channel ,其他语言该用锁的地方还是用锁,看场景吧。哪有包打天下的数据结构或者模型
    dcoder
        15
    dcoder  
       2022-05-06 01:57:30 +08:00
    @Goat121 "就是省得你再手写一个队列而已" 你说的对
    只是 Go 把 channel 做成了语言特性, 然后宣传还多, 导致了大家觉得 channel 不只是 "就是省得你再手写一个队列而已"
    flmwswd
        16
    flmwswd  
       2022-05-06 20:00:47 +08:00
    @volvo007 你第三段话理解是对的(即协程塞任务和拿出来处理这个),但是你第二个那个报错还有其他信息吗,我觉得准确的说不是太多协程了,而是底层申请了太多系统线程了;
    volvo007
        17
    volvo007  
    OP
       2022-05-10 19:49:02 +08:00
    @flmwswd 我猜反正就是有一个爆栈了…… 没有多的报错, 但是如果我的 main 里面 sleep 一下的话, 这些东西就能跑完. 上面大佬提到说给一个可控的阻塞, 就能实现和 sleep 不可控阻塞一样的效果.

    原理上来说现在是搞明白了, 就是 main 里面的语句遇到 goroutine 就只管开协程然后继续往下跑. 如果 main 里面没有阻塞, 那 main 就会先跑完, 然后那些 goroutine 都跳过去了

    以我的例子为例, 如果将 塞数据、用数据作为两个函数分别执行的话, main 函数里应该是这样的吧:

    1. 第一个 for 之前
    2. 第一个 for, 生成第一个函数的一个 goroutine
    2a. 第一个 goroutine 开始跑
    3. 不管第一个 goroutine 跑的怎么样了, 我进入第二个 for, 开启第二个函数的 goroutine
    3a. 第二个 goroutine 开始跑
    4. 继续往下看 main 有无阻塞, 没阻塞就啥都跑不出来. 有阻塞就等阻塞结束, 然后等待的这个期间, 其他 goroutine “趁机” 跑完

    所以原来的写法, 就是每一次 for 循环都会加入一个 goroutine 并行跑. 这个迭代对象的内容少一些就可以正常跑完, 一旦多了, 新建的比消耗的速度快, 迟早会爆掉

    所以原来的写法处理少量数据还行, 大量的就不行了, 但思路上也不应该每一次 for 循环都创建 goroutine, 这是明显的错误写法. 另一方面, 我创建了 1000 个写的 goroutine 和 1000 个读的 goroutine 又能怎么样呢, channel 不是只有一份嘛 😂…… 等于说只是大家抢着往这个 channel 队列里面塞东西和拿东西罢了, 但是这个 channel 的进出口只有一个啊……

    所以思路上应该是一开始就建立比如 20 、50 个 goroutine 用来写到对应数量的 channel 里, 然后若干其他的 goroutine 用来读和处理, 这样才能“并行”起来对吧. (并且一旦某个 channel 没有数据之后还要关掉)

    由于这个例子里面, 很多人反应 新建文件夹 和 移动文件容易造成问题, (而且没准我真的遇到了问题只是自己不能判断是否是线程安全问题造成的), 所以后面我准备换一个缩略图的例子试一下. 假设一个文件夹内有 6w 张图片, 我准备试一下看看如何用并行的方式去处理这些图片
    volvo007
        18
    volvo007  
    OP
       2022-11-07 12:15:59 +08:00
    这篇主题发表于 180 天前。我也去爆栈问过,但是回答都和我这样操作文件不安全相关,并没有正面解决这个问题。

    期间我去和单片机嗑了一段时间,当然还有各种本职工作,关于 goroutine 、channel 、sync 库 之类的东西没有顾得上。前几周工作稍微松了一点,这个话题又在心里浮现出来,总觉得是个坎,不彻底搞清楚的话自己这关过不去,也别谈掌握 golang 了。另外,我还注意到这个话题不断有零星的用户在关注,又让我感觉自己好像是个逃兵,问题还没解决我就跑了……

    言之种种,在我做了一些针对 channel 的练习之后,算是大概搞清楚了这个例子要怎么写。可能不是 best practice ,但希望能帮到大家,特别是一直关注这个帖子的朋友。

    ========
    在开始之前,我们稍微回顾一下之前的逻辑。相关函数见主题帖子的最后一部分

    抽象一下这个主题的使用场景:
    函数 1 是一个简单、耗时很短的功能;函数 2 则是一个复杂、耗时长的功能。
    我们期待通过 goroutine 来并发处理 函数 2 以达到提升处理性能的目的。

    我用的文件处理案例,根据大佬们所说会有线程安全问题,我们先忽略这个问题,主要还是把握后面的方法论哈

    1. 函数 getInfo (f []fs.FileInfo, c chan<- string) 通过遍历 []fs.FileInfo 结构,进行了一些简单判断后(例如忽略文件夹、.DS_store 这种),不断将文件名写入 c 这个 string channel 中

    2. 函数 dealInfo (path string, typeDict map[string]int, c <-chan string) 通过 range 方法,不断获取 c 之前保存的文件名,截取后缀之后,要么转入对应文件夹,要么创建新文件夹再转入

    ========
    到这里其实思路上是没有什么问题的,这里最关键的是没有注意到简单练习里不会提到的一个知识点:**用 range 遍历 channel 的时候,需要主动 close channel. 否则 range 会阻塞 channel 直到 deadlock panic**. 尽管所有 channel 会在 main channel 结束的时候被强制结束.

    如果不用 range 的方式来遍历的话,我们需要写一个 if _, ok := <- c; ok { ... } 这样的东西放倒一个死循环里面,也就是每次循环都要来手动判断一次 c 里面还有没有东西,没东西了我就跳出循环呗。显然 range 遍历的方式更优雅,但要考虑 channel close 的时机。

    第二个点则是如何 “并发” 处理 函数 2 。如果只用 go func(),最多只能实现两个 goroutine 之间的通信,所以我们引入了线程池 sync 库来解决这个问题——我们需要给每个 goroutine 加入到线程池里面,但在某个线程工作结束的时候又要把它从池子里面拿掉。最后,还需要一个 wait 函数来通知主线程等待这些线程工作结束。

    具体来说,我们需要改写一下前面的函数 1 、函数 2 了:

    对于函数 1 ,原始伪代码:
    func getInfo(f []fs.FileInfo, c chan<- string){
    遍历 f { 处理后的 fineName 写入 c }
    }

    现在应当改写为:
    func getInfo(f []fs.FileInfo, c chan<- string){
    // 后面要用 sync.Add 加入池子,所以这里要减去。加入和减去要匹配, 重要!
    defer sync.Done()

    遍历 f { 处理后的 fineName 写入 c }

    // 后面其他函数会用 range 来遍历,所以一定要 close ,重要!
    close(c)
    }

    对于函数 2 ,由于会用多个 goroutine 并发,那么每一次都需要一个 sync.Add(1) 来加入,所以每一次我们还要从 函数 2 里减去这个线程

    原函数 2 伪代码:
    func dealInfo(path string, typeDict map[string]int, c <-chan string){
    for _, filename := range c {
    判断文件;
    处理文件;
    }
    }

    现在改写为:
    func dealInfo(path string, typeDict map[string]int, c <-chan string){
    defer sync.Done()

    for _, filename := range c {
    判断文件;
    处理文件;
    }
    }

    非常简单,就是在循环前加一个 defer sync.Done() 就可以了。

    最后,我们来写主函数的伪代码:

    <====

    import ("sync", ... )

    var wg sync.WaitGroup // 为了创建多线程并发,准备线程池

    func getInfo( ... ) // 实现 func1

    func dealInfo( ... ) // 实现 func2

    func main(){

    c := make(chan string, 1000)

    wg.Add(1)
    go 函数 1

    for i:=0; i<16; i++ {
    wg.Add(1)
    go 函数 2
    }

    wg.Wait()
    }

    ====>

    这里应该就能充分暴露前面改写过程中加入的奇怪东西的目的了 😄

    可以发现,wg.Add(1) 之后,一定会紧跟一个带有 defer wg.Done() 的函数,来实现线程加减的匹配

    而对于比较复杂的函数 2 ,我们通过一个循环来加入 N 个 goroutine 线程。wg.Add(1) 放在循环里面,同时每个 wg.Add() 都必然对应一个 defer wg.Done() 来匹配

    最后,别忘了放一个 wg.Wait() 来通知主线程等待所有 wg 的线程执行完毕——它靠的就是不断 Add ,之后又不断 Done ,直到池子里线程归零的那一瞬来判断任务全部结束的。所以 Add 和 Done 必须匹配

    另外一个之前没有提到的小改动是,我们建立 c (chan string) 的时候,还给了它一些缓存。这样,由于 func1 处理得很快,就可以预存一些结果到 c 里面,在面对 16 个 go func2 的时候,就能保证每个 func2 总是能拿到东西来处理,就不会空闲等待了。这个 N ,我在哪看到资料说是最大 10000 个,好像可以通过配置修改。不过对于大部分的场景,如果要修改这个参数,不如优化代码才是正道

    还有一个地方是,我们在循环加入 函数 2 goroutine 的时候,wg.Add(1) 放在了循环里面。由于我知道这里的循环会创建 16 个 goroutine ,所以我们也可以一开始就在循环外面 wg.Add(16) 把它一口气全加进去。由于每个循环有一个 defer wg.Done() ,所以最后线程池还是可以归零的。只是这样写如果后期要扩充数量的话会有点不好维护,还是每个循环 +1 ,N 则通过配置文件来提供更妥当。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2589 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 10:59 · PVG 18:59 · LAX 02:59 · JFK 05:59
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.