NSQ 소스 노트
17097 단어 Go 오픈 소스 프레임워크 소스 코드 통독
2. nsqd는 topic, channel에 관한 정보를 로컬 디스크에 기록하고, 시작할 때 그 파일에서 메모리로 로드합니다.
3. 메시지는 메모리에 저장되며, 배열로 구현한 최소 힙(min-heap)에 보관합니다. 힙의 루트는 만료 시간이 가장 작은 메시지입니다.
4. 각 topic은 자신의 memoryMsgChan을 소비하기 위해 goroutine을 하나 띄웁니다. 이 topic으로 메시지가 들어올 때마다 해당 메시지를 (N개로 복사하여) topic 아래의 모든 channel에 보냅니다.
5. 하나의 channel에 여러 consumer가 있을 때 메시지를 어떻게 consumer에게 무작위로 나누어 주는가? 메시지 하나가 도착하면 chan에서 그 메시지를 꺼낼 수 있는 consumer는 하나뿐이므로, 이것을 무작위 분배로 볼 수 있다.
1. ( ), 。
2.
3.
4.
"go-svc"
-> windows&&linux
->opts := nsqd.NewOptions()
->nsqd, err := nsqd.New(opts)
->p.nsqd.LoadMetadata()
->" topic,channel "
->"nsqd.dat"
->topic1:[channel1,channel2,...]
->topic2:[channel3,channel4,...]
->IsValidTopicName()
->GetTopic(topicName)
->lookupdHTTPAddrs := n.lookupdHTTPAddrs()
->channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs)
->endpoint := fmt.Sprintf("http://%s/channels?topic=%s", addr, url.QueryEscape(topic))
->err := c.client.GETV1(endpoint, &resp)
->channels = append(channels, resp.Channels...)
->channelNames = ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs)
->p.nsqd.PersistMetadata()
-> topic,channel
-> go func p.nsqd.Main()
->n.waitGroup.Wrap(func() {
exitFunc(protocol.TCPServer(n.tcpListener, tcpServer, n.logf))
})
->tcpServer := &tcpServer{ctx: ctx}
->for {} ->clientConn, err := listener.Accept()
->go handler.Handle(clientConn) " , goroutine"
->func (p *tcpServer) Handle()
-> 4 protocolV2
->func (p *protocolV2) IOLoop()
->clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1) " , 1"
->client := newClientV2(clientID, conn, p.ctx)
->go p.messagePump(client, messagePumpStartedChan)
->subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
->c.addToInFlightPQ(msg)
->" "
->" "
->" : "
->err = p.SendMessage(client, msg)
->response, err = p.Exec(client, params)
->params[0] = ,"PUB" / "SUB" / "TOUCH" / ... / ...
-> "PUB" -> " topic memoryMsgChan"
->params[1] = topicName
->topic := p.ctx.nsqd.GetTopic(topicName)
->t = NewTopic(topicName, &context{n}, deleteCallback)
->" topic goroutine topic memoryMsgChan"
->" topic channel"
->t.waitGroup.Wrap(t.messagePump)
->func (t *Topic) messagePump()
->" topic channel"
->" topic , topic channel"
->" message topic chan, channel chan"
->for { case msg = <-memoryMsgChan
->for _, channel := range chans
->chanMsg = NewMessage(msg.ID, msg.Body)
->err := channel.PutMessage(chanMsg)
->c.memoryMsgChan
->" channel memoryMsgChan ?"
-> sub consumer
->"SUB"
->" client sub " -> " client channel"
->func (p *protocolV2) SUB(client *clientV2, params [][]byte)
->topic := p.ctx.nsqd.GetTopic(topicName)
->channel = topic.GetChannel(channelName)
->client.Channel = channel
->client.SubEventChan <- channel
->memoryMsgChan = subChannel.memoryMsgChan
->msg := " channel consumer, consumer"
->" consumer memoryMsgChan"
->" , consumer chan , 。"
->subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
->c.addToInFlightPQ(msg)
->c.inFlightPQ.Push(msg)
->err = p.SendMessage(client, msg)
->func (p *protocolV2) FIN(client *clientV2, params [][]byte)
->id, err := getMessageID(params[1])
->err = client.Channel.FinishMessage(client.ID, *id)
->c.removeFromInFlightPQ(msg)
->" : topic memoryMsgChan channel memoryMsgChan"
->msg := NewMessage(topic.GenerateID(), messageBody)
->err = topic.PutMessage(msg)
->err := t.put(m)
->t.memoryMsgChan ( channel) ,
->client.PublishedMessage(topicName, 1)
->err = p.Send(client, frameTypeResponse, response)
->n.waitGroup.Wrap(n.queueScanLoop)
->channels := n.channels()
->returns a flat slice of all channels in all topics
->n.resizePool(len(channels), workCh, responseCh, closeCh)
-> worker worker, goroutine, 4
->queueScanWorker = 4
->n.waitGroup.Wrap(func() {
n.queueScanWorker(workCh, responseCh, closeCh)
})
->c := c.processInFlightQueue(now)
->msg, _ := c.inFlightPQ.PeekAndShift(t)
->x := (*pq)[0]
->pq.Pop()
->c.popInFlightMessage(msg.clientID, msg.ID)
->c.put(msg)
->c.memoryMsgChan <- m
->c.processDeferredQueue(now)
->n.waitGroup.Wrap(n.lookupLoop)
-> NSQLookupd
->lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.logf,
connectCallback(n, hostname))
->&Command{[]byte("IDENTIFY"), nil, body}, nil
->err = json.Unmarshal(resp, &lp.Info)
->"peerInfo contains metadata for a lookupPeer instance "
->"tcp_port"
->"http_port"
->"broadcast_address"
->"broadcast_address:http_port" -> ,nsqd topic,channel
->" TCPAddresses"
->for _, topic := range n.topicMap
->commands = append(commands, nsq.Register(channel.topicName, channel.name))
->&Command{[]byte("REGISTER"), params, nil}
->lookupPeers = append(lookupPeers, lookupPeer)
->n.lookupPeers.Store(lookupPeers)
->" Store"
->" nsqd topic,channel addr, lookupd 。"
->case " lookupd, ping"
->&Command{[]byte("PING"), nil, nil}
->" nsqLookupd PING, 。"
->case val := " "
->&Command{[]byte("UNREGISTER"), params, nil}
->" "
-> &Command{[]byte("REGISTER"), params, nil}
->func (c *Channel) put(m *Message) error
->case c.memoryMsgChan <- m
->default
->b := bufferPoolGet()
->sync.Pool
->" "
->" , , GC 。"
->err := writeMessageToBackend(b, m, c.backend)
" "
"to_nsq"
->producer, err := nsq.NewProducer(addr, cfg)
->producer.Publish(*topic, line)
->w.sendCommand(Publish(topic, body))
->params = [][]byte{[]byte(topic)}
->&Command{[]byte("PUB"), params, body}
->doneChan := make(chan *ProducerTransaction)
->err := w.sendCommandAsync(cmd, doneChan, nil)
->" "
->" doneChan , "
->if atomic.LoadInt32(&w.state) != StateConnected
->" , TCP "
->" , "
->" "
->err := w.connect()
->w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
->w.conn.Connect()
->conn, err := dialer.Dial("tcp", c.addr)
->c.conn = conn.(*net.TCPConn)
->c.r = conn
->c.w = conn
->c.identify()
->ci["client_id"] = c.config.ClientID
->ci["msg_timeout"] = int64(c.config.MsgTimeout / time.Millisecond)
->cmd, err := Identify(ci)
->&Command{[]byte("IDENTIFY"), nil, body}
->err = c.WriteCommand(cmd)
->c.maxRdyCount = resp.MaxRdyCount
->go c.readLoop()
->delegate := &connMessageDelegate{c}
->for{}
->frameType, data, err := ReadUnpackedResponse(c)
->" 4 byte frame ID, N byte data"
->FrameTypeResponse
->c.delegate.OnResponse(c, data)
->w.responseChan FrameTypeError
->FrameTypeMessage
->msg, err := DecodeMessage(data)
->msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8]))
->msg.Attempts = binary.BigEndian.Uint16(b[8:10])
->copy(msg.ID[:], b[10:10+MsgIDLength])
->msg.Body = b[10+MsgIDLength:]
->msg.Delegate = delegate
->msg.NSQDAddress = c.String()
->go c.writeLoop()
->for{}
->select
->case cmd := <-c.cmdChan
->c.WriteCommand(cmd)
->case resp := <-c.msgResponseChan
->atomic.StoreInt32(&w.state, StateConnected)
->go w.router()
->for{}
->select
->case t := <-w.transactionChan
->w.transactions = append(w.transactions, t)
->err := w.conn.WriteCommand(t.cmd)
->case data := <-w.responseChan
->w.popTransaction(FrameTypeResponse, data)
->t.doneChan "t.doneChan " doneChan "
->t := &ProducerTransaction{
cmd: cmd,
doneChan: doneChan,
Args: args,
}
->w.transactionChan <- t -> router()
->w.transactions = append(w.transactions, t)
->" publish response "
->" "
->" , , , , 。"
->func (w *Producer) onConnResponse(c *Conn, data []byte) { w.responseChan <- data }
->case data := <-w.responseChan
->w.popTransaction(FrameTypeResponse, data)
->t := w.transactions[0]
->" , cmd cmd"
->" , cmd cmd"
->w.transactions = w.transactions[1:]
->t.finish()
->t.doneChan case data := err := w.conn.WriteCommand(t.cmd)
->_, err := cmd.WriteTo(c)
->t := consumer, err := nsq.NewConsumer(topic, *channel, cCfg)
->go r.rdyLoop()
->
->" "
->"redistributing max-in-flight to connections"
->
->len(conns) > int(maxInFlight)
->r.inBackoff() && len(conns) > 1
->" "
->" , ,rdy 0"
->" 0 rdy , ,rdy 0"
->" conn, rdy 1"
->consumer.AddConcurrentHandlers(topicHandler, len(destNsqdTCPAddrs))
->" : message"
->" : "
for i := 0; i < concurrency; i++ {
go r.handlerLoop(handler)
}
->for{}
->message, ok := <-r.incomingMessages
->err := handler.HandleMessage(message) -> if err != nil
->" , "
->"sends a REQ command to the nsqd"
->message.Requeue(-1)
->m.doRequeue(delay, true)
->m.Delegate.OnRequeue(m, delay, backoff)
->c.msgResponseChan &Command{[]byte("REQ"), params, nil}
-> if err == nil
->" , "
->message.Finish()
->c.msgResponseChan &Command{[]byte("FIN"), params, nil}
->msgsInFlight := atomic.AddInt64(&c.messagesInFlight, -1)
->err := consumer.ConnectToNSQDs(nsqdTCPAddrs)
->" addr"
->func (r *Consumer) ConnectToNSQD(addr string)
->conn := NewConn(addr, &r.config, &consumerConnDelegate{r})
->resp, err := conn.Connect()
->conn, err := dialer.Dial("tcp", c.addr)
->c.conn = conn.(*net.TCPConn)
->c.r = conn
->c.w = conn
->go c.readLoop()
->" 4 byte frameID, N byte Data"
-> _heartbeat_, nop
->&Command{[]byte("NOP"), nil, nil}
-> CLOSE_WAIT, StartClose
->c.delegate.OnMessage(c, msg)
->r.incomingMessages <- msg
->atomic.AddInt64(&c.messagesInFlight, 1) " "
->go c.writeLoop()
->case resp := "FIN" ->" " -> resumeFlag
->r.startStopContinueBackoff(c, resumeFlag)
->backoffCounter--
-> "REQ" -> " "
->" backoff" -> backoffFlag
->r.startStopContinueBackoff(c, backoffFlag)
->backoffCounter++
->nextBackoff := math.Pow(2, float64(attempt))
->" continue"
->
->backoffCounter == 0
-> " backoff"
->backoffCounter > 0 -> " , "
->"send RDY 0 immediately (to *all* connections)"
->r.updateRDY(c, 0)
->" "
->backoffDuration := r.config.BackoffStrategy.Calculate(int(backoffCounter))
->" backoffDuration "
->time.AfterFunc(d, r.resume)
->r.updateRDY(choice, 1)
->cmd := Subscribe(r.topic, r.channel)
->&Command{[]byte("SUB"), params, nil}
->for _, c := range r.conns() {}
->r.maybeUpdateRDY(c)
->count := r.perConnMaxInFlight()
->r.updateRDY(conn, count)
->c.maxRdyCount = resp.MaxRdyCount
->"client nsqd N "
->" "
->c.rdyCount
->" "
->maxPossibleRdy := int64(r.getMaxInFlight()) - atomic.LoadInt64(&r.totalRdyCount) + rdyCount
->" ?"
->" atomic.LoadInt64(&r.totalRdyCount) + (maxPossibleRdy - rdyCount) <= getMaxInFlight "
->r.sendRDY(c, count)
->atomic.AddInt64(&r.totalRdyCount, count-c.RDY())
->""
->c.SetRDY(count)
->err := c.WriteCommand(Ready(int(count)))
->&Command{[]byte("RDY"), params, nil}
->err := consumer.ConnectToNSQLookupds(lookupdHTTPAddrs)
->" addr"
->go r.lookupdLoop()
->r.queryLookupd()
->"make an HTTP req to one of the configured nsqlookupd instances to discover"
->"which nsqd's provide the topic we are consuming"
->"http://{lookupd_addr}/lookup?topic={topic}"
->err := apiRequestNegotiateV1("GET", endpoint, nil, &data)
->broadcastAddress := producer.BroadcastAddress
->port := producer.TCPPort
->joined := "broadcastAddress:port"
"nsqlookupd"
->for{}->clientConn, err := listener.Accept()
->go handler.Handle(clientConn)
->func (p *LookupProtocolV1) IOLoop()
->response, err = p.Exec(client, reader, params)
->"PING"
->" nsqd "
->" , "
->InactiveProducerTimeout: 300 * time.Second, " 300s"
->" , nsqd ,nsqd UNREGISTER ,300s nsqlookupd "
->Handle("GET", "/lookup")
->"IDENTIFY"
->peerInfo.RemoteAddress = client.RemoteAddr().String()
->client.peerInfo = &peerInfo
->p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo})
->response = tcp_port ...
->"REGISTER"
->"client must IDENTIFY"
->topic, channel = params[0],params[1]
->if channel != ""
->key := Registration{"channel", topic, channel}
->p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo})
->key := Registration{"topic", topic, ""}
->p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo})
->" ,"
->"UNREGISTER"
->topic, channel, err := getTopicChan("UNREGISTER", params)
->" ,nsqlookupd , ?"
->" ? nsqd ,nsqd unregister cmd"
-> nsqd
->func (c *Channel) exit()
->func (t *Topic) exit()
->router.Handle("GET", "/topics", http_api.Decorate(s.doTopics, log, http_api.V1))
->router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
->topicName, err := reqParams.Get("topic")
->key := Registration{"topic", topicName, ""}
->s.ctx.nsqlookupd.DB.AddRegistration(key)
->router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))
->registrations := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicName, "*")
->" topic channel"
->" topic"
->router.Handle("POST", "/channel/create", http_api.Decorate(s.doCreateChannel, log, http_api.V1))
->key := Registration{"channel", topicName, channelName}
->s.ctx.nsqlookupd.DB.AddRegistration(key)
->key = Registration{"topic", topicName, ""}
->s.ctx.nsqlookupd.DB.AddRegistration(key)