NSQ 소스 노트
                                            
 17097 단어  Go 오픈 소스 프레임워크 소스 코드 통독
                    
2. nsqd는 topic, channel에 관한 정보를 로컬 디스크에 쓰고, 시작할 때 파일에서 메모리로 로드합니다.
3. 메시지는 메모리에 저장되며, 배열로 구현한 최소 힙(min-heap)에 보관된다. 힙의 루트는 만료 시간이 가장 이른 메시지이다.
4. 각 topic은 자신의 memoryMsgChan을 소비하기 위해 goroutine을 하나 실행합니다. topic이 메시지를 받을 때마다 이 메시지를 (N부 복사하여) topic 아래의 모든 channel에 보냅니다.
5. 하나의 channel에 여러 consumer가 있을 때 메시지를 어떻게 무작위로 consumer에게 나누어 주는가: 메시지 하나가 들어오면 한 consumer만 chan 안의 그 메시지를 얻을 수 있으므로, 이것을 무작위 분배로 볼 수 있다.
1.       (  ),            。
2.          
3.          
4.               
"go-svc"
	->  windows&&linux
->opts := nsqd.NewOptions()
->nsqd, err := nsqd.New(opts)
->p.nsqd.LoadMetadata()
	->"       topic,channel  "
	->"nsqd.dat"
		->topic1:[channel1,channel2,...]
		->topic2:[channel3,channel4,...]
	->IsValidTopicName()
	->GetTopic(topicName)
		->lookupdHTTPAddrs := n.lookupdHTTPAddrs()
			->channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs)
				->endpoint := fmt.Sprintf("http://%s/channels?topic=%s", addr, url.QueryEscape(topic))
				->err := c.client.GETV1(endpoint, &resp)
				->channels = append(channels, resp.Channels...)
		->channelNames = ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs)
->p.nsqd.PersistMetadata()
	->   topic,channel       
-> go func p.nsqd.Main()
	->n.waitGroup.Wrap(func() {
		exitFunc(protocol.TCPServer(n.tcpListener, tcpServer, n.logf))
	})
		->tcpServer := &tcpServer{ctx: ctx}
		->for {} ->clientConn, err := listener.Accept()
		->go handler.Handle(clientConn) "      ,   goroutine"
		->func (p *tcpServer) Handle()
			-> 4       protocolV2
			-func (p *protocolV2) IOLoop()
				->clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1) "      ,  1"
				->client := newClientV2(clientID, conn, p.ctx)
				->go p.messagePump(client, messagePumpStartedChan)
					->subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
						->c.addToInFlightPQ(msg)
							->"         "
							->"         "
							->"   :       "
					->err = p.SendMessage(client, msg)
				->response, err = p.Exec(client, params)
					->params[0] =         ,"PUB" / "SUB" / "TOUCH" / ... / ... 
					-> "PUB"   -> "      topic memoryMsgChan"
						->params[1] = topicName
						->topic := p.ctx.nsqd.GetTopic(topicName)
							->t = NewTopic(topicName, &context{n}, deleteCallback)
								->"   topic    goroutine   topic memoryMsgChan"
								->" topic          channel"
								->t.waitGroup.Wrap(t.messagePump)
								->func (t *Topic) messagePump()
									->"    topic     channel"
									->"    topic         ,        topic     channel"
									->"  message     topic chan,          channel chan"
									->for { case msg = <-memoryMsgChan
										->for: 모든 channel 순회
											->chanMsg = NewMessage(msg.ID, msg.Body)
											->err := channel.PutMessage(chanMsg)
											->c.memoryMsgChan 
											->"    channel   memoryMsgChan      ?"
											->         sub   consumer
											->"SUB"
											->"   client  sub  " -> "   client     channel"
											->func (p *protocolV2) SUB(client *clientV2, params [][]byte)
												->topic := p.ctx.nsqd.GetTopic(topicName)
												->channel = topic.GetChannel(channelName)
												->client.Channel = channel
												->client.SubEventChan  memoryMsgChan = subChannel.memoryMsgChan
											->msg := "   channel   consumer,                 consumer"
											->"   consumer       memoryMsgChan"
											->"         ,     consumer   chan     ,          。"
												->subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
													->c.addToInFlightPQ(msg)
														->c.inFlightPQ.Push(msg)
												->err = p.SendMessage(client, msg)
											->func (p *protocolV2) FIN(client *clientV2, params [][]byte)
												->id, err := getMessageID(params[1])
												->err = client.Channel.FinishMessage(client.ID, *id)
												->c.removeFromInFlightPQ(msg)
									->"  :  topic memoryMsgChan channel memoryMsgChan"
									
						->msg := NewMessage(topic.GenerateID(), messageBody)
						->err = topic.PutMessage(msg)
							->err := t.put(m)
								->t.memoryMsgChan    (  channel)  ,    
						->client.PublishedMessage(topicName, 1)
				->err = p.Send(client, frameTypeResponse, response)
	->n.waitGroup.Wrap(n.queueScanLoop)
		->channels := n.channels()
			->returns a flat slice of all channels in all topics
		->n.resizePool(len(channels), workCh, responseCh, closeCh)
			->  worker      worker,   goroutine,  4 
				->queueScanWorker = 4
			->n.waitGroup.Wrap(func() {
				n.queueScanWorker(workCh, responseCh, closeCh)
			})
				->c := <-workCh
				->c.processInFlightQueue(now)
					->msg, _ := c.inFlightPQ.PeekAndShift(t)
						->x := (*pq)[0]
						->pq.Pop()
					->c.popInFlightMessage(msg.clientID, msg.ID)
					->c.put(msg)
						->c.memoryMsgChan <- m
				->c.processDeferredQueue(now)
	->n.waitGroup.Wrap(n.lookupLoop)
		->     NSQLookupd
		->lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.logf,
					connectCallback(n, hostname))
					->&Command{[]byte("IDENTIFY"), nil, body}, nil
						->err = json.Unmarshal(resp, &lp.Info)
							->"peerInfo contains metadata for a lookupPeer instance "
							->"tcp_port"
							->"http_port"
							->"broadcast_address"
							->"broadcast_address:http_port" ->        ,nsqd     topic,channel
							->"          TCPAddresses"
					->for _, topic := range n.topicMap
						->commands = append(commands, nsq.Register(channel.topicName, channel.name))
						->&Command{[]byte("REGISTER"), params, nil}
		->lookupPeers = append(lookupPeers, lookupPeer)
		->n.lookupPeers.Store(lookupPeers)
			->"        Store"
			->"  nsqd   topic,channel addr,    lookupd  。"
		->case "    lookupd,    ping"
			->&Command{[]byte("PING"), nil, nil}
			->" nsqLookupd      PING,       。"
		->case val := "    "
				->&Command{[]byte("UNREGISTER"), params, nil}
			->"    "
				-> &Command{[]byte("REGISTER"), params, nil}
->func (c *Channel) put(m *Message) error
	->case c.memoryMsgChan <- m
	->default
	->b := bufferPoolGet()
		->sync.Pool
		->"   "
		->"           ,       ,  GC  。"
	->err := writeMessageToBackend(b, m, c.backend)
"       "
"to_nsq"
->producer, err := nsq.NewProducer(addr, cfg)
->producer.Publish(*topic, line)
	->w.sendCommand(Publish(topic, body))
			->params = [][]byte{[]byte(topic)}
			->&Command{[]byte("PUB"), params, body}
		->doneChan := make(chan *ProducerTransaction)
		->err := w.sendCommandAsync(cmd, doneChan, nil)
			->"      "
			->"   doneChan  ,     "
			->if atomic.LoadInt32(&w.state) != StateConnected 
				->"        ,     TCP  "
				->"   ,   "
				->"    "
			->err := w.connect()
				->w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
				->w.conn.Connect()
					->conn, err := dialer.Dial("tcp", c.addr)
					->c.conn = conn.(*net.TCPConn)
					->c.r = conn
					->c.w = conn
					->c.identify()
						->ci["client_id"] = c.config.ClientID
						->ci["msg_timeout"] = int64(c.config.MsgTimeout / time.Millisecond)
						->cmd, err := Identify(ci) 
							->&Command{[]byte("IDENTIFY"), nil, body}
						->err = c.WriteCommand(cmd)
						->c.maxRdyCount = resp.MaxRdyCount
					->go c.readLoop()
						->delegate := &connMessageDelegate{c}
						->for{}
						->frameType, data, err := ReadUnpackedResponse(c)
							->" 4 byte frame ID,  N byte data"
						->FrameTypeResponse
							->c.delegate.OnResponse(c, data)
								->w.responseChan <- data
						->FrameTypeError
						->FrameTypeMessage
							->msg, err := DecodeMessage(data)
								->msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8]))
								->msg.Attempts = binary.BigEndian.Uint16(b[8:10])
								->copy(msg.ID[:], b[10:10+MsgIDLength])
								->msg.Body = b[10+MsgIDLength:]
							->msg.Delegate = delegate
							->msg.NSQDAddress = c.String()
					->go c.writeLoop()
						->for{}
						->select 
						->case cmd := <-c.cmdChan
							->c.WriteCommand(cmd)
						->case resp := <-c.msgResponseChan
				->atomic.StoreInt32(&w.state, StateConnected)
				->go w.router()
					->for{}
					->select
					->case t := <-w.transactionChan
						->w.transactions = append(w.transactions, t)
						->err := w.conn.WriteCommand(t.cmd)
					->case data := <-w.responseChan
						->w.popTransaction(FrameTypeResponse, data)
							->t.doneChan "t.doneChan "         doneChan        "
			->t := &ProducerTransaction{
				cmd:      cmd,
				doneChan: doneChan,
				Args:     args,
				}
			->w.transactionChan   router()   
					->w.transactions = append(w.transactions, t)
						->" publish    response   "
						->"      "
						->"    ,    ,      ,       ,          。"
						->func (w *Producer) onConnResponse(c *Conn, data []byte) { w.responseChan <- data }
						->case data := <-w.responseChan
							->w.popTransaction(FrameTypeResponse, data)
								->t := w.transactions[0]
								->"            ,    cmd        cmd"
								->"      ,   cmd               cmd"
								->w.transactions = w.transactions[1:]
								->t.finish()
									->t.doneChan case data := err := w.conn.WriteCommand(t.cmd)
						->_, err := cmd.WriteTo(c)
		->t := consumer, err := nsq.NewConsumer(topic, *channel, cCfg)
	->go r.rdyLoop()
		->
		->"                "
		->"redistributing max-in-flight to connections"
		->            
			->len(conns) > int(maxInFlight)
			->r.inBackoff() && len(conns) > 1
		->"           "
			->"              ,       ,rdy 0"
			->"    0 rdy    ,       ,rdy 0"
		->"   conn, rdy 1"
->consumer.AddConcurrentHandlers(topicHandler, len(destNsqdTCPAddrs))
	->"     :         message"
	->"     :   "
	for i := 0; i < concurrency; i++ {
		go r.handlerLoop(handler)
	}
	->for{}
	->message, ok := <-r.incomingMessages
	->err := handler.HandleMessage(message) -> if err != nil
		->"           ,      "
		->"sends a REQ command to the nsqd"
		->message.Requeue(-1)
			->m.doRequeue(delay, true)
				->m.Delegate.OnRequeue(m, delay, backoff)
				->c.msgResponseChan <- &msgResponse{...}
					->&Command{[]byte("REQ"), params, nil}
	-> if err == nil
		->"           ,   "
		->message.Finish()
			->c.msgResponseChan <- &msgResponse{...}
				->&Command{[]byte("FIN"), params, nil}
			->msgsInFlight := atomic.AddInt64(&c.messagesInFlight, -1)
->err := consumer.ConnectToNSQDs(nsqdTCPAddrs)
	->"    addr"
	->func (r *Consumer) ConnectToNSQD(addr string)
		->conn := NewConn(addr, &r.config, &consumerConnDelegate{r})
		->resp, err := conn.Connect()
			->conn, err := dialer.Dial("tcp", c.addr)
			->c.conn = conn.(*net.TCPConn)
			->c.r = conn
			->c.w = conn
			->go c.readLoop()
				->" 4 byte frameID, N byte Data"
				->     _heartbeat_,  nop
					->&Command{[]byte("NOP"), nil, nil}
				->   CLOSE_WAIT,   StartClose   
				->c.delegate.OnMessage(c, msg)
					->r.incomingMessages <- msg
				->atomic.AddInt64(&c.messagesInFlight, 1) "        "
			->go c.writeLoop()
				->case resp :=  "FIN" ->"      " -> resumeFlag
					->r.startStopContinueBackoff(c, resumeFlag)
						->backoffCounter--
					-> "REQ" -> "      " 
						->"  backoff" -> backoffFlag
							->r.startStopContinueBackoff(c, backoffFlag)
								->backoffCounter++
								->nextBackoff := math.Pow(2, float64(attempt))
						->"    continue" 
							->    
					->backoffCounter == 0 
						-> "  backoff"
					->backoffCounter > 0 -> "     ,      "
						->"send RDY 0 immediately (to *all* connections)"
							->r.updateRDY(c, 0)
							->"      "
						->backoffDuration := r.config.BackoffStrategy.Calculate(int(backoffCounter))
						->" backoffDuration     "
						->time.AfterFunc(d, r.resume)
						->r.updateRDY(choice, 1)
		->cmd := Subscribe(r.topic, r.channel)
			->&Command{[]byte("SUB"), params, nil}
		->for _, c := range r.conns() {}
			->r.maybeUpdateRDY(c)
				->count := r.perConnMaxInFlight()
				->r.updateRDY(conn, count)
					->c.maxRdyCount = resp.maxRdyCount 
						->"client nsqd          N   "
						->"     "
					->c.rdyCount
						->"     "
					->maxPossibleRdy := int64(r.getMaxInFlight()) - atomic.LoadInt64(&r.totalRdyCount) + rdyCount
						->"      ?"
						->"   atomic.LoadInt64(&r.totalRdyCount) + (maxPossibleRdy - rdyCount) <= getMaxInFlight "
					->r.sendRDY(c, count)
						->atomic.AddInt64(&r.totalRdyCount, count-c.RDY())
							->""
						->c.SetRDY(count)
						->err := c.WriteCommand(Ready(int(count)))
						->&Command{[]byte("RDY"), params, nil}
->err := consumer.ConnectToNSQLookupds(lookupdHTTPAddrs)
	->"    addr"
	->go r.lookupdLoop()
	->r.queryLookupd()
		->"make an HTTP req to one of the configured nsqlookupd instances to discover"
		->"which nsqd's provide the topic we are consuming"
		->"http://...//topic"
		->err := apiRequestNegotiateV1("GET", endpoint, nil, &data)
		->broadcastAddress := producer.BroadcastAddress
		->port := producer.TCPPort
		->joined := "broadcastAddress:port"
"nsqlookupd"
->for{}->clientConn, err := listener.Accept()
->go handler.Handle(clientConn)
->func (p *LookupProtocolV1) IOLoop()
	->response, err = p.Exec(client, reader, params)
		->"PING"
			->"    nsqd          "
			->"       ,            "
			->InactiveProducerTimeout: 300 * time.Second, "  300s"
			->"    ,  nsqd  ,nsqd    UNREGISTER ,300s  nsqlookupd      "
			->Handle("GET", "/lookup")
		->"IDENTIFY"
			->peerInfo.RemoteAddress = client.RemoteAddr().String()
			->client.peerInfo = &peerInfo
			->p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo})
			->response = tcp_port ... 
		->"REGISTER"
			->"client must IDENTIFY"
			->topic, channel = params[0],params[1]
			->if channel != ""
				->key := Registration{"channel", topic, channel}
				->p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo})
			->key := Registration{"topic", topic, ""}
				->p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo})
			->"       ,"
		->"UNREGISTER"
			->topic, channel, err := getTopicChan("UNREGISTER", params)
			->"         ,nsqlookupd     ,         ?"
			->"        ? nsqd   ,nsqd      unregister cmd"
				->   nsqd   
				->func (c *Channel) exit()
				->func (t *Topic) exit()
->router.Handle("GET", "/topics", http_api.Decorate(s.doTopics, log, http_api.V1))
->router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
	->topicName, err := reqParams.Get("topic")
	->key := Registration{"topic", topicName, ""}
	->s.ctx.nsqlookupd.DB.AddRegistration(key)
->router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))
	->registrations := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicName, "*")
	->"   topic     channel"
	->"   topic"
->router.Handle("POST", "/channel/create", http_api.Decorate(s.doCreateChannel, log, http_api.V1))
	->key := Registration{"channel", topicName, channelName}
	->s.ctx.nsqlookupd.DB.AddRegistration(key)
	->key = Registration{"topic", topicName, ""}
	->s.ctx.nsqlookupd.DB.AddRegistration(key)