golang balance threads

12406 단어 Gobalance
type job struct {
    conn     net.Conn
    opcode   int
    data     []byte
    result   chan ResultType //結果を他のchannelに渡すため
}
type jobPair struct {
    key   string
    value *job
}


type worker struct {
    jobqueue  map[string]*job // key:UserName
    broadcast chan DataType
    jobadd    chan *jobPair
    jobdel    chan string
    pending   safepending
    index     int
    done      chan struct{}
}

func NewWorker(idx int, queue_limit int, source_limit int, jobreq_limit int) *worker {
    return &worker{
        jobqueue:  make(map[string]*job, queue_limit),
        broadcast: make(chan DataType, source_limit), 
        jobadd:    make(chan jobPair, jobreq_limit),
        jobdel:    make(chan string, jobreq_limit),
        pending:   safepending{0, sync.RWMutex{}},
        index:     idx,
        done:      make(chan struct{}),
    }
}

func (w *worker) PushJob(user string, job *job) {
    pair := jobPair{
        key:   user,
        value: job,
    }
    w.jobadd <- pair
}

func (w *worker) RemoveJob(user string) {
    w.jobdel <- user
}

func (w *worker) Run(wg *sync.WaitGroup) {
    wg.Add(1)
    go func() {
        log.Println("new goroutine, worker index:", w.index)
        defer wg.Done()
        ticker := time.NewTicker(time.Second * 60)
        for {
            select {
            case data := <-w.broadcast:
                for _, job := range w.jobqueue {
                    log.Println(job, data)
                }
            case jobpair := <-w.jobadd:
                w.insertJob(jobpair.key, jobpair.value)
            case delkey := <-w.jobdel:
                w.deleteJob(delkey)
            case <-ticker.C:
                w.loadInfo()
            case <-w.done:
                log.Println("worker", w.index, "exit")
                break
            }
        }
    }()
}

func (w *worker) Stop() {
    go func() {
        w.done <- struct{}{}
    }()
}
func (w *worker) insertJob(key string, value *job) error {
    w.jobqueue[key] = value
    w.pending.Inc()
    return nil
}

func (w *worker) deleteJob(key string) {
    delete(w.jobqueue, key)
    w.pending.Dec()
}
작성자: https://www.jianshu.com/p/215510810c59

좋은 웹페이지 즐겨찾기