tunny 소스 코드 읽기
스레드 풀--tunny
코드:https://github.com/Jeffail/tunny
tunny-master
├── LICENSE
├── README.md
├── go.mod
├── tunny.go
├── tunny_logo.png
├── tunny_test.go
└── worker.go
주요 코드 파일은 두 가지가 있는데 tunny.go
스레드 탱크 관련, worker.go
소비자 관련 논리, 먼저 tunny.go
의 유형 대상을 본다.type Pool struct {
queuedJobs int64 //
ctor func() Worker // , ,
workers []*workerWrapper // , worker
reqChan chan workRequest // , worker channel
workerMut sync.Mutex
}
다시 보기worker.go
방법은 주로 두 개의 대상이 있다type workRequest struct {
// jobChan is used to send the payload to this worker.
jobChan chan
다시 보기worker.go
의 다른 방법:func newWorkerWrapper(reqChan chan
위쪽은 생성 workerWrapper
방법, 입력 매개 변수
1) reqChan
: pool reqChan
와 동일하며 현재 workerWrapper
가 비어 있는지 여부를 제어하는 데 사용됩니다.
2)worker
는 구체적으로 실행해야 하는 방법에 대응하는 인터페이스이고 worker
의 인터페이스 방법process
은 사용자에게 실현된다.
그 중에서 이 방법은 run
방법을 실행할 것이다. 이 방법은 비교적 세부적이고 구체적인 코드이다.func (w *workerWrapper) run() {
jobChan, retChan := make(chan interface{}), make(chan interface{})
defer func() {
w.worker.Terminate()
close(retChan)
close(w.closedChan)
}()
for {
// NOTE: Blocking here will prevent the worker from closing down.
w.worker.BlockUntilReady()
select {
case w.reqChan
먼저 두 개jobChan retChan
를 만들고jobChan
는 실행해야 할 매개 변수를 저장하는 데 사용하고retChan
는 함수 실행 매개 변수를 저장한 후의 결과를 저장한다.그리고 defer
함수가 있습니다. 현재 worker
를 닫고 pool stop
이 함수worker
를 닫으면 종료합니다.그리고 하나for
는 그 안의 논리를 계속 집행하고, case w.reqChan
그리고 reqChan
에 workRequest
를 삽입해서 이것case
에 있는 논리로 뛰어갑니다. select {
case payload :=
이 덩어리는 대기 jobChan
안에 물건을 넣는 것을 막는다. (이 덩어리는 pool
안의 논리) 그리고 payload := , :
result := w.worker.Process(payload)
select {
case retChan
worker
처리 파라미터를 사용하여 결과를 retChan
에 넣으면 pool
쪽에서 결과를 얻기 위해 기다리는 것을 막고 사용자에게 되돌려줍니다.
이 run
방법은 사실 pool
와 큰 관계가 있기 때문에 이어서 볼 수 있는 방법pool
func (p *Pool) Process(payload interface{}) interface{} {
atomic.AddInt64(&p.queuedJobs, 1)
/* chan workerRequest worker run
case w.reqChan
위의 이 논리는 기본적으로 스레드 탱크 사이의 데이터 흐름의 핵심이다. 이외에 다른 기능도 있다. 예를 들어 설정pool
의 크기이다. 주의해야 한다. 현재의 프로세스 수량이 설정의 크기보다 적으면worker를 증가해야 한다. 그렇지 않으면 감소해야 한다. 이 코드는 다음과 같다.func (p *Pool) SetSize(n int) {
p.workerMut.Lock() //
defer p.workerMut.Unlock()
lWorkers := len(p.workers) // worker
if lWorkers == n {
return
}
// Add extra workers if N > len(workers)
for i := lWorkers; i < n; i++ {
p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
}
// for worker, stop join, ,
// Asynchronously stop all workers > N
for i := n; i < lWorkers; i++ {
p.workers[i].stop()
}
// Synchronously wait for all workers > N to stop
for i := n; i < lWorkers; i++ {
p.workers[i].join()
}
// Remove stopped workers from slice
p.workers = p.workers[:n]
}
아래 분석stop join
방법은 결합run
방법과 함께 보아야 한다/*
stop, closeChan
run :
case
이외에 원본 코드에서 인터페이스를 사용하여worker를 정의하고closureWorker를 실현하였다type Worker interface {
// Process will synchronously perform a job and return the result.
Process(interface{}) interface{}
// BlockUntilReady is called before each job is processed and must block the
// calling goroutine until the Worker is ready to process the next job.
BlockUntilReady()
// Interrupt is called when a job is cancelled. The worker is responsible
// for unblocking the Process implementation.
Interrupt()
// Terminate is called when a Worker is removed from the processing pool
// and is responsible for cleaning up any held resources.
Terminate()
}
//------------------------------------------------------------------------------
// closureWorker is a minimal Worker implementation that simply wraps a
// func(interface{}) interface{}
type closureWorker struct {
processor func(interface{}) interface{} // processor
}
func (w *closureWorker) Process(payload interface{}) interface{} {
return w.processor(payload) // ,
}
func (w *closureWorker) BlockUntilReady() {}
func (w *closureWorker) Interrupt() {}
func (w *closureWorker) Terminate() {}
//---------------
배울 점:
tunny-master
├── LICENSE
├── README.md
├── go.mod
├── tunny.go
├── tunny_logo.png
├── tunny_test.go
└── worker.go
type Pool struct {
queuedJobs int64 //
ctor func() Worker // , ,
workers []*workerWrapper // , worker
reqChan chan workRequest // , worker channel
workerMut sync.Mutex
}
type workRequest struct {
// jobChan is used to send the payload to this worker.
jobChan chan
func newWorkerWrapper(reqChan chan
func (w *workerWrapper) run() {
jobChan, retChan := make(chan interface{}), make(chan interface{})
defer func() {
w.worker.Terminate()
close(retChan)
close(w.closedChan)
}()
for {
// NOTE: Blocking here will prevent the worker from closing down.
w.worker.BlockUntilReady()
select {
case w.reqChan
case w.reqChan
select {
case payload :=
result := w.worker.Process(payload)
select {
case retChan
func (p *Pool) Process(payload interface{}) interface{} {
atomic.AddInt64(&p.queuedJobs, 1)
/* chan workerRequest worker run
case w.reqChan
func (p *Pool) SetSize(n int) {
p.workerMut.Lock() //
defer p.workerMut.Unlock()
lWorkers := len(p.workers) // worker
if lWorkers == n {
return
}
// Add extra workers if N > len(workers)
for i := lWorkers; i < n; i++ {
p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
}
// for worker, stop join, ,
// Asynchronously stop all workers > N
for i := n; i < lWorkers; i++ {
p.workers[i].stop()
}
// Synchronously wait for all workers > N to stop
for i := n; i < lWorkers; i++ {
p.workers[i].join()
}
// Remove stopped workers from slice
p.workers = p.workers[:n]
}
/*
stop, closeChan
run :
case
type Worker interface {
// Process will synchronously perform a job and return the result.
Process(interface{}) interface{}
// BlockUntilReady is called before each job is processed and must block the
// calling goroutine until the Worker is ready to process the next job.
BlockUntilReady()
// Interrupt is called when a job is cancelled. The worker is responsible
// for unblocking the Process implementation.
Interrupt()
// Terminate is called when a Worker is removed from the processing pool
// and is responsible for cleaning up any held resources.
Terminate()
}
//------------------------------------------------------------------------------
// closureWorker is a minimal Worker implementation that simply wraps a
// func(interface{}) interface{}
type closureWorker struct {
processor func(interface{}) interface{} // processor
}
func (w *closureWorker) Process(payload interface{}) interface{} {
return w.processor(payload) // ,
}
func (w *closureWorker) BlockUntilReady() {}
func (w *closureWorker) Interrupt() {}
func (w *closureWorker) Terminate() {}
//---------------
worker
참고 자료
[1] https://www.shangmayuan.com/a/839f766980484fd88facf32c.html
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.