tunny 소스 코드 읽기

5532 단어

스레드 풀--tunny


코드:https://github.com/Jeffail/tunny
    
tunny-master
├── LICENSE
├── README.md 
├── go.mod
├── tunny.go
├── tunny_logo.png
├── tunny_test.go
└── worker.go

주요 코드 파일은 두 가지가 있는데 tunny.go 스레드 탱크 관련, worker.go 소비자 관련 논리, 먼저 tunny.go의 유형 대상을 본다.
type Pool struct {
    queuedJobs int64  //           

    ctor    func() Worker //           ,      ,    
    workers []*workerWrapper //             ,       worker  
    reqChan chan workRequest //           ,      worker      channel 

    workerMut sync.Mutex
}

다시 보기worker.go 방법은 주로 두 개의 대상이 있다
type workRequest struct {
    // jobChan is used to send the payload to this worker.
    jobChan chan

다시 보기worker.go의 다른 방법:
func newWorkerWrapper(reqChan chan

위쪽은 생성 workerWrapper 방법, 입력 매개 변수
1) reqChan: pool reqChan와 동일하며 현재 workerWrapper가 비어 있는지 여부를 제어하는 데 사용됩니다.
2)worker는 구체적으로 실행해야 하는 방법에 대응하는 인터페이스이고 worker의 인터페이스 방법process은 사용자에게 실현된다.
그 중에서 이 방법은 run 방법을 실행할 것이다. 이 방법은 비교적 세부적이고 구체적인 코드이다.
func (w *workerWrapper) run() {
    jobChan, retChan := make(chan interface{}), make(chan interface{})
    defer func() {
        w.worker.Terminate()
        close(retChan)
        close(w.closedChan)
    }()

    for {
        // NOTE: Blocking here will prevent the worker from closing down.
        w.worker.BlockUntilReady()
        select {
        case w.reqChan 

먼저 두 개jobChan retChan를 만들고jobChan는 실행해야 할 매개 변수를 저장하는 데 사용하고retChan는 함수 실행 매개 변수를 저장한 후의 결과를 저장한다.그리고 defer 함수가 있습니다. 현재 worker를 닫고 pool stop 이 함수worker를 닫으면 종료합니다.그리고 하나for는 그 안의 논리를 계속 집행하고,
        case w.reqChan 

그리고 reqChanworkRequest를 삽입해서 이것case에 있는 논리로 뛰어갑니다.
 select {
            case payload := 

이 덩어리는 대기 jobChan 안에 물건을 넣는 것을 막는다. (이 덩어리는 pool 안의 논리) 그리고 payload := , :
 result := w.worker.Process(payload)
          select {
          case retChan 
worker 처리 파라미터를 사용하여 결과를 retChan에 넣으면 pool 쪽에서 결과를 얻기 위해 기다리는 것을 막고 사용자에게 되돌려줍니다.
run 방법은 사실 pool와 큰 관계가 있기 때문에 이어서 볼 수 있는 방법pool
func (p *Pool) Process(payload interface{}) interface{} {
    atomic.AddInt64(&p.queuedJobs, 1)
  /*  chan      workerRequest      worker run   
        case w.reqChan 

위의 이 논리는 기본적으로 스레드 탱크 사이의 데이터 흐름의 핵심이다. 이외에 다른 기능도 있다. 예를 들어 설정pool의 크기이다. 주의해야 한다. 현재의 프로세스 수량이 설정의 크기보다 적으면worker를 증가해야 한다. 그렇지 않으면 감소해야 한다. 이 코드는 다음과 같다.
func (p *Pool) SetSize(n int) {
    p.workerMut.Lock() //   
    defer p.workerMut.Unlock()

    lWorkers := len(p.workers) //      worker  
    if lWorkers == n {
        return
    }

    // Add extra workers if N > len(workers)
    for i := lWorkers; i < n; i++ {
        p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
    } 

  //     for           worker,     stop join,       ,             
    // Asynchronously stop all workers > N
    for i := n; i < lWorkers; i++ {
        p.workers[i].stop()
    }

    // Synchronously wait for all workers > N to stop
    for i := n; i < lWorkers; i++ {
        p.workers[i].join()
    }

    // Remove stopped workers from slice
    p.workers = p.workers[:n]
}

아래 분석stop join 방법은 결합run 방법과 함께 보아야 한다
/*
  stop,    closeChan
   run        :
        case 

이외에 원본 코드에서 인터페이스를 사용하여worker를 정의하고closureWorker를 실현하였다
type Worker interface {
    // Process will synchronously perform a job and return the result.
    Process(interface{}) interface{}

    // BlockUntilReady is called before each job is processed and must block the
    // calling goroutine until the Worker is ready to process the next job.
    BlockUntilReady()

    // Interrupt is called when a job is cancelled. The worker is responsible
    // for unblocking the Process implementation.
    Interrupt()

    // Terminate is called when a Worker is removed from the processing pool
    // and is responsible for cleaning up any held resources.
    Terminate()
}

//------------------------------------------------------------------------------

// closureWorker is a minimal Worker implementation that simply wraps a
// func(interface{}) interface{}
type closureWorker struct {
    processor func(interface{}) interface{} // processor                       
}

func (w *closureWorker) Process(payload interface{}) interface{} {
    return w.processor(payload) //      ,                
}

func (w *closureWorker) BlockUntilReady() {}
func (w *closureWorker) Interrupt()       {}
func (w *closureWorker) Terminate()       {}

//---------------

배울 점:
  • 스레드 탱크와worker 간의 상호작용은 같은reqChan을 사용하고 양쪽이 각각 막혀서 다음 단계가 끝날 때까지 기다린다.
  • worker의 닫기는 두 개의chan으로 제어
  • ctor를 모든worker에게 건네주고interface
  • 를 사용합니다.
  • 인터페이스 프로그래밍, 설명worker
  • 환영합니다: 위챗 연락처 13161411563

    참고 자료


    [1] https://www.shangmayuan.com/a/839f766980484fd88facf32c.html

    좋은 웹페이지 즐겨찾기