V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
git00ll
V2EX  ›  Java

reactor.io 使用 Flux.create 创建 Flux 时,需要注意 consumer 会被多次调用,解决起来挺麻烦的

  •  
  •   git00ll · 2021-08-02 11:44:54 +08:00 · 1264 次点击
    这是一个创建于 1243 天前的主题,其中的信息可能已经有所发展或是发生改变。

    创建 flux 代码如下,其中的 long consumer 可能会被下游多次调用。

            Flux.create(new Consumer<FluxSink<Object>>() {
                @Override
                public void accept(FluxSink<Object> fluxSink) {
                    fluxSink.onRequest(new LongConsumer() {
                        @Override
                        public void accept(long value) {
                            log.info("我被多次调用了 request:" + value);
                            for (long i = 0; i < value; i++) {
                                fluxSink.next("request:" + i);
                            }
                        }
                    });
                }
            })
    
    

    也就是说,我们不能决定下游调用的时机,调用的次数,调用的所在线程。这样就很容易产生 bug 。


    FluxArray 解决此问题的办法是使用 Operators.addCap(REQUESTED, this, n) == 0判断, 只有返回为 0 时,才进行处理,否则将请求的 n 叠加到 request 后就 return 。

    		public void request(long n) {
    			if (Operators.validate(n)) {
    				if (Operators.addCap(REQUESTED, this, n) == 0) {
    					if (n == Long.MAX_VALUE) {
    						fastPath();
    					}
    					else {
    						slowPath(n);
    					}
    				}
    			}
    		}
    

    我们自己写 Flux.create() 时也可以借鉴 FluxArray 的处理办法,但是这样就变得麻烦了。 不知道有什么现有封装好的实现没有??

    目前尚无回复
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   5662 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 20ms · UTC 06:30 · PVG 14:30 · LAX 22:30 · JFK 01:30
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.