Elixir 모의 SSE 서버

36188 단어 elixirdockersse



서버는 HTTP/2 라우터입니다.



서버 전송 이벤트는 "단순"HTTP 요청입니다. 브라우저에는 내장 인터페이스가 있으며 SSE 서버를 만드는 데 필요한 것은 SSE를 보내는 GET 엔드포인트를 제공하는 것뿐입니다. Javascript 프런트 엔드는 간단한 호출new EventSource(back-endpoint)을 통해 SSE에 연결하고 청취합니다(마지막의 클라이언트 코드 참조).
우리는 the example of Plug base HTTP server을 따라 웹서버 Cowboy와 함께 Elixir로 작성된 간단한 모의 SSE 서버의 Docker 이미지를 생성합니다.

To overcome the limitations of SSE over HTTP1.1, we will upgrade from HTTP to HTTP/2 via HTTPS (this is the browsers constraint).



❗ HTTP/2를 통해서도 동시 개방 연결이 50개로 제한되어 있음을 발견했습니다. 따라서 정교함의 필요성은 의심스럽습니다. 마지막에 일부 "더 심각한"라이브러리가 표시되더라도 이는 중요한 제한 사항으로 보이며 선호하는 프로토콜은 HTTP 대신 웹 소켓이어야 합니다. 이 낮은 제한으로 인해 쉬운 확장성을 희생하지 않기 때문입니다.

이 간단한 예에서 서버는 3개의 끝점(GET 2개와 POST 1개)을 노출합니다. 첫 번째 GET은 주기적 신호의 모의입니다. 주기적으로 알파벳 문자를 내보냅니다. 왜 안 돼! 두 번째 GET은 POST 끝점에 게시된 메시지를 다시 내보냅니다. 이벤트 버스는 Phoenix.PubSub를 사용하여 상태(페이로드)를 엔드포인트 간에 전달하므로 앱의 모든 부분에서 사용할 수 있습니다. 필요한 경우 배포할 수도 있습니다.

defmodule SSE do
  import Plug.Conn
  use Plug.Router

  # define the front-end urls that are permitted to reach the back-end to the CORS Plug.
  @front1 http://localhost:3000
  @front2 https://front-end.surge.sh

  plug(:match)
  # set CORS between the front-end and back-end
  plug(CORSPlug, origin: [@front1, @front2])
  plug(Plug.SSL, rewrite_on: [:x_forwarded_host, :x_forwarded_port, :x_forwarded_proto])
  plug(Plug.Parsers, parsers: [:json], pass: ["text/*", "application/json"], json_decoder: Jason)
  plug(:dispatch)

  #source emits a random letter every 5 seconds
  get "/sse" do
    prepare_sse(conn)
    |> send_letter()
  end


  # message posting endpoint that we broadcast on the topic "post"
  post "/post" do
    with params <- conn.params,
         msg <- make_message(params) do
      Phoenix.PubSub.broadcast(SSE.PubSub, "post", {:post, msg})
      conn |> resp(303, "broadcasted") |> send_resp()
    end
  end

  #source emits an SSE every time a message is received on a topic "post" 
  get "/post" do
    Phoenix.PubSub.subscribe(SSE.PubSub, "post")
    prepare_sse(conn)  
    receive do
      {:post, data} ->
        chunk(conn, data)
    end

    conn
  end

  #function plug
  defp prepare_sse(conn) do
    conn
    |> Plug.Conn.put_resp_header("connection", "keep-alive")
    |> Plug.Conn.put_resp_header("content-type", "text/event-stream")
    |> send_chunked(200)
  end

  defp make_message(params) do
    data = Jason.encode!(params)
    uuid = uuid4()
    "event: message\ndata: #{data}\nid: #{uuid}\nretry: 6000\n\n"
  end

  # we send a letter of the alphabet every 5 seconds
  defp send_letter(conn, x \\ "a") do
    msg = make_message(%{msg: x})
    {:ok, _conn} = chunk(conn, msg)
    :timer.sleep(5_000)

    send_letter(conn, get_random())
  end

  defp get_random() do
    Enum.map(?a..?z, fn x -> <<x::utf8>> end)
    |> Enum.random()
  end

  def uuid4() do
    :uuid.get_v4() |> :uuid.uuid_to_string()|> to_string()
  end
end


애플리케이션이 감독되고 start 함수가 아래에 정의되어 있습니다.

defmodule SSE.Application do
  use Application

  def start(_type, _args) do
    plug_options = [
      port: app_port(),
      compress: true,
      cipher_suite: :strong,
      certfile: "priv/cert/sse+2.pem",
      keyfile: "priv/cert/sse+2-key.pem",
      otp_app: :sse,
      protocol_options: [idle_timeout: :infinity]
    ]

    children = [
      {Plug.Cowboy, scheme: :https, plug: SSE.Router, options: plug_options},
      {Phoenix.PubSub, name: SSE.Pubsub}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: SSE.Supervisor)
  end

  defp app_port do
    System.get_env()
    |> Map.get("PORT", "4043")
    |> String.to_integer()
  end
end


믹스 파일은 다음과 같습니다.

  {:plug_cowboy, "~> 2.5"},
  {:plug_crypto, "~> 1.2"},
  {:cors_plug, "~> 3.0"},
  {:jason, "~>1.3"},
  {:uuid, ">= 2.0.4", [hex: :uuid_erl]},
  {:phoenix_pubsub, "~> 2.0"},
  {:credo, "~> 1.6", only: [:dev, :test], runtime: false},
  {:httpoison, "~> 1.8", only: [:dev, :test]}



테스트



우리는 주로 인터페이스를 테스트할 것입니다. cURL , Elixir 코드 및 브라우저의 세 가지 테스트 방법이 있습니다.

곱슬 곱슬하다



다른 터미널에서 cURL.

curl https://localhost:4000/sse

curl -H 'Content-type: application/json' \
-d {"test": "sent me via SSE"}' \ https://localhost:4000/post



엘릭서 클라이언트



HTTP 요청만 사용하고 있으므로 HTTPoison을 사용하여 SSE를 클라이언트로 사용합니다. HTTPoison.AsyncChunk를 사용하여 SSE를 수신할 수 있으며 연결은 무기한 열린 상태로 유지됩니다. 이것은 HTTP1.1에서 작동했지만 :ssl가 아무 문제 없이 https 끝점에 도달했음에도 불구하고 curl 옵션에서 작동하도록 만들 수 없었습니다.

HTTP 연결에서 임의의 문자를 내보내는 방식은 이 테스트를 자동화하는 데 도움이 되지 않습니다. 우리는 주로 인터페이스에 관심이 있으므로 "_test.exs"파일을 만드는 대신 iex 세션을 실행하여 인터페이스를 테스트합니다.

> iex -S mix

# check if the broadcasted message is sent by SSE on "/post"
iex> Test.is_broadcasted("hello") === %{"test"=> "hello"}

# check if the stream is sent by SSE on "/sse"
iex> Test.sse_receiver(20000)
# a list with letters builds up...during 20s

# run 50 simultaneous open connections during 20s
iex> Enum.each(1..50, fn _ -> Test.sse_receiver(20000) end)


❗ 이 모듈은 HTTP에서만 작동합니다.

defmodule Test do
  @moduledoc false
  require Logger

  @headers [{"Content-Type", "application/json"}]
  @url_post "http://localhost:4043/post"
  @url_sse "http://localhost:4043/sse"

  defp is_posted(text \\ "ok") do
    Phoenix.PubSub.subscribe(SSE.PubSub, "post")
    {:ok, msg} = Jason.encode(%{test: text})
    case HTTPoison.post!(@url_post, msg, @headers) do
      %HTTPoison.Response{status_code: code} ->
        code
    end
  end

  def is_broadcasted(text) do
    case is_posted(text) do
      303 ->
        receive do
          {:post, data} ->
            regexme(data)
        end
    end
  end  

  def sse_receiver(time) do
    m = []
    Task.start(fn ->
      Logger.info("starting test")
      Task.start(fn ->
        HTTPoison.get!(@url_sse, [],
          recv_timeout: :infinity,
          stream_to: self()
        )

        receiver(m)
      end)

      Process.sleep(time)
      Logger.info("end of test")
      Process.exit(pid, :kill)
    end)
  end

  defp receiver(m) do
    receive do
      %HTTPoison.AsyncChunk{chunk: chunk} ->
        data = regexme(chunk)
        m = [data["msg"] | m]
        Logger.debug(m)
        receiver(m)
    end
  end

  defp regexme(text) do
    text |> String.split("\n") |> Enum.at(1) |> String.split(" ") |> Enum.at(1) |> Jason.decode!()
  end
end



클라이언트 코드



React 앱 "create-react-app"을 신속하게 스캐폴딩하고 Server Sent Events에 반응할 마지막에 설명된 작은 구성 요소를 추가할 수 있습니다. CORS를 활성화하면 개발 모드에서 종단 간 HTTPS가 필요하지 않을 수 있습니다.

you can set up a secure front-end quickly with Surge: just build the code and run surge ./build with it's CLI once it's installed. You will get an url such as https://demo.surge.sh.

HTTP2 set-up. For the back-end, we can use a reverse proxy for the TLS termination. For example, Nginx Proxy Manager or Caddy Server automate the certificates for you. Caddy automatically uses HTTP2. We can alternatively terminate the connection directly to the webserver Cowboy: we then need to add self-signed certificates to it (in dev mode). You can use mkcert or the Elixir package X509: generate self-signed certificates with mix x509.gen.selfsigned.



Valtio를 사용하여 동적 렌더링을 얻었지만 useEffect도 사용할 수 있었습니다. React 스캐폴드에서 아래 구성 요소와 같은 것을 사용하십시오.

const { proxy, useSnapshot } from 'valtio'
const { derive } from 'valtio/utils'

const state = proxy({messages: {letter: null, post: null})
const sse = derive({
  getMsg: (get) => {
    const evtSource1 = new EventSource(process.env.REACT_APP_SSE_URL_SSE);
    evtSource1.addEventListener('message', (e) => 
      get(state.messages).letter = e.data
    );
    const evtSource2 = new EventSource(process.env.REACT_APP_SSE_URL_POST);
    evtSource2.addEventListener('message', (e) => 
      get(state.messages).post = e.data
    );
 }
})

const SSE = () => {
  const { messages: {letter, post} } = useSnapshot(state)
  return <>{letter}{" "}{post}</>
}



실행



우리는 할 수 있지만MIX_END=prod PORT=4000 mix run --no-halt 모의를 미리 빌드된 Docker 이미지로 사용하는 것이 아이디어입니다.

도커파일



서버 코드를 배포하기 위해 컨테이너를 배송합니다. 우리는 릴리스를 빌드하므로 다단계 Dockerfile을 사용하여 Elixir SSE 서버의 작은 이미지(20M)를 생성합니다.

ROM  bitwalker/alpine-elixir:latest AS build
ARG NAME
ARG PORT
ENV ENV=${MIX_ENV:-prod}
WORKDIR /opt/app
RUN mix do local.hex --force, local.rebar --force
COPY mix.exs mix.lock ./
COPY config /.config
RUN mix do deps.get --only ${ENV}
COPY lib ./lib
COPY priv ./priv
COPY rel ./rel/
RUN  MIX_ENV=prod mix release ${NAME} --quiet


FROM alpine:latest AS app
ARG NAME
ENV PORT=${PORT}
WORKDIR /opt/app
RUN apk --update --no-cache add openssl ncurses-libs libstdc++ libgcc
RUN chown -R nobody: /opt/app
USER nobody
EXPOSE 4043
ENV HOME=/app
ENV NAME=${NAME}
ENV MIX_ENV=prod
COPY --from=build --chown=nobody:root /opt/app/_build/${MIX_ENV}/rel/${NAME} ./

CMD ./bin/${NAME} start


그런 다음 빌드(이름 전달) 및 이미지 실행(필수 env PORT 설정)

docker build --build-arg NAME=myapp -t myapp:v1 .
docker run -it --rm --env PORT=4043 -p 443:4043 myapp:v1



SSE 라이브러리



https://github.com/mustafaturan/sse
https://github.com/codenoid/elixir-sse-example/blob/master/lib/sse_example_web/helpers/ticker.ex
https://github.com/CrowdHailer/server_sent_event.ex

좋은 웹페이지 즐겨찾기