V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
V2EX 提问指南
nkssai
V2EX  ›  问与答

rabbitmq+pika 中,每个消费者获取消息数量的问题。

  •  
  •   nkssai · 2016-03-30 15:06:15 +08:00 · 6901 次点击
    这是一个创建于 3193 天前的主题,其中的信息可能已经有所发展或是发生改变。

    环境 pika 0.10 RabbitMQ 3.5.4, Erlang 18.0

    生产者

    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    queue_name="route_test"
    
    channel.exchange_declare(exchange='logs')
    channel.basic_qos(prefetch_count=1)
    
    message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    i = 0
    while i < 10:
        channel.basic_publish(exchange='logs',
                              routing_key=queue_name,
                              body=message,
                              properties=pika.BasicProperties(
                                  delivery_mode=2
                              ))
        i += 1
    
    print(" [x] Sent %r" % message)
    connection.close()
    

    消费者

    def on_start():
        connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
        channel = connection.channel()
    
        queue_name="route_test"
        channel.exchange_declare(exchange='logs')
    
        channel.queue_declare(queue=queue_name)
        channel.queue_bind(exchange='logs',
                       queue=queue_name)
    
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
        return channel
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
        time.sleep(3)
    
    while True:
        try:
            channel = on_start()
            print(' [*] Waiting for logs. To exit press CTRL+C')
            channel.start_consuming()
        except:
            print "connect error"
    

    我已经在 channel 中设置了channel.basic_qos(prefetch_count=1),但是我如果先执行消费者,然后再执行生产者,就会一次性的把所有的消息都扔给消费者,这样,我如果手动再启动一个消费者,就无法获得还没有被执行的消息了。

    请问,这是为什么,或者怎么样能够达到我想要的效果,即每次消费者都只获得一个消息,剩下的都保存在消息队列里面。

    2 条回复    2016-03-30 17:04:07 +08:00
    knktc
        1
    knktc  
       2016-03-30 15:12:08 +08:00   ❤️ 1
    把 no_ack 设置为 false ,然后取一条消息 ack 一次
    anexplore
        2
    anexplore  
       2016-03-30 17:04:07 +08:00
    1 楼 ok
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   5207 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 09:34 · PVG 17:34 · LAX 01:34 · JFK 04:34
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.