V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
• 请不要在回答技术问题时复制粘贴 AI 生成的内容
whats
V2EX  ›  程序员

大量查询数据任务,通过 kafka 缓冲后入库,如何解决大任务阻塞小任务的情况。

  •  
  •   whats · 2022-03-25 11:56:37 +08:00 · 2623 次点击
    这是一个创建于 1035 天前的主题,其中的信息可能已经有所发展或是发生改变。
    RT

    每个任务的流程为:查询服务( N 个实例)->kafka 集群( 2 个 topic )->入库服务( N 个实例)->数据库

    目前情况:
    1. 查询服务从不同来源获取数据,任务数量比较多,1 天任务量可以到 10w 级别
    2. 任务查询的数据量差异比较大,有 1 亿条的,少的也就几十条,查询结束前无法提前知道数据量。
    3. 单个任务的查询是分批进行的,比如每查 1w 条提交一次给 kafka 的 2 个 topic (少于 1w 条写 topic1,等于 1w 条写 topic2 )。
    4. 入库服务消费 2 个 topic ,写入结果库。

    遇到的问题:
    1. 当有大任务写了大量数据到 kafka 后,一些中小型任务被阻塞,需等大任务入库完成才能入库。

    想请教各位大神这类场景有什么策略解决大任务阻塞小任务入库的问题,或者有没其他 mq 替换 kafka ,支持创建大量 topic ,每个任务对应一个 topic ,且不影响 mq 性能。
    20 条回复    2022-03-25 23:47:27 +08:00
    q1angch0u
        1
    q1angch0u  
       2022-03-25 11:57:52 +08:00 via iPhone
    优先级队列
    小任务->高优先级
    大任务->低优先级
    q1angch0u
        2
    q1angch0u  
       2022-03-25 11:58:41 +08:00 via iPhone
    无视我
    q1angch0u
        3
    q1angch0u  
       2022-03-25 12:00:38 +08:00 via iPhone
    不对,我没理解
    [想请教各位大神这类场景有什么策略解决大任务阻塞小任务入库的问题,或者有没其他 mq 替换 kafka ,支持创建大量 topic ,每个任务对应一个 topic ,且不影响 mq 性能。]
    每个任务对应一个 topic 的目的是什么呢?
    whats
        4
    whats  
    OP
       2022-03-25 12:05:28 +08:00
    @q1angch0u 这样任务之间的影响相对较少 每个任务在查询环节创建 taskid 为名称的 topic ,并通知入库服务去消费该 topic , 任务完成后可以删除该 topic
    q1angch0u
        5
    q1angch0u  
       2022-03-25 12:10:59 +08:00
    @whats kafka 有这么用的吗……
    lxz6597863
        6
    lxz6597863  
       2022-03-25 12:28:24 +08:00
    for {
    task = get_task_form_small_topic()
    if task {
    do(task)
    continue
    }
    task = get_task_form_big_topic()
    do(task)
    }

    你已经分大小任务 topic 了,这样不行吗?
    gejigeji
        7
    gejigeji  
       2022-03-25 14:11:01 +08:00
    小的优先级高的新建到不同 topic , 让分一些入库服务专门消费这些 topic
    season8
        8
    season8  
       2022-03-25 14:29:15 +08:00
    你这个查询做两件事:
    1. 查到不同源的数据入库。
    2. 返回数据条数。

    这样用 kafka 确实有点奇怪

    1. 不能直接计算条数,然后写 kafka 么?
    2. 能不能实时从数据库取出条数,写了几条就能拿到几条。
    3. 数据预查预写
    sss495088732
        9
    sss495088732  
       2022-03-25 14:35:25 +08:00
    加 topic 成本最低.
    pengtdyd
        10
    pengtdyd  
       2022-03-25 14:55:28 +08:00
    为什么会阻塞?大概率是大批量写入占用了磁盘 iIO

    仅供参考:
    1.挂载多个磁盘,然后配置多个 data 目录
    2.topic 分区

    ps: 1.我不理解什么叫”并通知入库服务去消费该 topic “这句话,topic 有数据就自动消费就好了 offset 会自动记录,为啥要通知??
    2.奇葩场景、奇葩用法
    dongtingyue
        11
    dongtingyue  
       2022-03-25 17:08:55 +08:00
    槽点太多。。。。
    大任务大部分都是大于 1w 的都是到 tp2 ,小任务到 tp1 ,消费从 tp1 和 tp2 消费怎么可能会被大任务阻塞?
    SbloodyS
        12
    SbloodyS  
       2022-03-25 17:10:53 +08:00
    1 亿的数据对于 kafka 来说挺小的吧....10 亿一天都没多少,没理解为什么要这么用
    paradoxs
        13
    paradoxs  
       2022-03-25 17:18:37 +08:00
    我很好奇,你现在这样设计,那用户多久才能得到查询结果?

    到底是什么业务场景?
    liyunlong41
        14
    liyunlong41  
       2022-03-25 17:20:12 +08:00 via iPhone
    给所有任务的入库设置超时时间,超时的任务 cancel 掉,超时的视为大任务,大任务单独放到重试队列里,这样不会阻塞中小任务。
    java253738191
        15
    java253738191  
       2022-03-25 20:05:20 +08:00
    rabbitmq 不是支持优先级队列吗?
    Jooooooooo
        16
    Jooooooooo  
       2022-03-25 21:58:46 +08:00
    分开消费呗.
    leafre
        17
    leafre  
       2022-03-25 22:21:23 +08:00
    分批生产到两个 topic ,不同消费者组消费,怎么会阻塞呢,我想到的原因就 broker 性能实在太差,或者根本没形成 Cluster
    jhdxr
        18
    jhdxr  
       2022-03-25 22:48:42 +08:00
    『需等大任务入库完成才能入库』

    你这儿的入库指的是你 kafaka 后面的那个消费者(入库服务)吗?那既然这样为何不增加一些消费者的实例?
    zmal
        19
    zmal  
       2022-03-25 23:25:10 +08:00
    1. "1w 条提交给 topic1,小于 1w 条提交给 topic2",分两个 topic 是有业务含义吗?如果没有的话这个逻辑是很有问题的。
    2. "1w 条写 topic"是在 kafka 里写了 1w 条,还是写了一条,里边放了 1w 条数据?如果是后者,这样做是非常有问题的。
    3. 你的 kafka 集群有多大?该 topic 分区有多少?听起来好像只有一个分区?有分区逻辑吗?比如你完全可以根据数据量写到不同的分区,而不是创建两个 topic 。
    4.至于你说的“每个任务对应一个 topic ,用完删除”的想法更是有点异想天开。首先 kafka 已经是吞吐量最高的 mq ,且吞吐量不受存储数据量影响。其次,“用完删除”没有意义,高性能的分布式数据库 mq 等基本都是标记删除,而且高吞吐 mq 的数据都是放在磁盘的,删除 topic 只是个手动腾出磁盘空间的操作,有什么意义?

    总之你遇到的问题不是 kafka 的问题,也不是换个 mq 可以解决的问题。看看 kafka 的性能瓶颈在哪里,可能是分区、网络 io 、内存之类的。
    joesonw
        20
    joesonw  
       2022-03-25 23:47:27 +08:00 via iPhone
    请使用消息队列。kafka 适合有序消息流处理。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   955 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 19:11 · PVG 03:11 · LAX 11:11 · JFK 14:11
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.