假设有三个协程池(A,B,C), 三个分别处理不一样的业务且每个协程池中的 worker 数量不一致
当前数据流向为 A => B => C,任务在任一协程池中都有出现 err 导致该任务跳过的可能
在如下两种情况下:
最后在主程序中,针对上述两种情况,有没有优雅的办法知道任务已经全部完成且让主程序退出 ?
1
kwanzaa 2021-08-07 17:59:13 +08:00
怎么听着像是 WaitGroup 该干的事儿
|
2
tim0991 OP 如果用 sync.WaitGroup 只适合情况 1,且需要在 err 出现的时候考虑到如何 done 好像挺恶心的
|
3
Trim21 2021-08-07 18:09:28 +08:00
|
4
Sasasu 2021-08-07 18:10:40 +08:00
激进:在每个任务的事件循环,每一轮循环都检测一下某个标识退出的全局变量是不是该退出了。如果没有循环那就每个阻塞操作都设超时,超时或完成时检测这个标识。
保守:关掉 A 的入口,然后当协程池工作线程 = 0 时计数 +1,计数等于协程池数量时退。 如果你要 Go 风格的话,起手一个全局 hash 表(带锁),里面 K 是每个 context 的指针。 激进:锁表,每个 context 都发 cancel,然后退出。 保守:关掉 A 的入口,轮训这个 hash 表,内容物为 0 时退出。 另外两个奇葩方案。 不想思考:关掉 A 的入口,sleep 一个所有任务超时的事件,退出。 做 CDN 的:用 dlopen 打开备用的 .so ,切工作函数。按照激进策略等旧的 .so 不再运行,释放旧 .so 然后移动新的 .so 。 |
6
tim0991 OP |
8
CEBBCAT 2021-08-08 01:25:47 +08:00
看不太懂,为什么不让 MQ 做消息转接呢?
|
9
Mitt 2021-08-08 01:45:23 +08:00 1
Context + WaitGroup 不是可以实现么,向下传递 Context 并附加 WaitGroup,每级往 WaitGroup 里添加真实的需要采集的数量并在出错的时候 defer wg.Done 并添加到 Context 中的 ErrArray,或者 errgroup,外层 wait 就好了,如果想让程序提前终止,就可以用带 cancel 的 context 从最外层一关,里面的所有任务就都会收到通知了
|
10
tim0991 OP |
11
lesismal 2021-08-08 10:47:24 +08:00
难道不是
wg.Add(1) pool.Go(func(){ defer wg.Done() .... }) 吗? |
13
lesismal 2021-08-08 12:11:30 +08:00
@tim0991 你只要是整体的别落下 add done,每个 Go 是一个,不管 Go 的 func 里有没有失败跳过,只要 defer done 了就能确保 wg.wait 正常结束,所以,好像不应该有这个困扰
|
14
securityCoding 2021-08-08 13:02:58 +08:00 via Android
应该是想要 completefuture 那种?
|
15
tim0991 OP @lesismal 但是这样其实也有一个问题,就是你上面的代码示例中,协程数量是不固定的,那要是固定的协程数量怎么写比较优雅一点?
|
16
Mitt 2021-08-08 14:46:19 +08:00
@tim0991 #10 context 是上下文啊,用来把 waitgroup 传递下去的,让不管多下层都能用同一个 waitgroup 并且能通过 context 感知上层要主动关闭 比如超时,而上层也可以通过 waitgroup 来感知下层任务是否已经完成,不管你下层开多少个协程,你每一层都是知道数量的,添加进去就好了
|
17
lesismal 2021-08-08 16:42:49 +08:00
@tim0991 协程池本身就应该自带控制协程数量的属性,否则协程池还不如直接 go 。你看我上面写的也是 pool.Go
|
18
lesismal 2021-08-08 16:52:46 +08:00
@lesismal @tim0991 #17 只要你生产速度大于协程池消费速度,一样能充分利用这些数量的协程并发。最简单的实现,一个带容量的 chan,生产者往 chan 里写,多个协程去读,当前协程都忙、就被 chan 缓冲了、发送数量大于协程数量和 chan size 生产者就阻塞,这些细节看你怎么设计,姿势太多了,我这个库里就有好几种定制的,以前有些特殊的 hash 和时序需要所以没用其他三方的:
https://github.com/lesismal/nbio/tree/master/taskpool 其他第三方的也很多 |
19
nuk 2021-08-08 17:14:54 +08:00
给协程加 id,有任务就塞 map,ABC 三个 map 都空了不就表示任务干完了。。
|
22
zhengxiaowai 2021-08-09 10:06:25 +08:00
先说结论:要在 main 中感知任务的完成状态可以通过 chan,控制数据流转也是用 chan,如果有超时机制或者每个任务属性之类数据需要用到 context 。
1 )步骤大约是 main 函数中创建三个用于接受结果的 A B C chan,大小为 A 任务个数,和一个 notify chan 用于完成通知,再来一个通知主程序关闭的 doneChan 起一个 goruntine 去 for notify chan go func() { for a := range nChan { if (ACount == BCount == CCount == 100) { doneChan <- struct{} } } } 起三个 goruntine 去 for 分别处理 A B C chan,里面写处理逻辑,大约是这样子 go func(nChan chan Notify) { for a := range AChan { // dosomething if ok { bChan <- weiboEntryID } else { nChan <- weiboEntryID } } close(bChan) }(nChan) go func(chan Notify) { for b := range BChan { // dosomething if ok { cChan <- commentID } else { nChan <- commentID } } close(cChan) }(nChan) go func(chan Notify) { for c := range CChan { // dosomething if ok { cChan <- commentUserID } else { nChan <- commentUserID } } }(nChan) 这时候就 main 中可以往 AChan 里写数据了,写完直接 close AChan,然后直接用 doneChan 阻塞 for userID := range userIDs { AChan <- userID } close(aChan) <-doneChan 大致流程就是这样,需要注意的是需要正确关闭 ABC chan,就是在发送完成后关闭,nChan 用于任务信息回收还可以用于任务回放,另外为了保证每个任务都会返回,需要弄一个 timeout context 超时当作失败处理 2 )第二种情况,由于任务个数是未知的,上面的 100 就不能用了,有两种方式可以解决,一是可以预先知道每次数量,用另外的一个 chan 再任务开始时候传给发送 doneChan 那个 goruntine,把 100 改成最后总数即可。二是未知任务数量,这时候只能再来一个 chan,在做完 ABC 后发送一个 aDone 、bDone 、cDone,给发送 doneChan 那个 goruntine,当确认三个都完成后,就可以发送 done 了。 ------ code 硬敲的可能很多不对,变量和具体结构可酌情改变 :-) |
23
tim0991 OP @zhengxiaowai 谢谢回复 我还是觉得上面说的由协程池中的任务数量来判断是否已经完成简单一点
|