단순 스레드 탱크 원리와 코드

8656 단어
스레드 탱크는 일정한 수량의 스레드를 미리 만들고 비동기적인 작업이 필요할 때 임무를 대기열에 넣기만 하면 스레드 탱크는 자동으로 대기열에서 임무를 찾으며 한 임무를 수행할 때마다 다음 임무를 자동으로 가져옵니다
본고는 간단한 스레드 탱크를 제공하기 때문에 스레드의 자동 증감 기능을 제공하지 않고 비교적 간단한 코드로 그 원리를 이해한다
코드는 파일이 하나밖에 없는데 주석을 포함해서 겨우 200줄밖에 안 되는데, 코드가 비교적 길기 때문에 모두 여기에 붙이지 못한다.
스레드 풀 코드는 Github【클릭】 참조
코드가 c++11을 사용했기 때문에 다음과 같은 몇 가지를 복습해야 합니다.
std::thread
std::mutex
std::condition_variable
std::move
std::lock_guard
std::unique_lock
lambda 표현식코드 설명은 다음과 같습니다.
입구부터 말하자면 구조 함수
template <unsigned _TCount>
FixedThreadPool<_TCount>::FixedThreadPool()
: m_jobsleft(0), m_isDone(false), m_isFinished(false) {
    for (int i = 0; i < _TCount; ++i) {
        m_threads[i] = std::move(std::thread([this, i]() {
            this->DoTask();
        }));
    }
}

구조 함수에서 템플릿 매개 변수에 따라Tcount에서 일정 수량의 루트를 만들고 모든 루트를 그룹 (std::array) 에 저장합니다.
그리고 모든 스레드가 DoTask 방법을 실행하는 것을 알게 될 것입니다. 참고: DoTask는 하위 스레드에서 실행됩니다.
template <unsigned _TCount>
void FixedThreadPool<_TCount>::DoTask() {
    // Run in subthreads.
    // Take the next job in the queue and run it. Notify the main thread that a job has completed.
    while (!m_isDone) {
        this->NextJob()();
        -- m_jobsleft;
        // Notify the main thread that a job has completed.
        m_conditionWait.notify_one();
    }
}

그런 번거로운 표기 변수를 보지 않고 먼저 큰 측면에서 그 원리를 이해한다.
순환 중 매번 작업 (대기열에서 찾을 것 같고, 대기열이 비어 있으면 Block) 을 실행하고, 작업 (즉 lambda 표현식 실행) 을 실행하면,jobsleft가 줄어들고, 주 라인에 "내가 또 한 작업을 수행했다"라고 알립니다.
NextJob은 미션을 어떻게 받습니까?m_condition Wait 누가 막고 있습니까?NextJob에서 작업을 가져오는 방법에 대해 살펴보겠습니다.
template <unsigned _TCount>
typename FixedThreadPool<_TCount>::JobHandler FixedThreadPool<_TCount>::NextJob() {
    // Run in subthreads.
    // Get the next job; pop the first item in the queue, otherwise wait for a signal from the main thread.
    JobHandler handler;
    std::unique_lock<std::mutex> qlock(m_mutexQueue);
    
    // Wait for a job if we don't have any.
    m_conditionJob.wait(qlock, [this]()->bool {
        return m_queue.size() || m_isDone;
    });
    
    // Get job from the queue
    if (!m_isDone) {
        handler = m_queue.front();
        m_queue.pop_front();
    }
    else { // If we're bailing out, 'inject' a job into the queue to keep jobsleft accurate.
        handler = []{};
        ++m_jobsleft;
    }
    return handler;
}

주의: 이 함수도 하위 라인에서 실행됩니다
std::conditionvariable입니다. 간단하게 mconditionJob.wait는 대기열이 비어 있는지 여부를 판단하는 것입니다.
만약 대열이 비어 있다면 막히고 언제까지 기다릴까요?(새로운 작업이 있을 때 notify one () 알림이 있을 것으로 예상됩니다. 알림이 충족되면 계속 아래로 실행됩니다.
대기열에서 작업을 꺼내서 돌아오는 것을 볼 수 있습니다.
여기 관심사 하나 있어요. 언제 mconditionJob의 notifyxxx()?
여기서:
template <unsigned _TCount>
void FixedThreadPool<_TCount>::AddJob(JobHandler job) {
    // Add a new job to the pool. If there are no jobs in the queue, a thread is woken up to take the job. If all threads are busy, the job is added to the end of the queue.
    std::lock_guard<std::mutex> guard(m_mutexQueue);
    m_queue.emplace_back(job);
    ++ m_jobsleft;
    m_conditionJob.notify_one();
}

주의: 이것은 주 라인에서 사용자가 호출하는 방법입니다
물론 JoinAll에도 있지만, 이것은 스레드 탱크의 운행 절차를 이해하는 데 큰 관계가 없다.다음은 또 다른 문제를 토론할 때 보자.
지금 네 머릿속에 라인이 있는지 없는지의 운행 절차가 있다.
마스터 스레드:[하위 스레드 생성] -> [AddJob]
하위 스레드: [DoTask] -> [NextJob] -> [NextJob]...->【NextJob】
설명: 하위 스레드는 DoTask에서 [NextJob]을 통해 작업을 순환합니다. 작업이 없을 때 Block은 NextJob에서 주 스레드의 [AddJob]가 호출되기를 기다린 후 차단된 NextJob을 웨이크업하고 NextJob을 대기열에 있는 작업을 다시 실행하여 DoTask에 맡깁니다.DoTask 가 완료되면 모든 작업이 완료되었는지 여부를 판단하는 데 사용할 수 있는 작업을 완료했다고 알려줍니다.
 
여기서는 좀 더 간단하다. 다음은 탈퇴를 고려한다.
퇴출의 문제는 막힐 수 있는 모든 하위 라인을 깨우고 순조롭게 폐기하는 것이다.
먼저 분석 함수를 봅니다.
template <unsigned _TCount>
FixedThreadPool<_TCount>::~FixedThreadPool() {
    this->JoinAll();
}

JoinAll, thread의 Join처럼 들리잖아. 봐봐.
template <unsigned _TCount>
void FixedThreadPool<_TCount>::JoinAll(bool wait) {
    if (m_isFinished) {
        return;
    }
    
    if (wait) {
        this->WaitAll();
    }
    
    // note that we're done, and wake up any thread that's
    // waiting for a new job
    m_isDone = true;
    m_conditionJob.notify_all();
    
    for(auto &x : m_threads) {
        if(x.joinable()) {
            x.join();
        }
    }
    m_isFinished = true;
}

참고: JoinAll은 주 스레드에서 수행됩니다.
오, misFinished는 JoinAll이 한 번만 실행할 수 있도록 보장합니다.
wait야, Wait All은 이름이 모든 임무가 끝날 때까지 기다리는 것 같아. 그리고 Wait All을 호출하는 라인을 막아야 해. 그렇지 않으면 어떻게 Wait라고 불러!
다음은 misDone=true, 그리고 모든 (notify all () 에 m 알림conditionJob.wait, 그것은 모든 라인에 알리는 mconditionJob.wait야, 일단 몰라. 계속 내려봐.
다음은 모든 하위 라인을 훑어보고 모든join을 떨어뜨리는 것입니다. 이것은 주 라인을 막을 것입니다!메인 라인은 모든join의 하위 라인이 실행되기를 기다려야 메인 라인으로 돌아갈 수 있습니다. 그러나 모든 작업이 완료되면join의 하위 라인은 over가 되잖아요.
 
    // Wait for a job if we don't have any.
    m_conditionJob.wait(qlock, [this]()->bool {
        return m_queue.size() || m_isDone;
    });

NextJob 메서드는 하위 스레드에서 실행됩니다.
JoinAll에서 notifyall 시, 이곳은 깨어납니다, m 때문에isDone은 true입니다. 대기열이 비어 있든 없든 계속 실행됩니다.서브라인이 퇴출되어야 한다면 막힐 수 없기 때문에 여기는 서브라인을 깨우고 서브라인이 순조롭게 퇴출되도록 하는 데 쓰인다.
 
    // Get job from the queue
    if (!m_isDone) {
        handler = m_queue.front();
        m_queue.pop_front();
    }
    else { // If we're bailing out, 'inject' a job into the queue to keep jobsleft accurate.
        handler = []{};
        ++m_jobsleft;
    }

그래서 다음 문장 블록에 도착해서 빈handler로 돌아갑니다.모두 물러나야 하니 일치된 처리를 위해 빈 것으로 돌아가는 것도 나무랄 데가 없다.
 
WaitAll이 어떤 귀신인지 살펴보자.
template <unsigned _TCount>
void FixedThreadPool<_TCount>::WaitAll() {
    // Waits until all jobs have finshed executing.
    if (m_jobsleft > 0) {
        std::unique_lock<std::mutex> lock(m_mutexWait);
        m_conditionWait.wait(lock, [this]()->bool {
            return this->m_jobsleft == 0;
        });
        lock.unlock();
    }
}

오, 그렇군요. Waitall은 역시 당신을 막고 남은 퀘스트 수가 0이 될 때까지 기다려야 깨어납니다(DoTask의 notify one과 결합).
 
그러고 보니 JoinAll에서
만약wait=true라면 모든 작업이 자연스럽게 실행된 후에join의 모든 라인이 종료되기를 기다립니다.
만약wait=false라면, 대기 작업에 막힌 모든 라인이 빈 작업을 직접 수행하고 종료합니다.임무를 수행하고 있는 라인을 임무를 수행한 후 종료시킵니다.
여기까지 알겠어?
코드를 잘 보고 모르는 곳에 부딪히면 영감을 찾아보자.
 
 
 
 
 
 
 

좋은 웹페이지 즐겨찾기