V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
V2EX 提问指南
thomaswang
V2EX  ›  问与答

golang nsq 源码哪一段表明 msg 会投递到 Topic 下所有的 Channel

  •  
  •   thomaswang · 2018-09-25 21:26:57 +08:00 · 920 次点击
    这是一个创建于 2040 天前的主题,其中的信息可能已经有所发展或是发生改变。

    我咋找不到呢

    func (t *Topic) PutMessage(m *Message) error {
    	t.RLock()
    	defer t.RUnlock()
    	if atomic.LoadInt32(&t.exitFlag) == 1 {
    		return errors.New("exiting")
    	}
    	err := t.put(m)
    	if err != nil {
    		return err
    	}
    	atomic.AddUint64(&t.messageCount, 1)
    	return nil
    }
    
    func (t *Topic) put(m *Message) error {
    	select {
    	case t.memoryMsgChan <- m:
    	default:
    		b := bufferPoolGet()
    		err := writeMessageToBackend(b, m, t.backend)
    		bufferPoolPut(b)
    		t.ctx.nsqd.SetHealth(err)
    		if err != nil {
    			t.ctx.nsqd.logf(LOG_ERROR,
    				"TOPIC(%s) ERROR: failed to write message to backend - %s",
    				t.name, err)
    			return err
    		}
    	}
    	return nil
    }
    
    1 条回复    2018-09-25 22:11:02 +08:00
    thomaswang
        1
    thomaswang  
    OP
       2018-09-25 22:11:02 +08:00
    ```go
    func (t *Topic) messagePump() {
    for i, channel := range chans {
    chanMsg := msg
    // copy the message because each channel
    // needs a unique instance but...
    // fastpath to avoid copy if its the first channel
    // (the topic already created the first copy)
    if i > 0 {
    chanMsg = NewMessage(msg.ID, msg.Body)
    chanMsg.Timestamp = msg.Timestamp
    chanMsg.deferred = msg.deferred
    }
    if chanMsg.deferred != 0 {
    channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
    continue
    }
    err := channel.PutMessage(chanMsg)
    if err != nil {
    t.ctx.nsqd.logf(LOG_ERROR,
    "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
    t.name, msg.ID, channel.name, err)
    }
    }
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   2788 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 26ms · UTC 03:54 · PVG 11:54 · LAX 20:54 · JFK 23:54
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.