Cording Parallel gem(Thread)

9502 단어 codereadingRuby
SES의 현장에서 마음에 드는 젬의 학습회를 읽기 위해 Parallel을 읽었다.
Parallel은 루비 환경에서 다중 스레드와 다중 프로세스를 쉽게 수행할 수 있는 프로그램 라이브러리입니다.
Codriding Parallel gem(Process)도 투고했습니다.

대상

  • Parallel
  • https://github.com/grosser/parallel
  • v1.9.0
  • MRI 2.3
  • MacOS
  • 용례


    코드를 읽을 때, 나는 다음 코드의 실행 상황을 추적하기로 결정했다.
    list = ['a','b','c']
    log.info 'start'
    Parallel.map(list, in_threads: 2) do |one_letter|
      log.info one_letter
    end
    log.info 'finish'
    
    실행 결과는 다음과 같다.
  • pidProcess.pid
  • tidThread.current.object_id

  • 사실 나도 읽었다in_processes. 그러나 행동은 이해했다고 생각했지만 문장으로 정리하지 못했다.
    계속 쓰면 더 자세히 읽고 검증하지 않으면 글을 쓸 수 없다.

    Parallel.map


    우선.map.
  • Parallel.map
  • OS 및 Ruby 설치부터 병행 방법 결정


    Ruby의 실현을 고려하여 다음과 같은 사항을 결정하였다.
  • thread 또는process 기반 방법
  • 병렬
  • parallel.rb#L214
    if RUBY_PLATFORM =~ /java/ and not options[:in_processes]
      method = :in_threads
      size = options[method] || processor_count
    elsif options[:in_threads]
      method = :in_threads
      size = options[method]
    else
      method = :in_processes
      if Process.respond_to?(:fork)
        size = options[method] || processor_count
      else
        warn "Process.fork is not supported by this Ruby"
        size = 0
      end
    end
    
    JRuby에서 지정하지 않은 상태에서 스레드를 사용합니다.
    Java는 루비와 달리 CPU 코어를 동시에 사용할 수 있어 Thread만으로도 충분하다.in_processes는 OS에서 CPU 코어 수를 가져오는 함수입니다.
    루비의 구현이 프로세스 포크를 지원하지 않으면 경고가 표시됩니다.

    처리할 원본에서 병렬 다시 계산하기


    parallel.rb#L230
    job_factory = JobFactory.new(source, options[:mutex])
    size = [job_factory.size, size].min
    
  • processor_count의 생성
  • 각 배열의 요소를 처리하는 대기열 또는 Enumerable와 유사한 위젯입니다.
  • 작업 수에서 병렬 수 다시 설정JobFactory
  • 작업이 적으면 다선정 생성도 의미가 없다
  • Parallel.work_in_threads


    안에 있는 것 좀 보고 올게요.

    각 Thread의 실행 결과 문서화

    results = []
    results_mutex = Mutex.new
    
    #...
    
    in_threads() do
      result = something
      results_mutex.synchronize { results[index] = result }
    end  
    
    엠비와 다른 경우GVL는 아님(GIL)이 스레드 안전이 아니기 때문이다.
    따라서 Mutex를 사용하여 배타 제어를 할 수 있다.

    JobFactory가 비어 있을 때까지 각 스레드에서 실행

    in_threads(options) do |worker_num|
      self.worker_number = worker_num
      # as long as there are more jobs, work on one of them
      while !exception && set = job_factory.next
        begin
          item, index = set
          result = with_instrumentation item, index, options do
            call_with_index(item, index, options, &block)
          end
          results_mutex.synchronize { results[index] = result }
        rescue StandardError => e
          exception = e
        end
      end
    end
    

  • 생성된 Thread에서 실행된 블록size
  • .work_in_threadsjob가 끝날 때까지 순환
  • in_threads가 Rescue에 의해 기록되었을 때 예외를 기록하고 순환을 종료
  • job_factory.nextoptions의StandardErrorwith_instrumentation
  • start 기본적으로 전달을 수행하는 블록
  • processor_count

    finish는 다양한 환경에서 CPU 수를 얻을 수 있는 함수입니다.
    손 옆에 있는 맥OS(Darwin)로 확인한 결과 자신의 환경에서 4개의 핵심 CPU의 결함이 발견돼 왠지 8개의 핵심으로 취급됐다.
    아마도 Intel CPU슈퍼 기술의 혜택일 것입니다.
    $ /usr/sbin/sysctl -n hw.ncpu
    8
    

    좋은 웹페이지 즐겨찾기