V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
yesterdaysun
V2EX  ›  Java

求助 Java 大量任务分布式处理的问题

  •  1
     
  •   yesterdaysun · 2022-04-26 12:30:59 +08:00 · 4848 次点击
    这是一个创建于 1003 天前的主题,其中的信息可能已经有所发展或是发生改变。

    问题是这样, 现在系统中有大量去和第三方 API 交互的任务, 比如有 1000 个用户, 每个用户又有各自 1 万个小的记录去和第三方 API 慢慢交互, 或者没有那么多记录但是有一个很耗时的同步接口, 可能 10 分钟以上, 其实时间都是消耗在网络 IO 上, 大部分时间在等网络, 之前的方式就是一个线程池, 把所有大小任务塞进去, 但是这个线程池大小很难搞, 多了的话, 有时会突然来一堆任务占住 CPU 和数据库, 少了的话, 一大堆任务又阻塞住.

    现在想搞成分布式好几台机器一起跑, 考察了一下方案, 有点迷惑:

    1. 一种是分布式任务队列, 看到一个 Celery 好像是这种, 但是这个 python 的, 我想要 Java 的, 结果没找到
    2. 一种是任务调度框架, quartz, xxljob 这种, 感觉我想要的更靠近这种, 但是又有点迷惑, 比如感觉我这种需求适合"分片广播"这种任务, 比如我把 1000 个用户的任务分片到 3 台机器, 但是然后每台机器上的任务为每个用户再单独为他名下的 1 万条记录自己做线程池请求? 或者我把任务拆到单个小记录的级别, 那岂不是得成千上万的 trigger, 然后任务调度又一般是一个主 job, 然后传参数这种, 那比如我要确保一个时间只有一个用户的任务在跑, 怎么做这个限制, 全要自己在任务中处理吗

    所以, 其实就是我想找一个比较现成的框架, 能处理超长的任务队列, 分布式, 并发的执行, 可以自动削峰填谷, 有一些任务自动处理, 比如重试, 故障转移等等, 又能够有一些保证一致性的机制, 比如按 job+某个参数确保不会重复执行, 还能程序方式发起调度, 而不是在某个管理后台手动编辑

    我想知道这样的东西存在吗, 还是必须自己实现, 求各位大佬赐教

    31 条回复    2022-04-28 15:37:50 +08:00
    aguesuka
        1
    aguesuka  
       2022-04-26 12:33:50 +08:00
    storm
    biubiuF
        2
    biubiuF  
       2022-04-26 12:40:35 +08:00
    你需要 kafka ,把你现在的 jobs 弄成消费者
    RedBeanIce
        3
    RedBeanIce  
       2022-04-26 12:59:38 +08:00
    可能是 xxjob ????我记得有分片处理,,,就是一堆小任务,大家去处理
    bthulu
        4
    bthulu  
       2022-04-26 13:06:09 +08:00
    既然时间都消耗在网络 IO 上, 上 windows 系统, 用 IOCP 去调接口, 单机就能搞定了, 用不着搞这么多骚操作
    jorneyr
        5
    jorneyr  
       2022-04-26 13:09:21 +08:00
    @biubiuF Kafka 的每个 partition 一个消费者组里同时只能有一个消费者进行消费,这种情况我觉得 RabbitMQ 可能更合适,不必明确的限制消费者个数,看情况随时动态增减消费者,每个消息可以使用阻塞的方式执行。
    Leexiaobu
        6
    Leexiaobu  
       2022-04-26 14:11:28 +08:00
    Akka
    lmshl
        7
    lmshl  
       2022-04-26 14:15:32 +08:00
    改异步纤程,你这才一千万个 IO 小任务,犯不着上分布式。Akka Stream (调度) + Akka HTTP (调 API ) 随便搞一搞单机就完事了
    ming159
        8
    ming159  
       2022-04-26 14:16:34 +08:00
    如你所说:“其实时间都是消耗在网络 IO 上” 线程是不解决 IO 问题的,你需要的是 异步 IO 处理机制。一个线程同时处理多个 IO ,而不是一个线程处理一个 IO 。
    ymmud
        9
    ymmud  
       2022-04-26 14:25:24 +08:00
    akka cluster sharding , 根据需求分片就行了
    lmshl
        10
    lmshl  
       2022-04-26 14:25:58 +08:00
    我写过一个
    所有 fiber 去数据库查任务状态,select * from tasks where state = 'todo',然后执行这一批任务,更新任务状态。
    最后并行 128 同时跑所有 fiber
    zmal
        11
    zmal  
       2022-04-26 14:57:11 +08:00
    需求场景是两个问题:
    1. 是否要把这部分逻辑从主系统解耦出来。
    2. 怎样加快这部分业务的处理速度,减少资源占用,包括但不限于可以任意扩容的分布式、异步 IO 等等。
    如果是我的话,个人对 Flink 比较熟悉,可能会选择解耦后用 Flink 来处理,Flink 解决了分布式、一致性容错等问题。
    akka 解决的是异步 io 并发量问题,楼上 akka 的方案应该也是可行的。看你对哪个工具比较熟悉了。
    git00ll
        12
    git00ll  
       2022-04-26 15:26:13 +08:00
    `但是有一个很耗时的同步接口, 可能 10 分钟以上, 其实时间都是消耗在网络 IO 上, 大部分时间在等网络`

    这句话不明白,啥接口要耗时 10 分钟? 等网络是什么意思。如果接口一次请求响应要 10 分钟,多开点线程如 200-300 个,网络堵塞的时候是不会大量占用 cpu 的。关键如果接口能否承受这么高并发数。
    5boy
        13
    5boy  
       2022-04-26 16:12:24 +08:00
    mark, 有没有不用大数据框架实现的方式?
    litchinn
        14
    litchinn  
       2022-04-26 16:24:53 +08:00
    /t/848357 ,隔壁刚提出的这个动态线程池不知道能不能实现这个需求。另外你说线程池大小不好调,换成分布式多个机器跑,那节点数量不是一样需要调整吗,k8s 弹性伸缩?
    misaka19000
        15
    misaka19000  
       2022-04-26 16:55:55 +08:00
    用协程或者异步 IO
    Saurichthys
        16
    Saurichthys  
       2022-04-26 16:58:49 +08:00
    不要用 xxl-job 的方案,基于数据库,性能不佳,莫名其妙问题很多
    yesterdaysun
        17
    yesterdaysun  
    OP
       2022-04-26 17:56:23 +08:00
    @git00ll 说的不清楚, 其实是一个长流程, 比如请求一个报告, 但是不会立即返回, 需要等第三方处理好, 才能拿到, 中间就每隔 1-2 分钟去轮询一次看看报告有没有好, 通常都要 10 分钟左右, 关键不是每种任务都是这样的, 如果单为它建一个线程池又感觉有点过了, 想搞个通用的解法

    上面的我都研究了一下, 我这个系统比较简单, 本身就是个单体, 并不是分布式的, 这次也只是想要把这个后台任务独立出去搞多机并行, 感觉我这个还不到动用 akka/协程之类的方案的地步, 应该还是简单点, 一个简单的调度系统加动态线程池就足够了, 美团开源的那个动态线程池看上去比较适合, 我先研究一下试试看
    polarbear007
        18
    polarbear007  
       2022-04-26 17:56:55 +08:00
    个人认为使用非阻塞 io 即可
    jekkro
        19
    jekkro  
       2022-04-26 18:17:50 +08:00
    用 redis 实现异步队列即可,一个进程专门负责插入任务到 Redis 队列中,另外几个负责从队列中获取信息并执行,完成后更新数据库里的状态。如果发生 Redis 所在的机器 down 机,则负责插入任务的那个进程重新把没有完成的再插入一遍(不过这个目前为止还没有发生过)。我有类似的业务,已经跑了 12 年了。
    另外因为 Redis 有各种复杂数据结构,可以满足延时队列,优先级队列,自动去重等功能。感觉性能优秀,代码简单。
    jekkro
        20
    jekkro  
       2022-04-26 18:21:16 +08:00
    不能用非阻塞 io 的原因一般是因为那些接口库不是自己实现的,没办法去改造那些接口底层库,虽然 http 的接口自己也可以实现,但是有些场景(比如各种开放平台的接口库)不可能把第三方提供的接口库重新写一边,而仅仅是为了解决阻塞 io 的问题。
    lmshl
        21
    lmshl  
       2022-04-26 18:28:37 +08:00
    @yesterdaysun 以上技术方案中,综合代码量和开发难度来看,从易到难依次应该是
    纤程 >> Akka Stream > nio-pool > xxjob/scheduler > 动态线程池屎上雕花 >> akka cluster sharding >> akka cluster without sharding

    纤程是真的简单,你这需求 20-50 行左右就完事了,不就是个
    flow = post(...) >> (sleep(1.minutes) *> check(xxx)).retryWhile(isCompleted) >> retrieve()
    然后 tasks.foreachPar(<你想开多大并行>)(flow)
    的事
    outoftimeerror
        22
    outoftimeerror  
       2022-04-26 18:33:29 +08:00
    我也写 java ,不过你这个需求让我选型的话,我会用 golang (goroutine+chan)+ redis
    XhstormR02
        23
    XhstormR02  
       2022-04-26 19:24:21 +08:00 via Android
    @lmshl java 的纤程 Quasar ,最近一次更新是 2018 年,都好多年没更新了 https://github.com/puniverse/quasar ,倒不如用 kotlin 的 coroutines
    https://github.com/Kotlin/kotlinx.coroutines/
    dddd1919
        24
    dddd1919  
       2022-04-26 21:27:40 +08:00
    显然是该上 MQ 了,把用户放到队列,由消费端去挨个处理用户任务,如果单个用户跑的话配一个消费任务就够了

    强业务需求建议 RabbitMQ/RocketMQ
    lmshl
        25
    lmshl  
       2022-04-26 21:41:39 +08:00
    @XhstormR02 反正我说的也不是 Java 🐶
    其实上面写的是 Scala 伪代码 🐶
    档燃,Kotlin 也不错,起码有 suspend/await 可以用,不像 IO Monad 要切换编程思维
    mind3x
        26
    mind3x  
       2022-04-27 02:40:39 +08:00 via Android
    建议看看 Uber 的开源框架 Cadence ,支持 Go 和 Java 。
    https://cadenceworkflow.io

    上手会有一点门槛。
    ymmud
        27
    ymmud  
       2022-04-27 12:02:07 +08:00
    @lmshl 看着有点像 zio
    lmshl
        28
    lmshl  
       2022-04-27 20:37:23 +08:00
    @ymmud 兄弟慧眼👍
    yhvictor
        29
    yhvictor  
       2022-04-27 23:36:07 +08:00 via iPhone
    协程应该满好弄的。
    nio 有点难写。

    但我觉得吧,楼主这工作分两部分,第一部分是网络 io 等待准备好,第二部分是处理数据。
    第一部分是 io 密集,第二部分是 cpu 密集。
    所以如果拆成两部分,第一步就可以线程开满,直到数据准备好,放入一个 queue 。
    第二步开线程约等于或小于 cpu 核心数,从 queue 中读准备好的数据源并处理。
    齐活。
    byte10
        30
    byte10  
       2022-04-28 09:35:05 +08:00
    @polarbear007 是的,一个 NIO 就解决啦,花里胡哨的,想一些错误的方案。

    首先 IO 密集型,线程开到 1000 个都不是问题,线程在 IO 的时候不占用 cpu 。当然可能同时响应时就会出现 cpu 拉满,所以 cpu 使用率就是锯齿形的,不好分析瓶颈。

    你这个核心问题就是 IO 时间不确定,无法确定最大线程数。你可以看我的那个教程,https://www.bilibili.com/video/BV1FS4y1o7QB , NIO 如何无视 IO 时间,解决线程池大小的问题。你千万不要搞分布式,分布式是单机 cpu 性能出现瓶颈才干的事情,你这个场景一个树莓派 4B 就能完成。一定要切记,这些很基本很基本的问题,不要把事情想复杂了,在这一点犯错的人太多了。

    至于多个异步转同步问题,countdownlatch 和 cyclicbarrier 都能很好解决。

    找到最核心的问题,解决核心问题,加油。
    wolfie
        31
    wolfie  
       2022-04-28 15:37:50 +08:00
    做过类似功能,自己实现了一个,参考了 xxl-job 的表结构。

    # some_table
    - id
    - status ( running 、succeed 、failed )

    # some_table_job (频繁扫描表)
    - some_table_id
    - is_running
    - next_run_time (索引字段)
    - last_run_time (索引字段)(几分钟一扫描,防止异常结束,长时间未完成)
    - version (版本号,乐观锁)

    # some_table_job_log
    - some_table_id
    - some_table_job_id
    - result ( succeed 、failed )

    1. 新增 some_table ,同时新增 some_table_job
    2. 定时任务扫描 some_table_job ,拉取任务数据
    3. 任务执行完
    - 成功:写入 some_table_job_log ,删除 some_table_job ,回写 some_table 状态。
    - 失败:写入 some_table_job_log ,计算 some_table_job 下一次执行时间。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   916 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 29ms · UTC 20:15 · PVG 04:15 · LAX 12:15 · JFK 15:15
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.