golang balance threads
// job describes one unit of work held in a worker's jobqueue.
type job struct {
// conn is the network connection attached to this job.
conn net.Conn
// opcode selects the operation; its values are not defined in this file.
opcode int
// data is the raw payload carried by the job.
data []byte
result chan ResultType // used to hand the result over to another channel
}
// jobPair is the message sent on worker.jobadd: a job together with the key
// under which Run stores it in the worker's jobqueue.
type jobPair struct {
// key is the jobqueue map key (a user name, per the jobqueue field comment).
key string
// value is the job to be stored under key.
value *job
}
// worker holds a set of jobs keyed by user name and is driven through its
// channels by the goroutine started in Run.
type worker struct {
	// jobqueue maps a user name to that user's job. Within this file it is
	// only read and written from the Run goroutine (directly or through
	// insertJob/deleteJob).
	jobqueue map[string]*job // key:UserName
	// broadcast delivers data that Run pairs with every job in jobqueue.
	broadcast chan DataType
	// jobadd carries insertion requests. It transports jobPair values (not
	// pointers) so that it matches the channel NewWorker allocates and the
	// value PushJob sends; the previous *jobPair element type did not compile
	// against either of them.
	jobadd chan jobPair
	// jobdel carries the keys of jobs to remove.
	jobdel chan string
	// pending is incremented by insertJob and decremented by deleteJob.
	// NOTE(review): safepending is declared elsewhere; presumably a
	// lock-protected counter — confirm.
	pending safepending
	// index identifies this worker in log output.
	index int
	// done tells the Run goroutine to exit; Stop sends on it.
	done chan struct{}
}
// NewWorker allocates a worker and all of its channels. It does not start the
// worker; call Run for that.
//
//   - idx is stored as the worker's index and appears in its log lines.
//   - queue_limit is the initial capacity hint for the job map.
//   - source_limit is the buffer size of the broadcast channel.
//   - jobreq_limit is the buffer size of both the jobadd and jobdel channels.
func NewWorker(idx int, queue_limit int, source_limit int, jobreq_limit int) *worker {
	w := new(worker)
	w.index = idx

	// Job storage and its counter.
	w.jobqueue = make(map[string]*job, queue_limit)
	w.pending = safepending{0, sync.RWMutex{}}

	// Request and data channels are buffered; done is unbuffered.
	w.broadcast = make(chan DataType, source_limit)
	w.jobadd = make(chan jobPair, jobreq_limit)
	w.jobdel = make(chan string, jobreq_limit)
	w.done = make(chan struct{})

	return w
}
// PushJob queues a request to register job under the key user. The request is
// sent on the jobadd channel, so the call blocks while that channel is full.
func (w *worker) PushJob(user string, job *job) {
	w.jobadd <- jobPair{key: user, value: job}
}
// RemoveJob queues a request to drop the job stored under user. The key is
// sent on the jobdel channel, so the call blocks while that channel is full.
func (w *worker) RemoveJob(user string) {
	removals := w.jobdel
	removals <- user
}
// Run starts the worker's event loop in a new goroutine and returns
// immediately. wg is incremented before the goroutine starts and released
// when the loop exits, so callers can wg.Wait() for shutdown.
//
// The loop reacts to:
//   - broadcast: logs each queued job together with the received data;
//   - jobadd / jobdel: inserts or deletes a job in jobqueue;
//   - a 60-second ticker: calls loadInfo;
//   - done: logs the exit and terminates the goroutine.
func (w *worker) Run(wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		log.Println("new goroutine, worker index:", w.index)

		ticker := time.NewTicker(time.Second * 60)
		// Release the ticker's resources once the loop is left.
		defer ticker.Stop()

		for {
			select {
			case data := <-w.broadcast:
				for _, job := range w.jobqueue {
					log.Println(job, data)
				}
			case jobpair := <-w.jobadd:
				if err := w.insertJob(jobpair.key, jobpair.value); err != nil {
					log.Println("worker", w.index, "insert job:", err)
				}
			case delkey := <-w.jobdel:
				w.deleteJob(delkey)
			case <-ticker.C:
				w.loadInfo()
			case <-w.done:
				log.Println("worker", w.index, "exit")
				// A bare break would only leave the select and keep the for
				// loop spinning; return ends the goroutine so wg.Done runs.
				return
			}
		}
	}()
}
// Stop asks the Run goroutine to exit by sending on the done channel. The
// send happens in its own goroutine, so Stop itself never blocks; that
// goroutine stays blocked until something receives from done.
func (w *worker) Stop() {
	signal := func() {
		w.done <- struct{}{}
	}
	go signal()
}
// insertJob stores value under key in jobqueue, replacing any job already
// stored there. The pending counter is incremented only when key was not
// present before, so replacing a job does not inflate the count.
//
// It currently always returns nil.
func (w *worker) insertJob(key string, value *job) error {
	_, replaced := w.jobqueue[key]
	w.jobqueue[key] = value
	if !replaced {
		w.pending.Inc()
	}
	return nil
}
// deleteJob removes the job stored under key and decrements the pending
// counter. If no job is stored under key it does nothing, so a removal
// request for an unknown key cannot push the counter below the real job count.
func (w *worker) deleteJob(key string) {
	if _, ok := w.jobqueue[key]; !ok {
		return
	}
	delete(w.jobqueue, key)
	w.pending.Dec()
}
작성자: https://www.jianshu.com/p/215510810c59
Reference
이 문제에 관하여(golang balance threads), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다: https://qiita.com/yumin/items/148ec4d2a34f6782391e 텍스트를 자유롭게 공유하거나 복사할 수 있습니다. 하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)