Elixir에서 한 번만 작업하기

17318 단어 stampedingherdelixir
여러 프로세스가 동일하고 잠재적으로 비용이 많이 드는 기능을 실행해야 하는 경우가 종종 있습니다. 일반적인 솔루션은 향후 호출자를 위해 계산된 값을 캐시하는 것이지만 콜드 캐시(또는 캐시된 값이 만료된 경우)에서는 여전히 함수가 여러 번 실행될 수 있습니다. 이러한 자원 사용의 급증은 무리 짓기 문제로 알려져 있습니다. 동물들만이 질서 있게 진행된다면.

ElixirGenServer는 동기 실행이 보장되는 handle_call 콜백을 제공합니다. 따라서 여러 프로세스가 GenServer 를 호출하면 순차적으로 실행되며 다음 호출자를 위해 값을 캐시할 수도 있습니다. 문제 해결됨.

그러나 handle_call 또는 다른 콜백 내에서 실행되는 모든 것이 차단되어 잠재적으로 GenServer가 응답하지 않는 것처럼 보이고 병목 현상이 발생할 수 있습니다. Python의 GIL에 대해 들어본 적이 있습니다 :)

그러나 GenServer가 별도의 프로세스로 실행할 작업을 제공하고 콜백 실행 시간을 최소화할 수 있다면 어떨까요? 이 작업을 수행하려면 두 가지 개념을 다루어야 합니다.

handle_call은 즉시 응답할 필요가 없습니다.



일반적인handle_call 구현 구조에 익숙할 것입니다.

def handle_call(request, from, state) do
  result, new_state = calculate_state(request, state )
  {:reply, result, new_state}
end


튜플의 :reply 는 결과가 호출자(from )에게 즉시 전송됨을 의미합니다. 그러나 handle_call{:noreply, new_state}를 반환할 수도 있습니다. 이 경우 호출자는 응답을 받거나 시간이 초과될 때까지 대기 상태로 유지됩니다.

처음에 {:noreply, new_state}를 반환한 후 발신자에게 답장을 보내려면 GenServer.reply/2를 사용해야 합니다. 여기에는 from와 일종의 결과가 필요합니다. 모든 콜백 구현 내에서 호출할 수 있습니다.

작업은 메시지 전달을 기반으로 합니다.



결과를 기다리려는 경우에만 Task.async에 전화하는 것이 좋습니다. 일반적으로 Task.await 또는 Task 모듈에서 사용할 수 있는 다른 기능 중 일부를 사용하여 그렇게 합니다. 그러나 후드 아래에서 이러한 기능은 메시지 전달을 통해 작업 결과를 수신하는 데 의존합니다. 즉, Kernel.SpecialForms.receive/1를 사용하여 결과가 롤인되기를 기다립니다.
Task.async 콜백 내에서 GenServer를 호출하면 평소와 같이 Task.await로 대기할 수 있지만 작업 계산 기간 동안 콜백이 차단되어 GenServer가 처리하지 못하게 됩니다. 다른 전화. GenServer에서 원하는 것이 무엇인지에 따라 목적을 완전히 상실할 수 있습니다.

다행스럽게도 GenServer 내에서 handle_info 콜백 구현을 사용하여 다른 메시지를 기다리는 것처럼 작업의 성공 또는 실패를 기다릴 수 있습니다.

def handle_info({ref, result}, state) do
  handle_task_success(ref, result, state)
end

def handle_info({:DOWN, ref, _, _, reason}, state) do
  handle_task_failure(ref, reason, state)
end

ref가 참조임을 확인하십시오. 그렇지 않으면 의도하지 않은 호출을 잡을 수 있습니다.

해결책



그래서 여기에 우르르 몰려드는 무리 문제에 대한 해결책에 대한 광범위한 개요가 있습니다. 함수 결과에 관심이 있는 프로세스는 GenServer를 호출하여 결과를 제공할 수 있습니다. GenServer는 결과가 있는 경우 결과를 반환하거나, Task를 생성하여 계산하고 호출자를 추적하거나, 과거에 Task를 생성한 경우 다시 수행하지 않지만 여전히 발신자를 추적합니다. 응답을 받으면 GenServer.reply를 사용하여 모든 발신자에게 알립니다.

전체 내용은 다음과 같습니다.

defmodule GenHerder do
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, [], Keyword.put(opts, :name, __MODULE__))
  end

  @impl true
  def init(_opts) do
    {:ok, %{}}
  end

  @impl true
  def handle_call(request, from, state) do
    case state[request] do
      nil ->
        task =
          Task.Supervisor.async_nolink(__MODULE__.TaskSupervisor, fn ->
            handle_request(request)
          end)

        {:noreply, Map.put(state, request, {:task, task, [from]})}

      {:task, task, froms} ->
        {:noreply, Map.put(state, request, {:task, task, [from | froms]})}

      {:result, result} ->
        {:reply, result, state}
    end
  end

  defp handle_request(_request) do
    :only_once_kenobi
  end

  @impl true
  def handle_info({ref, result}, state) when is_reference(ref) do
    handle_task_success(ref, result, state)
  end

  @impl true
  def handle_info({:DOWN, ref, _, _, reason}, state) do
    handle_task_failure(ref, reason, state)
  end

  defp handle_task_success(ref, result, state) do
    # The task succeeded so we can cancel the monitoring and discard the DOWN message
    Process.demonitor(ref, [:flush])

    {request, _task_and_froms} =
      Enum.find(state, fn
        {_request, {:task, task, _forms}} -> task.ref == ref
        _ -> false
      end)

    {{:task, _task, froms}, state} = Map.pop(state, request)

    state = Map.put(state, request, {:result, result})

    # Send the result to everyone that asked for it
    for from <- froms do
      GenServer.reply(from, result)
    end

    {:noreply, state}
  end

  defp handle_task_failure(ref, reason, state) do
    {request, _task_and_froms} =
      Enum.find(state, fn
        {_request, {:task, task, _forms}} -> task.ref == ref
        _ -> false
      end)

    {{:task, _task, froms}, state} = Map.pop(state, request)

    # Send the result to everyone that asked for it
    for from <- froms do
      GenServer.reply(from, {:error, reason})
    end

    {:noreply, state}
  end
end


감독 트리에서 다음과 같이 시작해야 합니다.

children = [
          {Task.Supervisor, name: GenHerder.TaskSupervisor},
          GenHerder
        ]

Supervisor.start_link(children, Keyword.put(opts, :strategy, :one_for_one))


그런 다음 다음을 사용하여 호출할 수 있습니다.

GenServer.call(GenHerder, request, timeout)


물론 주변에 멋진 래퍼를 추가할 수 있습니다. 그러나 더 간단한 방법은 hexpm에서 내 패키지를 사용하는 것입니다. 자세한 내용은 mix hex.info gen_herder로 전화하십시오.

그것은 약간의 설탕으로 위를 구현하고 선택적 결과 만료를 추가합니다.

좋은 웹페이지 즐겨찾기