이 더 리 움 이벤트 메커니즘 및 최적화

7857 단어 블록 체인
1 이 태 방 의 사건 메커니즘
이 더 리 움 고-etherum 소스 코드 에서 이 벤트 를 보 내 는 것 은 일반적인 채널 을 제외 하고 포 장 된 Feed 구조 로 이벤트 의 구독 과 발송 을 수행 합 니 다.이 태 방 에 서 는 대량의 피 드 를 사용 하여 사건 을 처리 하 였 다.Feed 구독 이 벤트 를 사용 하 는 절 차 는:
  • 채널 을 정의 합 니 다 ch:ch=make(someType)
  • 하나의 Feed 대상 feed
  • 를 정의 합 니 다.
  • Feed 구독 채널 ch:feed.Subscribe(ch)
  • feed 를 사용 하여 채널 에 데 이 터 를 보 냅 니 다:feed.send(someTypeData)
  • ch 수신 데이터:ret

  • 하나의 feed 는 여러 채널 을 구독 할 수 있 습 니 다.feed 를 사용 하여 데 이 터 를 보 내 면 모든 채널 이 데 이 터 를 받 습 니 다.다음은 피 드 의 소스 코드 를 해독 하고 피 드 소스 코드 판독 에 들 어가 기 전에 고 에 있 는 reflect 패키지 의 SelectCase 를 소개 합 니 다.
    2.reflect.selectCase 를 사용 하여 여러 채널 을 감청 합 니 다.
    여러 채널 ch1,ch2,ch3 에 대해 전통 적 인 Select 방식 으로 감청 합 니 다.
    package main
     
    import (
    	"fmt"
    	"strconv"
    )
     
    func main() {
    	var chs1 = make(chan int)
    	var chs2 = make(chan float64)
    	var chs3 = make(chan string)
    	var ch4close = make(chan int)
    	defer close(ch4close)
     
    	go func(c chan int, ch4close chan int) {
    		for i := 0; i < 5; i++ {
    			c 

    reflect 방식 으로 감청 하기:
    package main
     
    import (
    	"fmt"
    	"reflect"
    	"strconv"
    )
     
    func main() {
    	var chs1 = make(chan int)
    	var chs2 = make(chan float64)
    	var chs3 = make(chan string)
    	var ch4close = make(chan int)
    	defer close(ch4close)
     
    	go func(c chan int, ch4close chan int) {
    		for i := 0; i < 5; i++ {
    			c 

    여기에 reflect.selectCase 배열 selectCase 를 구축 하여 감청 할 채널 을 배열 에 추가 합 니 다.감청 시 reflect.selectCase(selectCase)를 사용 하면 모든 채널 의 정 보 를 감청 할 수 있다.채널 수가 많 을 때 는 selectCase 를 사용 하 는 방식 이 더욱 간결 하고 우아 해 집 니 다.
    3 피 드 소스 코드 해독
    Feed 구조의 원본 코드 는 이벤트/feed.go 에 있 습 니 다.
    Feed 구조
    type Feed struct {
    	once      sync.Once        // ensures that init only runs once
    	sendLock  chan struct{}    // sendLock has a one-element buffer and is empty when held.It protects sendCases.
    	removeSub chan interface{} // interrupts Send
    	sendCases caseList         // the active set of select cases used by Send
    
    	// The inbox holds newly subscribed channels until they are added to sendCases.
    	mu     sync.Mutex
    	inbox  caseList
    	etype  reflect.Type
    	closed bool
    }
    
    type caseList []reflect.SelectCase

    Feed 구조의 핵심 은 inbox 구성원 입 니 다.이 Feed 구독 의 모든 채널 을 저장 하 는 selectCase 의 배열 입 니 다.sendcase 는 모든 활성 화 된 채널 배열 입 니 다.sendLock 채널 은 sendcase 를 보호 하기 위해 자물쇠 로 사 용 됩 니 다.
    함수 초기 화
    func (f *Feed) init() {
    	f.removeSub = make(chan interface{})
    	f.sendLock = make(chan struct{}, 1)
    	f.sendLock 

    여기 sendLock 은 용량 이 1 인 버퍼 채널 로 설정 되 어 있 습 니 다.그리고 sendLock 에 값 을 먼저 썼 습 니 다.sendCases 는 첫 번 째 채널 로 removeSub 채널 을 미리 가입 했다.
    채널 구독 함수
    
    //                        。            
    func (f *Feed) Subscribe(channel interface{}) Subscription {
    	f.once.Do(f.init)
    
    	chanval := reflect.ValueOf(channel)
    	chantyp := chanval.Type()
    	if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 {
    		panic(errBadChannel)
    	}
    	sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)}
    
    	f.mu.Lock()
    	defer f.mu.Unlock()
    	if !f.typecheck(chantyp.Elem()) {
    		panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)})
    	}
    	// Add the select case to the inbox.
    	// The next Send will add it to f.sendCases.
    	cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval}
    	f.inbox = append(f.inbox, cas)
    	return sub
    }

    이 함수 가 하 는 일 은 매우 간단 합 니 다.바로 채널 ch 에 따라 selectCase 대상 을 구성 한 다음 에 inbox 배열 에 넣 는 것 입 니 다.이렇게 해서 채널 구독 이 완료 되 었 습 니 다.
    송신 함수
    // Send delivers to all subscribed channels simultaneously.
    // It returns the number of subscribers that the value was sent to.
    func (f *Feed) Send(value interface{}) (nsent int) {
    	rvalue := reflect.ValueOf(value)
    
    	f.once.Do(f.init)//     ,onece.Do        
    	= 0 && index < len(cases) {
    				// Shrink 'cases' too because the removed case was still active.
    				cases = f.sendCases[:len(cases)-1]
    			}
    		} else {
    			cases = cases.deactivate(chosen)
    			nsent++
    		}
    	}
    
    	// Forget about the sent value and hand off the send lock.
    	for i := firstSubSendCase; i < len(f.sendCases); i++ {
    		f.sendCases[i].Send = reflect.Value{}
    	}
    	f.sendLock 

    send 함 수 는 채널 의 trySend 방법 으로 보 냅 니 다.정상 적 인 상황 에서 바로 보 낼 수 있 습 니 다.그러나 수신 채널 이 막 혔 을 때 Select 방법 이라는 막 힌 방식 으로 채널 전송 이 성공 할 때 까지 기 다 려 야 합 니 다.마지막 으로 돌아 올 때 sendLock 을 기록 하여 다음 발송 을 준비 합 니 다.
    4 send 함수 에 존재 하 는 문제점 및 최적화
    우 리 는 send 함수 가 sendLock 채널 을 사용 하 는 것 을 보 았 다.그것 은 용량 이 1 인 채널 이다.send 함수 가 시작 되면 sendLock 채널 을 읽 습 니 다.이때 sendLock 이 비어 있 으 면 send 함수 가 막 힙 니 다.그래서 send 함수 마지막 에 sendLock 채널 을 기록 하면 다음 에 sendLock 을 읽 으 러 보 낼 때 막 히 지 않 습 니 다.보기 에는 문제 가 없 는 것 같 지만 이상 은 풍만 해서 가끔 뼈 를 드 러 낼 때 가 있다.여기에 존재 하 는 문제점 은 바로 selected,recv,:=reflect.select(cases)라 는 코드 가 막 혀 서 for 순환 값 이 나 오지 않 고 send 함수 가 막 혀 서 sendLock 이 기록 되 지 않 습 니 다.자물쇠 가 잠 겼 습 니 다.다음 send 발송 은 막 힐 겁 니 다.
    여기 서 sendLock 을 사용 하 는 것 은 공공 sendcase 데 이 터 를 보호 하기 위해 서 입 니 다.해결 방향 은 sendcase 를 제거 하고 전체적인 sendcase 에 적용 되 지 않 으 며 부분 변 수 를 사용 하 는 것 입 니 다.이렇게 하면 동기 화 문 제 를 고려 할 필요 가 없다.개 조 된 send 함수:
    func (f *Feed) Send(value interface{}) (nsent int) {
    	rvalue := reflect.ValueOf(value)
    
    	f.once.Do(f.init)
    	//= 0 && index < len(cases) {
    				// Shrink 'cases' too because the removed case was still active.
    				cases = f.sendCases[:len(cases)-1]
    			}
    		} else {
    			cases = cases.deactivate(chosen)
    			nsent++
    		}
    	}
    
    	// Forget about the sent value and hand off the send lock.
    	for i := firstSubSendCase; i < len(f.sendCases); i++ {
    		f.sendCases[i].Send = reflect.Value{}
    	}
    	//f.sendLock 

    어떤 send 는 막 힐 수 있 지만 다음 send 발송 에는 영향 을 주지 않 습 니 다.
    5 go-etherum 소스 코드 에 send 를 사용 한 구덩이
     코어/blockchain.go 에서 보 내 는 함수 PostChainEvents():
    // PostChainEvents iterates over the events generated by a chain insertion and
    // posts them into the event feed.
    // TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock.
    func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
    	log.Info("lzj-log PostChainEvents", "events len",len(events))
    	// post event logs for further processing
    	if logs != nil {
    		bc.logsFeed.Send(logs)
    	}
    	for _, event := range events {
    		switch ev := event.(type) {
    		case ChainEvent:
    			log.Info("lzj-log send ChainEvent")
    			bc.chainFeed.Send(ev)
    
    		case ChainHeadEvent:
    			log.Info("lzj-log send ChainHeadEvent")
    			bc.chainHeadFeed.Send(ev)
    
    		case ChainSideEvent:
    			log.Info("lzj-log send ChainSideEvent")
    			bc.chainSideFeed.Send(ev)
    		}
    	}
    }

    이 함 수 는 for 순환 에서 ChainEvent,ChainHeadEvent 와 ChainSideEvent 이 벤트 를 연이어 보 냈 습 니 다.insert 함수 에서 이 함 수 를 호출 하 였 습 니 다.하지만 여기 에는 문제 가 있 습 니 다.이전 사건 의 발송 이 막 히 면 뒤의 사건 발송 은 실행 되 지 않 습 니 다.Send 함 수 를 단독 협 정 에 넣 어야 합 니 다.이렇게 하면 막 히 는 문 제 를 방지 할 수 있다.
    // PostChainEvents iterates over the events generated by a chain insertion and
    // posts them into the event feed.
    // TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock.
    func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
    	log.Info("lzj-log PostChainEvents", "events len",len(events))
    	// post event logs for further processing
    	if logs != nil {
    		bc.logsFeed.Send(logs)
    	}
    	for _, event := range events {
    		switch ev := event.(type) {
    		case ChainEvent:
    			log.Info("lzj-log send ChainEvent")
    			go bc.chainFeed.Send(ev)
    
    		case ChainHeadEvent:
    			log.Info("lzj-log send ChainHeadEvent")
    			go bc.chainHeadFeed.Send(ev)
    
    		case ChainSideEvent:
    			log.Info("lzj-log send ChainSideEvent")
    			go bc.chainSideFeed.Send(ev)
    		}
    	}
    }

    go 안에서 통 로 를 사용 할 때 매우 조심해 야 한다.왜냐하면 막 히 기 쉬 워 서 자신 이 원 하 는 결 과 를 얻 지 못 하기 때문이다.

    좋은 웹페이지 즐겨찾기