Rust에서 작업자 풀을 구현하는 방법

원래 내 웹사이트에 게시됨: https://kerkour.com/rust-worker-pool

두 번 생각. 작업자 풀은 소유권 모델로 인해 Rust에 적합하지 않습니다. 대신 함수형 프로그래밍과 불변 데이터를 수용하십시오. Rust는 사용하기 더 간단하고 우아한 도구인 병렬 반복자와 스트림을 제공합니다.

모든 프로그래밍 언어에서 작업자 풀로 작업할 때와 마찬가지로 동시성에 대한 상한을 항상 설정해야 한다는 점에 유의해야 합니다. 그렇지 않으면 시스템 리소스가 빠르게 고갈될 수 있습니다.

을 위한:
  • Compute intensive jobs
  • I/O intensive jobs

  • 컴퓨팅 집약적인 작업



    컴퓨팅 집약적인 작업(CPU 바운드)의 경우 rayon 을 제공하는 parallel Iterators 크레이트가 있습니다. 연결자가 스레드 풀로 발송되는 반복자입니다. 좋은 점은 스레드 풀이 개발자인 우리에게 숨겨져 있다는 것입니다. 표준 Iterator를 사용하는 것처럼 코딩하면 됩니다.

    Cargo.toml

    [package]
    name = "rust_worker_pool"
    version = "0.1.0"
    authors = ["Sylvain Kerkour <[email protected]>"]
    edition = "2018"
    
    # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
    
    [dependencies]
    rand = "0.8"
    
    rayon = "1"
    


    main.rs

    use rand::{thread_rng, Rng};
    use rayon::prelude::*;
    use std::time::Duration;
    
    fn compute_job(job: i64) -> i64 {
        let mut rng = thread_rng();
        let sleep_ms: u64 = rng.gen_range(0..10);
        std::thread::sleep(Duration::from_millis(sleep_ms));
    
        job * job
    }
    
    fn process_result(result: i64) {
        println!("{}", result);
    }
    
    fn main() {
        let jobs = 0..100;
    
        jobs.into_par_iter()
            .map(compute_job)
            .for_each(process_result);
    }
    


    기본적으로 스레드 풀의 크기는 시스템의 논리적 CPU 수와 같습니다.

    I/O 집중 작업(비동기)



    I/O(Input/Output) 바인딩 작업의 경우 async 랜드로 이동해야 합니다. 보다 정확하게는 항목을 동시에 처리할 수 있는 Streams Iterator인 async 을 사용합니다.

    그러나 Stream 특성은 결합자 자체를 제공하지 않습니다. StreamExt 크레이트에서 futures 특성을 가져와야 합니다.

    Cargo.toml

    [package]
    name = "rust_worker_pool"
    version = "0.1.0"
    authors = ["Sylvain Kerkour <[email protected]>"]
    edition = "2021"
    
    # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
    
    [dependencies]
    rand = "0.8"
    
    tokio = { version = "1", features = ["full"] }
    futures = "0.3"
    


    for_each_동시



    for_each_concurrent 은 Stream을 소비하므로 가장 사용하기 쉽습니다. 이는 Stream 자체를 반환하지 않고 .await ed일 수 있는 Future를 반환한다는 것을 의미합니다.

    main.rs

    use futures::{stream, StreamExt};
    use rand::{thread_rng, Rng};
    use std::time::Duration;
    
    async fn compute_job(job: i64) -> i64 {
        let mut rng = thread_rng();
        let sleep_ms: u64 = rng.gen_range(0..10);
        tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
    
        job * job
    }
    
    async fn process_result(result: i64) {
        println!("{}", result);
    }
    
    #[tokio::main]
    async fn main() {
        let jobs = 0..100;
        let concurrency = 42;
    
        stream::iter(jobs)
            .for_each_concurrent(concurrency, |job| async move {
                let result = compute_job(job).await;
                process_result(result).await;
            })
            .await;
    }
    


    buffer_unordered



    반면 buffer_unordered 은 스트림을 소비하지 않습니다. 이것이 스트림을 소비하기 위해 for_each 을 싱크로 사용해야 하는 이유입니다.

    main.rs

    use futures::{stream, StreamExt};
    use rand::{thread_rng, Rng};
    use std::time::Duration;
    
    async fn compute_job(job: i64) -> i64 {
        let mut rng = thread_rng();
        let sleep_ms: u64 = rng.gen_range(0..10);
        tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
    
        job * job
    }
    
    async fn process_result(result: i64) {
        println!("{}", result);
    }
    
    #[tokio::main]
    async fn main() {
        let jobs = 0..100;
        let concurrency = 42;
    
        stream::iter(jobs)
            .map(compute_job)
            .buffer_unordered(concurrency)
            .for_each(process_result)
            .await;
    }
    


    결과 수집



    때로는 결과를 직접 처리하는 대신 예를 들어 나중에 일괄적으로 보내기 위해 결과를 수집해야 할 수도 있습니다. 좋은 소식입니다. collect 메서드는 Streams에서 사용할 수 있습니다.

    main.rs

    use futures::{stream, StreamExt};
    use rand::{thread_rng, Rng};
    use std::time::Duration;
    
    async fn compute_job(job: i64) -> i64 {
        let mut rng = thread_rng();
        let sleep_ms: u64 = rng.gen_range(0..10);
        tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
    
        job * job
    }
    
    async fn process_result(result: i64) {
        println!("{}", result);
    }
    
    #[tokio::main]
    async fn main() {
        let jobs = 0..100;
        let concurrency = 42;
    
        let results: Vec<i64> = stream::iter(jobs)
            .map(compute_job)
            .buffer_unordered(concurrency)
            .collect()
            .await;
    }
    


    코드는 GitHub에 있습니다.



    평소와 같이 GitHub에서 코드를 찾을 수 있습니다: github.com/skerkour/kerkour.com (저장소에 별표를 표시하는 것을 잊지 마세요 🙏).

    좋은 웹페이지 즐겨찾기