比如服务端有 5000 人在线,每秒消息数 100 条,如果不优化,每秒的广播将是 50 万, 现在我想把 1s 之内的消息合并成 1 条,这样就只有 5000 次广播了,聊天这种 1s 延迟可以忍受。
现在的问题是 这种消息怎么保存?方便消费
语言 golang
1
takeoffyoung 2020-04-08 21:22:50 +08:00
方便消费的话,消息都扔分布式队列,然后 consumer 控制消费速率,做 batch 等等。
至于保存,可以写队列之前持久化消息或者在队列上做 dump 均可,看具体需求了。 |
2
dbskcnc 2020-04-08 22:06:18 +08:00
消息队列是比较方便的解决方案,pulsar 做这个好像挺合适的
|
3
matrix67 2020-04-08 22:48:33 +08:00
类似 telegram ? 他们有万人群。 或者限制群大小?和微信一样?
|
6
laravel OP @takeoffyoung 我在 golang 里加了个 queue 类型是 map[int64][]byte 然后启动另一个 goroutine 去合并相同时间戳的消息,可以实现,但是推送一多就挂了,我只是个业余写 go 的,还找不到为什么报错,好难调试啊
|
7
gy123 2020-04-08 23:22:16 +08:00 via iPhone
思路是,每次来消息启动一个延时一秒执行异步任务,并且执行的任务和任务信息绑定到一个全局变量,后续来的消息相当于合并到这个变量,没有新建任务。因为你说需求没有什么需要区分不同种类,那么全局变量相当于一直绑定一套异步任务,这个变量可以是个 map 或固定对象
|
9
ljpCN 2020-04-08 23:32:46 +08:00 via Android
生产者挨个扔进队列,消费者每次消费 n 个
|
12
gy123 2020-04-09 00:06:53 +08:00 via iPhone
打个比方,当某一时刻第一次连续发送了 1 秒的信息到队列,当轮训时间到达消费取消息的时间是在发消息的 0.5 秒,那么消费这些消息需要处理两次
|
13
NUT 2020-04-09 09:04:03 +08:00
我认为这不是一个服务端的优化,而是客户端的优化。
服务端提供策略,客户端自动切换策略。 对于客户端网络好的发送效率 ok 的可以使用 批量拉的方式, 对于客户端网络差的情况,得需要提供基于 http 的批量拉的情况。 合并包的核心思维就是 减少 单包的应答,以便减少在链路上的损耗。 可以采用批量确认的方案。 比如 s-》 123 c--》确认连续收到最大 3 。 不过这个要改你们消息存储。 消息系统核心一个就是路由、一个是消息存储 最后一个是 策略。 消息系统优化点很多。 不能独立的之咱在服务端的角度考虑问题而是要根据整个链路进行优化。 希望对你有用。 |
14
dbskcnc 2020-04-09 10:12:28 +08:00
@laravel 看你的处理似乎没有不丢失的需求,也不考虑内存,不用 Pulsar 也没什么问题
fuction 是个 filter,个人觉得用来做消息合并似乎不太合适,毕竟总是要把消息从 broker 读出来的,比较好控制的是什么时候去读和一次读取多少 |
15
hst001 2020-04-09 13:49:56 +08:00
首先明确的一下问题,楼主想问的应该是消息存储和读取时产生的广播,不是服务端广播给客户端的问题。
消息合并的思路走歪了。 消息合并在某些情形下可能得不尝试,比如大量的聊天窗口,聊天消息间隔大于 1 秒,这个时候等待合并就是浪费 CPU 内存,因为这些消息不适合合并,老老实实往消息队列推就好。 读扩散和写扩散其实是 IM 系统里面一定会遇到的存储问题,建议楼主通过搜索引擎深入了解一下。两种方法在不同情形(单聊、小群群聊和大群群聊)下各有利弊,通常的做法是两种方式结合。 |
16
laravel OP @hst001 我主要想解决的是推送时候 cpu100%的问题,同时也要保证消息及时到达。怎么做才能让推送的压力不大啊?我能想到的就是排队了,但是排队我怎么知道系统负载情况?
|
17
laravel OP 就是怎么才能充分利用 cpu,而又不让他卡死呢?
我觉得肯定是需要一个队列排队的,那我怎么知道每次取多少消息能充分利用 cpu ? 只能通过经验反复测试了吧? |
18
hst001 2020-04-09 16:02:34 +08:00
@laravel #16 那你到底是推送的问题还是存储的问题,我理解错了吗。。。
推送端的话,消息队列本来就帮两端减轻了负载,如果你使用了队列,推送服务还是满负载运行,而队列又在不断堆积,我觉得你应该考虑扩容服务了。 |
20
useben 2020-04-09 18:40:35 +08:00
@laravel cpu 100%? 比如使用消息队列用 kafka, 消费者 pull 模式, 消费速度是视消费者的消费速度来拉取消息的. 这样起一个定时器, 定时拉取就行了吧, 怎么会打满 cpu
|
21
lewinlan 2020-04-10 09:44:04 +08:00 via Android
在发送出口做个限流器呗。1 秒内的消息都 append 进一个 slice 里,用一个 Ticker 计时发送并清空 slice,很简单吧
|
22
vvmint233 2020-04-10 10:24:20 +08:00
或许在广播的前面加上一层缓冲区, 达到一定大小就清空缓冲区然后分发出去
|
23
zz554952942 2020-04-10 10:39:17 +08:00
聊天数据发送到 kafka,或者 RabbitMQ(确保数据不丢失)
然后启一个携程, time.tick 定时 1s 从消息队列取即可 |
24
zz554952942 2020-04-10 10:43:44 +08:00
亦或者可以用 Redis, 以用户 ID 做哈希键 然后当接收到消息的时候则把需要推送的用户 ID 的信息写到 Redis 上(这样子就把当个用户所需要的消息进行了合并)
然后携程定时从 Redis 里取所有数据 然后发送(取到的是一个哈希数组就是每个用户应该接收到的消息) |
25
laravel OP 我已经打算 redis 或者用 go 实现 redis 的 list 也行,一次取 100-1000 条合并之后再广播
|