OpenResty 협정 API 돌리 기

8282 단어 coroutineopenresty
주의: 본 논문 에 열 거 된 모든 코드 는 Proof Of Concept 일 뿐 오류 처 리 를 하지 않 았 습 니 다.또 일부 한계 상황 에 대해 서 는 제대로 고려 하지 않 았 을 수도 있다.따라서 글 의 코드 를 프로젝트 에 직접 복사 하여 초래 한 모든 결과 에 대해 책임 을 져 야 합 니 다.
OK, 본론 으로 들 어가 자.OpenResty 는 ngx.thread.*, coroutine.*, ngx.semaphore 등 일련의 협정 API 를 제공 했다.Nginx 의 요청 처리 방식 에 제한 을 받 지만 표 현 력 은 통용 언어의 협정 API 만큼 강하 지 않다.하지만 머리 를 뚫 으 면 수작 을 부 릴 수 있다.이 API 를 통 해 다른 프로 그래 밍 플랫폼 의 스케줄 링 방식 을 모 의 해 봅 시다.
자바 의 Future 를 모 의 합 니 다.
자바 의 Future 는 작업 을 만 들 고 필요 할 때 get 작업 의 반환 값 을 만 들 수 있 습 니 다.또 퓨 처 에는 시간 초과 기능 도 있다.우 리 는 구체 적 인 임 무 를 완성 하기 위해 협 정 을 사용 할 수 있 으 며, 시간 을 초과 하여 끝 나 는 협 정 을 추가 하여 시간 을 초과 할 수 있다.
이렇게:
local function task()
    ngx.sleep(3)
    ngx.say("Done")
end

local task_thread = ngx.thread.spawn(task)
local timeout_thread = ngx.thread.spawn(function(timeout)
    ngx.sleep(timeout)
    error("timeout")
end, 2)
local ok, res = ngx.thread.wait(task_thread, timeout_thread)
if not ok then
    if res == "timeout" then
        ngx.thread.kill(task_thread)
        ngx.say("task cancelled by timeout")
        return
    end
    ngx.say("task failed, result: ", res)
end
ngx.thread.kill(timeout_thread)

주의 하 세 요. 어떤 협 정 이 물 러 난 후에 우 리 는 다른 협 정 을 kill 해 야 합 니 다.호출 ngx.exit 같은 방법 으로 명시 적 으로 물 러 나 지 않 으 면 모든 협 정 이 물 러 날 때 까지 현재 단계 가 끝나 지 않 기 때문이다.
문서 의 내용 참조:
By default, the corresponding Nginx handler (e.g., rewrite_by_lua handler) will not terminate until
  • both the "entry thread" and all the user "light threads" terminates,
  • a "light thread" (either the "entry thread" or a user "light thread" aborts by calling ngx.exit, ngx.exec, ngx.redirect, or ngx.req.set_uri(uri, true), or
  • the "entry thread" terminates with a Lua error.

  • Javascript 의 Promise. race / all 을 모 의 합 니 다.
    Promise. race / all 은 여러 개의 Promise 를 받 은 다음 새 Promise 로 포장 하여 되 돌려 줍 니 다. 관련 문 서 를 참조 하 십시오:
    The Promise.race(iterable) method returns a promise that resolves or rejects as soon as one of the promises in the iterable resolves or rejects, with the value or reason from that promise.
    The Promise.all(iterable) method returns a promise that resolves when all of the promises in the iterable argument have resolved, or rejects with the reason of the first passed promise that rejects.
    여기 서 reject 는 협정 실행 중 error 를 던 지 는 것 과 같 습 니 다. resolve 는 협정 에 비해 결 과 를 되 돌려 줍 니 다. 이 두 API 는 reject 의 처리 가 일치 합 니 다. 모든 오류 가 발생 하면 즉시 이상 결 과 를 되 돌려 줍 니 다. 정상 적 인 결과 에 대해 서 는 race 는 첫 번 째 결과 가 나 올 때 되 돌아 오고 all 은 모든 결과 가 나 온 후에 돌아 갑 니 다. 주의해 야 할 것 은 Javascript 입 니 다.원생 의 Promise 는 일시 적 으로 cancell 기능 이 없습니다. 따라서 그 중 하나 인 Promise reject 가 되 더 라 도 다른 Promise 는 계속 실 행 될 것 입 니 다. 이에 따라 저희 도 이 사 를 왔 습 니 다.
    Promise. race 의 실현:
    local function apple()
        ngx.sleep(0.1)
        --error("apple lost")
        return "apple done"
    end
    
    local function banana()
        ngx.sleep(0.2)
        return "banana done"
    end
    
    local function carrot()
        ngx.sleep(0.3)
        return "carrot done"
    end
    
    local function race(...)
        local functions = {...}
        local threads = {}
        for _, f in ipairs(functions) do
            local th, err = ngx.thread.spawn(f)
            if not th then
                -- Promise.race      cancell   ,
                --       ,          
                return nil, err
            end
            table.insert(threads, th)
        end
        local ok, res = ngx.thread.wait(unpack(threads))
        if not ok then
            return nil, res
        end
        return res
    end
    
    local res, err = race(apple, banana, carrot)
    ngx.say("res: ", res, " err: ", err)
    ngx.exit(ngx.OK)

    Promise. all 의 실현:
    local function all(...)
        local functions = {...}
        local threads = {}
        for _, f in ipairs(functions) do
            local th, err = ngx.thread.spawn(f)
            if not th then
                return nil, err
            end
            table.insert(threads, th)
        end
        local res_group = {}
        for _ = 1, #threads do
            local ok, res = ngx.thread.wait(unpack(threads))
            if not ok then
                return nil, res
            end
            table.insert(res_group, res)
        end
        return res_group
    end

    Go 안의 channel 을 모 의 합 니 다 (부분 만 구현)
    더 나 아가 Go 안의 channel 을 모 의 해 보 세 요. 우 리 는 다음 과 같은 의 미 를 실현 해 야 합 니 다.
  • 데이터 가 소비 되 지 않 았 을 때 생산 자 는 데 이 터 를 보 낸 후에 운행 을 중단 합 니 다.
  • 데이터 가 생산 되 지 않 으 면 소비자 들 은 데 이 터 를 받 기 전에 운행 을 중단 합 니 다.
  • 소비자 가 데 이 터 를 받 기 를 기다 리 는 생산자 가 존재 할 때 다른 생산 자 는 데 이 터 를 보 내기 전에 운행 을 중단 합 니 다.
  • 이번에 쓸 거 야 ngx.semaphore.
    local semaphore = require "ngx.semaphore"
    
    local Chan = {
        new = function(self)
            local chan_attrs = {
                _read_sema = semaphore:new(),
                _write_sema = semaphore:new(),
                _exclude_sema = semaphore:new(),
                _buffer = nil,
                _waiting_thread_num = 0,
            }
            return setmetatable(chan_attrs, {__index = self})
        end,
        send = function(self, value, timeout)
            timeout = timeout or 60
            while self._buffer do
                self._waiting_thread_num = self._waiting_thread_num + 1
                self._exclude_sema:wait(timeout)
                self._waiting_thread_num = self._waiting_thread_num - 1
            end
            self._buffer = value
            self._read_sema:post()
            self._write_sema:wait(timeout)
        end,
        receive = function(self, timeout)
            timeout = timeout or 60
            self._read_sema:wait(timeout)
            local value = self._buffer
            self._buffer = nil
            self._write_sema:post()
            if self._waiting_thread_num > 0 then
                self._exclude_sema:post()
            end
            return value
        end,
    }
    
    local chan = Chan:new()
    
    --        
    local function worker_a(ch)
        for i = 1, 10 do
            ngx.sleep(math.random() / 10)
            ch:send(i, 1)
        end
    end
    
    local function worker_c(ch)
        for i = 11, 20 do
            ngx.sleep(math.random() / 10)
            ch:send(i, 1)
        end
    end
    
    local function worker_d(ch)
        for i = 21, 30 do
            ngx.sleep(math.random() / 10)
            ch:send(i, 1)
        end
    end
    
    
    local function worker_b(ch)
        for _ = 1, 20 do
            ngx.sleep(math.random() / 10)
            local v = ch:receive(1)
            ngx.say("recv ", v)
        end
    end
    
    local function worker_e(ch)
        for _ = 1, 10 do
            ngx.sleep(math.random() / 10)
            local v = ch:receive(1)
            ngx.say("recv ", v)
        end
    end
    
    ngx.thread.spawn(worker_a, chan)
    ngx.thread.spawn(worker_b, chan)
    ngx.thread.spawn(worker_c, chan)
    ngx.thread.spawn(worker_d, chan)
    ngx.thread.spawn(worker_e, chan)

    아 날로 그 버 프 레 드 채널 도 가능 합 니 다.
    local ok, new_tab = pcall(require, "table.new")
    if not ok then
        new_tab = function (_, _) return {} end
    end
    
    
    local BufferedChan = {
        new = function(self, buffer_size)
            if not buffer_size or buffer_size <= 0 then
                error("Invalid buffer_size " .. (buffer_size or "nil") .. " given")
            end
            local chan_attrs = {
                _read_sema = semaphore:new(),
                _write_sema = semaphore:new(),
                _waiting_thread_num = 0,
                _buffer_size = buffer_size,
            }
            chan_attrs._buffer = new_tab(buffer_size, 0)
            return setmetatable(chan_attrs, {__index = self})
        end,
        send = function (self, value, timeout)
            timeout = timeout or 60
            while #self._buffer >= self._buffer_size do
                self._waiting_thread_num = self._waiting_thread_num + 1
                self._write_sema:wait(timeout)
                self._waiting_thread_num = self._waiting_thread_num - 1
            end
            table.insert(self._buffer, value)
            self._read_sema:post()
        end,
        receive = function(self, timeout)
            timeout = timeout or 60
            self._read_sema:wait(timeout)
            local value = table.remove(self._buffer)
            if self._waiting_thread_num > 0 then
                self._write_sema:post()
            end
            return value
        end,
    }
    
    local chan = BufferedChan:new(2)
    -- ...

    물론 위의 짝 퉁 은 많은 문제 가 있 습 니 다. 예 를 들 어 중요 한 select 지원 이 부족 하고 close 와 관련 된 특성 도 실현 하지 못 했 습 니 다.

    좋은 웹페이지 즐겨찾기