OpenResty 협정 API 돌리 기
OK, 본론 으로 들 어가 자.OpenResty 는
ngx.thread.*
, coroutine.*
, ngx.semaphore
등 일련의 협정 API 를 제공 했다.Nginx 의 요청 처리 방식 에 제한 을 받 지만 표 현 력 은 통용 언어의 협정 API 만큼 강하 지 않다.하지만 머리 를 뚫 으 면 수작 을 부 릴 수 있다.이 API 를 통 해 다른 프로 그래 밍 플랫폼 의 스케줄 링 방식 을 모 의 해 봅 시다.자바 의 Future 를 모 의 합 니 다.
자바 의 Future 는 작업 을 만 들 고 필요 할 때 get 작업 의 반환 값 을 만 들 수 있 습 니 다.또 퓨 처 에는 시간 초과 기능 도 있다.우 리 는 구체 적 인 임 무 를 완성 하기 위해 협 정 을 사용 할 수 있 으 며, 시간 을 초과 하여 끝 나 는 협 정 을 추가 하여 시간 을 초과 할 수 있다.
이렇게:
local function task()
ngx.sleep(3)
ngx.say("Done")
end
local task_thread = ngx.thread.spawn(task)
local timeout_thread = ngx.thread.spawn(function(timeout)
ngx.sleep(timeout)
error("timeout")
end, 2)
local ok, res = ngx.thread.wait(task_thread, timeout_thread)
if not ok then
if res == "timeout" then
ngx.thread.kill(task_thread)
ngx.say("task cancelled by timeout")
return
end
ngx.say("task failed, result: ", res)
end
ngx.thread.kill(timeout_thread)
주의 하 세 요. 어떤 협 정 이 물 러 난 후에 우 리 는 다른 협 정 을 kill 해 야 합 니 다.호출
ngx.exit
같은 방법 으로 명시 적 으로 물 러 나 지 않 으 면 모든 협 정 이 물 러 날 때 까지 현재 단계 가 끝나 지 않 기 때문이다.문서 의 내용 참조:
By default, the corresponding Nginx handler (e.g., rewrite_by_lua handler) will not terminate until
Javascript 의 Promise. race / all 을 모 의 합 니 다.
Promise. race / all 은 여러 개의 Promise 를 받 은 다음 새 Promise 로 포장 하여 되 돌려 줍 니 다. 관련 문 서 를 참조 하 십시오:
The Promise.race(iterable) method returns a promise that resolves or rejects as soon as one of the promises in the iterable resolves or rejects, with the value or reason from that promise.
The Promise.all(iterable) method returns a promise that resolves when all of the promises in the iterable argument have resolved, or rejects with the reason of the first passed promise that rejects.
여기 서 reject 는 협정 실행 중 error 를 던 지 는 것 과 같 습 니 다. resolve 는 협정 에 비해 결 과 를 되 돌려 줍 니 다. 이 두 API 는 reject 의 처리 가 일치 합 니 다. 모든 오류 가 발생 하면 즉시 이상 결 과 를 되 돌려 줍 니 다. 정상 적 인 결과 에 대해 서 는 race 는 첫 번 째 결과 가 나 올 때 되 돌아 오고 all 은 모든 결과 가 나 온 후에 돌아 갑 니 다. 주의해 야 할 것 은 Javascript 입 니 다.원생 의 Promise 는 일시 적 으로 cancell 기능 이 없습니다. 따라서 그 중 하나 인 Promise reject 가 되 더 라 도 다른 Promise 는 계속 실 행 될 것 입 니 다. 이에 따라 저희 도 이 사 를 왔 습 니 다.
Promise. race 의 실현:
local function apple()
ngx.sleep(0.1)
--error("apple lost")
return "apple done"
end
local function banana()
ngx.sleep(0.2)
return "banana done"
end
local function carrot()
ngx.sleep(0.3)
return "carrot done"
end
local function race(...)
local functions = {...}
local threads = {}
for _, f in ipairs(functions) do
local th, err = ngx.thread.spawn(f)
if not th then
-- Promise.race cancell ,
-- ,
return nil, err
end
table.insert(threads, th)
end
local ok, res = ngx.thread.wait(unpack(threads))
if not ok then
return nil, res
end
return res
end
local res, err = race(apple, banana, carrot)
ngx.say("res: ", res, " err: ", err)
ngx.exit(ngx.OK)
Promise. all 의 실현:
local function all(...)
local functions = {...}
local threads = {}
for _, f in ipairs(functions) do
local th, err = ngx.thread.spawn(f)
if not th then
return nil, err
end
table.insert(threads, th)
end
local res_group = {}
for _ = 1, #threads do
local ok, res = ngx.thread.wait(unpack(threads))
if not ok then
return nil, res
end
table.insert(res_group, res)
end
return res_group
end
Go 안의 channel 을 모 의 합 니 다 (부분 만 구현)
더 나 아가 Go 안의 channel 을 모 의 해 보 세 요. 우 리 는 다음 과 같은 의 미 를 실현 해 야 합 니 다.
ngx.semaphore
.local semaphore = require "ngx.semaphore"
local Chan = {
new = function(self)
local chan_attrs = {
_read_sema = semaphore:new(),
_write_sema = semaphore:new(),
_exclude_sema = semaphore:new(),
_buffer = nil,
_waiting_thread_num = 0,
}
return setmetatable(chan_attrs, {__index = self})
end,
send = function(self, value, timeout)
timeout = timeout or 60
while self._buffer do
self._waiting_thread_num = self._waiting_thread_num + 1
self._exclude_sema:wait(timeout)
self._waiting_thread_num = self._waiting_thread_num - 1
end
self._buffer = value
self._read_sema:post()
self._write_sema:wait(timeout)
end,
receive = function(self, timeout)
timeout = timeout or 60
self._read_sema:wait(timeout)
local value = self._buffer
self._buffer = nil
self._write_sema:post()
if self._waiting_thread_num > 0 then
self._exclude_sema:post()
end
return value
end,
}
local chan = Chan:new()
--
local function worker_a(ch)
for i = 1, 10 do
ngx.sleep(math.random() / 10)
ch:send(i, 1)
end
end
local function worker_c(ch)
for i = 11, 20 do
ngx.sleep(math.random() / 10)
ch:send(i, 1)
end
end
local function worker_d(ch)
for i = 21, 30 do
ngx.sleep(math.random() / 10)
ch:send(i, 1)
end
end
local function worker_b(ch)
for _ = 1, 20 do
ngx.sleep(math.random() / 10)
local v = ch:receive(1)
ngx.say("recv ", v)
end
end
local function worker_e(ch)
for _ = 1, 10 do
ngx.sleep(math.random() / 10)
local v = ch:receive(1)
ngx.say("recv ", v)
end
end
ngx.thread.spawn(worker_a, chan)
ngx.thread.spawn(worker_b, chan)
ngx.thread.spawn(worker_c, chan)
ngx.thread.spawn(worker_d, chan)
ngx.thread.spawn(worker_e, chan)
아 날로 그 버 프 레 드 채널 도 가능 합 니 다.
local ok, new_tab = pcall(require, "table.new")
if not ok then
new_tab = function (_, _) return {} end
end
local BufferedChan = {
new = function(self, buffer_size)
if not buffer_size or buffer_size <= 0 then
error("Invalid buffer_size " .. (buffer_size or "nil") .. " given")
end
local chan_attrs = {
_read_sema = semaphore:new(),
_write_sema = semaphore:new(),
_waiting_thread_num = 0,
_buffer_size = buffer_size,
}
chan_attrs._buffer = new_tab(buffer_size, 0)
return setmetatable(chan_attrs, {__index = self})
end,
send = function (self, value, timeout)
timeout = timeout or 60
while #self._buffer >= self._buffer_size do
self._waiting_thread_num = self._waiting_thread_num + 1
self._write_sema:wait(timeout)
self._waiting_thread_num = self._waiting_thread_num - 1
end
table.insert(self._buffer, value)
self._read_sema:post()
end,
receive = function(self, timeout)
timeout = timeout or 60
self._read_sema:wait(timeout)
local value = table.remove(self._buffer)
if self._waiting_thread_num > 0 then
self._write_sema:post()
end
return value
end,
}
local chan = BufferedChan:new(2)
-- ...
물론 위의 짝 퉁 은 많은 문제 가 있 습 니 다. 예 를 들 어 중요 한 select 지원 이 부족 하고 close 와 관련 된 특성 도 실현 하지 못 했 습 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
안드로이드에서 Kotlin coroutine (Async, Await)을 사용하여 Sakut과 HTTP 통신 (비동기 처리)REST API를 두드려 얻은 결과를 바탕으로 UI를 업데이트 할 때 HTTP 통신은 메인 스레드 외부에서 수행되어야합니다 (비동기 처리 필요) 취득한 json 데이터의 퍼스가 번거로움( , 등 여러가지 라이브러리는...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.