rest 푸시 구현--jesey SSE

7005 단어 socketRESTjerseysse

rest 푸시 구현 – jesey SSE


서버 전송 기술은 서버 측의 업무 데이터가 자원 상태가 바뀔 때 서버가 이 정보를 브라우저에 자발적으로 통지할 수 있는 통신 기술이다.여기에서 우리는 TCP/IP 프로토콜이 구축한 연결을 논의하지 않는다. 이런 Socket을 바탕으로 한다. 그러나 연결이 구축되면 이런 양방향 통신 체인에서 언제든지 알림을 보낼 수 있다.만약에 우리가 프로젝트에서 실현한다면 일반적으로 제3자를 사용한다. 예를 들어 극광 전송도 TCP장 연결을 구축하여 실현한다.여기서 HTTP 프로토콜 아래의 푸시를 소개합니다.

어떤 푸시 기술이 있습니까?


1. Polling 기술


여기서 간단하게polling(윤문)만 말하자면 클라이언트가 주기적으로 서버에 접근하여 데이터를 얻는 것이다.
장점: 클라이언트 퀴즈 기술은 실현하기 쉽다.이 때문에 서버나 브라우저에서 제3자 라이브러리를 추가로 사용할 필요가 없습니다.
단점: 클라이언트가 매번 요청할 때마다 새로운 Http 연결을 만들고 끝날 때 닫아야 합니다. 업데이트된 데이터가 없어도 대량의 Http 연결을 만들어서 업데이트 여부를 조회합니다.

2.Comet


리버스 AJAX의 기술 집합은 긴 폴링과 흐름 두 가지 기술 구현을 포함한다.역방향 AJAX는 HTTP1.1의 keepalive 지속적 연결 기술을 이용하여 클라이언트가 요청을 한 후 Keepalive를 통해 서버를 저장하여 브라우저에 응답하는 통신을 합니다.
장점: 윤문하는 서버 네트워크 대역폭 소모 문제를 해결하여 밀어내기가 되었다.
단점: Comet는 서버의 추가 기술 지원을 필요로 하고 서버와 클라이언트가 동시에 제3자 도구 패키지를 도입하여 상대적으로 복잡하게 실현해야 한다.

3. HTML5 기술 집합의 SSE 및 WebSocket(TCP 이중 채널)


  http://blog.csdn.net/li563868273/article/details/50251267
나의 이 게시물에는 상세한 SSE 소개가 있다.나는 여기에서 주로 어떻게 실현하는지를 말한다.

어떻게 푸시를 실현합니까?


저지의 SSE MAVEN 의존 패키지는 다음과 같습니다.
<dependency>
  <groupId>org.glassfish.jersey.media</groupId>
  <artifactId>jersey-media-sse</artifactId>
  <version>${jersey.version}</version>
</dependency>

Jersey의 SSE는 두 가지 통신 모드가 있는데 그것이 바로 구독 모드(단대단, 일대일 모드), 방송 모드(다중 클라이언트가 서버에 대한 멀티캐스트 모드)이다.

1. 게시-구독 모드


첫 번째 단계: EventSource는 EventListener 인터페이스를 실현했다. EventListener는 onEvent 방법이 있는데 매개 변수는 Inbound Event, 즉 입국 이벤트(javabean이기도 하고 SSE의 규범에 따라 우리는 주로 ID, 데이터,name)를 실현한다.
public interface EventListener {
void onEvent(InboundEvent var1);
}

우리는 클라이언트에서 EventSource를 실례화합니다.
@Test
public void testEventSource() throws InterruptedException, URISyntaxException {
// 10 post
final int testCount = 10;
final String messagePrefix = "pubsub-";
// CountDownLatch eventSource.close
final CountDownLatch latch = new CountDownLatch(testCount);
// eventSource , 
final EventSource eventSource = new EventSource(target().path(ROOT_PATH)) {
private int i;
@Override
// , 
public void onEvent(InboundEvent inboundEvent) {
try {
System.out.println("Received: " + inboundEvent.getId() + ":" + inboundEvent.getName() + ":" + new String(inboundEvent.getRawData()));
Assert.assertEquals(messagePrefix + i++, inboundEvent.readData(String.class));
// , 0 await
latch.countDown();
} catch (ProcessingException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < testCount; i++) {
target().path(ROOT_PATH).request().post(Entity.text(messagePrefix + i));
}
try {
latch.await();
} finally {
eventSource.close();// eventSource, 。
}
}

두 번째 서버 측은 어떻게 설정합니까
클라이언트가 먼저 GET로 서버에 접근해서 채널을 얻을 것입니다. 이 채널은 포트 쌍에 속합니다. 만약post가 있을 때 이벤트 아웃풋이 Outbound 이벤트 (출구 이벤트) 를 씁니다.자세한 절차를 알고 싶으면jerseysse 패키지의 원본을 보십시오.
@Path("pubsub")
public class AirSsePubSubResource {
// eventOutput OutboundEvent( )
private static EventOutput eventOutput=new EventOutput();
@GET// SSE 
@Produces(SseFeature.SERVER_SENT_EVENTS)
public EventOutput publishMessage() throws IOException{

return  eventOutput;
}
@POST
public Boolean saveMessage(String message) throws IOException {
// , 
eventOutput.write(new OutboundEvent.Builder().
id(System.nanoTime() + "").
name("post message").
data(String.class, message).
build());
return true;
}
}

2. 방송 모델 – 생산자 - 소비자 모델의 구현


방송 모델과 발표-구독은 주로 서버 측과 이루어진다. 서버 측은 사실 생산자-소비자 모델의 구현이다. 클라이언트가 get을 이용하여 채널을 열면 서버 측은 이 채널을 대열에 가입한다. 다른 방법에서도 소비자가 이 대열의 채널에 대해 소비하고 정보를 전송한다. 아래에 제시된 코드는 클라이언트가 일정한 수량을 만족시킬 때 실현할 수 있다.package com.lz.sse;
import org.glassfish.jersey.media.sse.EventOutput;
import org.glassfish.jersey.media.sse.OutboundEvent;
import org.glassfish.jersey.media.sse.SseBroadcaster;
import org.glassfish.jersey.media.sse.SseFeature;
import org.glassfish.jersey.server.ChunkedOutput;

import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Created by lizhaoz on 2015/12/10.
 */
@Path("broadcast")
public class AirSseBroadcastResource {
private static final BlockingQueue<BroadcastProcess> processQueue = new LinkedBlockingQueue<>(1);

@Path("book")
@POST
public Boolean postBook(@DefaultValue("0") @QueryParam("total") int total, String bookName) {
final BroadcastProcess broadcastProcess = new BroadcastProcess(total, bookName);
processQueue.add(broadcastProcess);
Executors.newSingleThreadExecutor().execute(broadcastProcess);

return true;
}
@Path("book/clear")
@DELETE
public Boolean clear() {
processQueue.clear();
return true;
}

@Path("book")
@Produces(SseFeature.SERVER_SENT_EVENTS)
@GET
public EventOutput getBook(@DefaultValue("0") @QueryParam("clientId") int clientId) throws InterruptedException {
System.out.println("clientId=" + clientId);
BroadcastProcess broadcastProcess = processQueue.peek();
if (broadcastProcess != null) {
final long countDown = broadcastProcess.countDown();
System.out.println("countDown count= " + countDown);
final EventOutput eventOutput = new EventOutput();
// 
broadcastProcess.getBroadcaster().add(eventOutput);
return eventOutput;
} else {
throw new NotFoundException("No new broadcast.");
}
}
static class BroadcastProcess implements Runnable {
private final long processId;
private final String bookName;
private final CountDownLatch latch;
private final SseBroadcaster broadcaster = new SseBroadcaster() {
@Override
public void onException(ChunkedOutput<OutboundEvent> out, Exception e) {

}
};

public BroadcastProcess(int total, String bookName) {
this.processId = System.nanoTime();
this.bookName = bookName;
latch = total > 0 ? new CountDownLatch(total) : null;
}

public long getProcessId() {
return processId;
}

public SseBroadcaster getBroadcaster() {
return broadcaster;
}

public long countDown() {
if (latch == null) {
return -1L;
}
latch.countDown();
return latch.getCount();
}

public void run() {
try {
if (latch != null) {
// 
latch.await();
}
OutboundEvent.Builder eventBuilder = new OutboundEvent.Builder().mediaType(MediaType.TEXT_PLAIN_TYPE);
OutboundEvent event = eventBuilder.id(processId + "").name("New Book Name").data(String.class, bookName).build();
// 
broadcaster.broadcast(event);
// 
broadcaster.closeAll();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

이 SSE 기본 소개를 참조하십시오.https://github.com/lzggsimida123/jsse항목을 다운로드하여 테스트할 수 있습니다.

좋은 웹페이지 즐겨찾기