이 더 리 움 이벤트 메커니즘 및 최적화
7857 단어 블록 체인
이 더 리 움 고-etherum 소스 코드 에서 이 벤트 를 보 내 는 것 은 일반적인 채널 을 제외 하고 포 장 된 Feed 구조 로 이벤트 의 구독 과 발송 을 수행 합 니 다.이 태 방 에 서 는 대량의 피 드 를 사용 하여 사건 을 처리 하 였 다.Feed 구독 이 벤트 를 사용 하 는 절 차 는:
하나의 feed 는 여러 채널 을 구독 할 수 있 습 니 다.feed 를 사용 하여 데 이 터 를 보 내 면 모든 채널 이 데 이 터 를 받 습 니 다.다음은 피 드 의 소스 코드 를 해독 하고 피 드 소스 코드 판독 에 들 어가 기 전에 고 에 있 는 reflect 패키지 의 SelectCase 를 소개 합 니 다.
2.reflect.selectCase 를 사용 하여 여러 채널 을 감청 합 니 다.
여러 채널 ch1,ch2,ch3 에 대해 전통 적 인 Select 방식 으로 감청 합 니 다.
package main
import (
"fmt"
"strconv"
)
func main() {
var chs1 = make(chan int)
var chs2 = make(chan float64)
var chs3 = make(chan string)
var ch4close = make(chan int)
defer close(ch4close)
go func(c chan int, ch4close chan int) {
for i := 0; i < 5; i++ {
c
reflect 방식 으로 감청 하기:
package main
import (
"fmt"
"reflect"
"strconv"
)
func main() {
var chs1 = make(chan int)
var chs2 = make(chan float64)
var chs3 = make(chan string)
var ch4close = make(chan int)
defer close(ch4close)
go func(c chan int, ch4close chan int) {
for i := 0; i < 5; i++ {
c
여기에 reflect.selectCase 배열 selectCase 를 구축 하여 감청 할 채널 을 배열 에 추가 합 니 다.감청 시 reflect.selectCase(selectCase)를 사용 하면 모든 채널 의 정 보 를 감청 할 수 있다.채널 수가 많 을 때 는 selectCase 를 사용 하 는 방식 이 더욱 간결 하고 우아 해 집 니 다.
3 피 드 소스 코드 해독
Feed 구조의 원본 코드 는 이벤트/feed.go 에 있 습 니 다.
Feed 구조
type Feed struct {
once sync.Once // ensures that init only runs once
sendLock chan struct{} // sendLock has a one-element buffer and is empty when held.It protects sendCases.
removeSub chan interface{} // interrupts Send
sendCases caseList // the active set of select cases used by Send
// The inbox holds newly subscribed channels until they are added to sendCases.
mu sync.Mutex
inbox caseList
etype reflect.Type
closed bool
}
type caseList []reflect.SelectCase
Feed 구조의 핵심 은 inbox 구성원 입 니 다.이 Feed 구독 의 모든 채널 을 저장 하 는 selectCase 의 배열 입 니 다.sendcase 는 모든 활성 화 된 채널 배열 입 니 다.sendLock 채널 은 sendcase 를 보호 하기 위해 자물쇠 로 사 용 됩 니 다.
함수 초기 화
func (f *Feed) init() {
f.removeSub = make(chan interface{})
f.sendLock = make(chan struct{}, 1)
f.sendLock
여기 sendLock 은 용량 이 1 인 버퍼 채널 로 설정 되 어 있 습 니 다.그리고 sendLock 에 값 을 먼저 썼 습 니 다.sendCases 는 첫 번 째 채널 로 removeSub 채널 을 미리 가입 했다.
채널 구독 함수
// 。
func (f *Feed) Subscribe(channel interface{}) Subscription {
f.once.Do(f.init)
chanval := reflect.ValueOf(channel)
chantyp := chanval.Type()
if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 {
panic(errBadChannel)
}
sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)}
f.mu.Lock()
defer f.mu.Unlock()
if !f.typecheck(chantyp.Elem()) {
panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)})
}
// Add the select case to the inbox.
// The next Send will add it to f.sendCases.
cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval}
f.inbox = append(f.inbox, cas)
return sub
}
이 함수 가 하 는 일 은 매우 간단 합 니 다.바로 채널 ch 에 따라 selectCase 대상 을 구성 한 다음 에 inbox 배열 에 넣 는 것 입 니 다.이렇게 해서 채널 구독 이 완료 되 었 습 니 다.
송신 함수
// Send delivers to all subscribed channels simultaneously.
// It returns the number of subscribers that the value was sent to.
func (f *Feed) Send(value interface{}) (nsent int) {
rvalue := reflect.ValueOf(value)
f.once.Do(f.init)// ,onece.Do
= 0 && index < len(cases) {
// Shrink 'cases' too because the removed case was still active.
cases = f.sendCases[:len(cases)-1]
}
} else {
cases = cases.deactivate(chosen)
nsent++
}
}
// Forget about the sent value and hand off the send lock.
for i := firstSubSendCase; i < len(f.sendCases); i++ {
f.sendCases[i].Send = reflect.Value{}
}
f.sendLock
send 함 수 는 채널 의 trySend 방법 으로 보 냅 니 다.정상 적 인 상황 에서 바로 보 낼 수 있 습 니 다.그러나 수신 채널 이 막 혔 을 때 Select 방법 이라는 막 힌 방식 으로 채널 전송 이 성공 할 때 까지 기 다 려 야 합 니 다.마지막 으로 돌아 올 때 sendLock 을 기록 하여 다음 발송 을 준비 합 니 다.
4 send 함수 에 존재 하 는 문제점 및 최적화
우 리 는 send 함수 가 sendLock 채널 을 사용 하 는 것 을 보 았 다.그것 은 용량 이 1 인 채널 이다.send 함수 가 시작 되면 sendLock 채널 을 읽 습 니 다.이때 sendLock 이 비어 있 으 면 send 함수 가 막 힙 니 다.그래서 send 함수 마지막 에 sendLock 채널 을 기록 하면 다음 에 sendLock 을 읽 으 러 보 낼 때 막 히 지 않 습 니 다.보기 에는 문제 가 없 는 것 같 지만 이상 은 풍만 해서 가끔 뼈 를 드 러 낼 때 가 있다.여기에 존재 하 는 문제점 은 바로 selected,recv,:=reflect.select(cases)라 는 코드 가 막 혀 서 for 순환 값 이 나 오지 않 고 send 함수 가 막 혀 서 sendLock 이 기록 되 지 않 습 니 다.자물쇠 가 잠 겼 습 니 다.다음 send 발송 은 막 힐 겁 니 다.
여기 서 sendLock 을 사용 하 는 것 은 공공 sendcase 데 이 터 를 보호 하기 위해 서 입 니 다.해결 방향 은 sendcase 를 제거 하고 전체적인 sendcase 에 적용 되 지 않 으 며 부분 변 수 를 사용 하 는 것 입 니 다.이렇게 하면 동기 화 문 제 를 고려 할 필요 가 없다.개 조 된 send 함수:
func (f *Feed) Send(value interface{}) (nsent int) {
rvalue := reflect.ValueOf(value)
f.once.Do(f.init)
//= 0 && index < len(cases) {
// Shrink 'cases' too because the removed case was still active.
cases = f.sendCases[:len(cases)-1]
}
} else {
cases = cases.deactivate(chosen)
nsent++
}
}
// Forget about the sent value and hand off the send lock.
for i := firstSubSendCase; i < len(f.sendCases); i++ {
f.sendCases[i].Send = reflect.Value{}
}
//f.sendLock
어떤 send 는 막 힐 수 있 지만 다음 send 발송 에는 영향 을 주지 않 습 니 다.
5 go-etherum 소스 코드 에 send 를 사용 한 구덩이
코어/blockchain.go 에서 보 내 는 함수 PostChainEvents():
// PostChainEvents iterates over the events generated by a chain insertion and
// posts them into the event feed.
// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock.
func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
log.Info("lzj-log PostChainEvents", "events len",len(events))
// post event logs for further processing
if logs != nil {
bc.logsFeed.Send(logs)
}
for _, event := range events {
switch ev := event.(type) {
case ChainEvent:
log.Info("lzj-log send ChainEvent")
bc.chainFeed.Send(ev)
case ChainHeadEvent:
log.Info("lzj-log send ChainHeadEvent")
bc.chainHeadFeed.Send(ev)
case ChainSideEvent:
log.Info("lzj-log send ChainSideEvent")
bc.chainSideFeed.Send(ev)
}
}
}
이 함 수 는 for 순환 에서 ChainEvent,ChainHeadEvent 와 ChainSideEvent 이 벤트 를 연이어 보 냈 습 니 다.insert 함수 에서 이 함 수 를 호출 하 였 습 니 다.하지만 여기 에는 문제 가 있 습 니 다.이전 사건 의 발송 이 막 히 면 뒤의 사건 발송 은 실행 되 지 않 습 니 다.Send 함 수 를 단독 협 정 에 넣 어야 합 니 다.이렇게 하면 막 히 는 문 제 를 방지 할 수 있다.
// PostChainEvents iterates over the events generated by a chain insertion and
// posts them into the event feed.
// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock.
func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
log.Info("lzj-log PostChainEvents", "events len",len(events))
// post event logs for further processing
if logs != nil {
bc.logsFeed.Send(logs)
}
for _, event := range events {
switch ev := event.(type) {
case ChainEvent:
log.Info("lzj-log send ChainEvent")
go bc.chainFeed.Send(ev)
case ChainHeadEvent:
log.Info("lzj-log send ChainHeadEvent")
go bc.chainHeadFeed.Send(ev)
case ChainSideEvent:
log.Info("lzj-log send ChainSideEvent")
go bc.chainSideFeed.Send(ev)
}
}
}
go 안에서 통 로 를 사용 할 때 매우 조심해 야 한다.왜냐하면 막 히 기 쉬 워 서 자신 이 원 하 는 결 과 를 얻 지 못 하기 때문이다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
ERC 20 - 분석 입력 데이터 자바 (transfer)maven 도입 문제. https://github.com/web3j/web3j/issues/489 다른 유사 성 은 당연히 abi json 을 미리 알 아야 한다....
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.