Springboot Websocket Stomp 메시지 구독 푸 시
15792 단어 SpringbootWebsocketStomp메시지 구독 푸 시
잡담 은 그만 두 고 본론 으로 달려가다.웹 프론트 엔 드 와 긴 링크 를 만들어 서로 실시 간 으로 통신 해 야 하기 때문에 웹 소켓 을 생각 했 습 니 다.그 다음 에 수요 에 따라 사용자 가 테 마 를 구독 하고 메시지 의 정확 한 푸 시,구독 게시 등 을 실현 해 야 하기 때문에 STOMP(Simple Text-Orientated Messaging Protocol)가 메 시 지 를 대상 으로 하 는 간단 한 텍스트 프로 토 콜 을 생각 했 습 니 다.
웹 소켓 프로 토 콜
이전에 쓴 웹 소켓 의 긴 링크 를 생각 한 demo 도 참고 할 수 있 도록 코드 를 붙 였 습 니 다.
pom 파일
spring-boot-starter-websocket 을 직접 도입 하면 됩 니 다.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
웹 소켓 엔 드 포인트 설명
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* @ClassName WebSocketConfig
* @Author scott
* @Date 2021/6/16
* @Version V1.0
**/
@Configuration
public class WebSocketConfig {
/**
* ServerEndpointExporter, Bean @ServerEndpoint websocket endpoint
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
웹 소켓 구현 클래스,그 중에서 주 해 를 통 해 각종 사건 을 감청 하고 푸 시 메시지 등 관련 논 리 를 실현 합 니 다.
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.ruoyi.common.core.domain.AjaxResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @ClassName: DataTypePushWebSocket
* @Author: scott
* @Date: 2021/6/16
**/
@ServerEndpoint(value = "/ws/dataType/push/{token}")
@Component
public class DataTypePushWebSocket {
private static final Logger log = LoggerFactory.getLogger(DataTypePushWebSocket.class);
/**
*
*/
private static AtomicInteger onlineCount = new AtomicInteger(0);
private static Cache<String, Session> SESSION_CACHE = CacheBuilder.newBuilder()
.initialCapacity(10)
.maximumSize(300)
.expireAfterWrite(10, TimeUnit.MINUTES)
.build();
/**
*
*/
@OnOpen
public void onOpen(Session session, @PathParam("token")String token) {
String sessionId = session.getId();
onlineCount.incrementAndGet(); // 1
this.sendMessage("sessionId:" + sessionId +", server ", session);
SESSION_CACHE.put(sessionId,session);
log.info(" :{}, :{}", session.getId(), onlineCount.get());
}
/**
*
*/
@OnClose
public void onClose(Session session,@PathParam("token")String token) {
onlineCount.decrementAndGet(); // 1
SESSION_CACHE.invalidate(session.getId());
log.info(" :{}, :{}", session.getId(), onlineCount.get());
}
/**
*
*
* @param message
*/
@OnMessage
public void onMessage(String message, Session session,@PathParam("token")String token) {
log.info(" [{}] :{}", session.getId(), message);
this.sendMessage(" :" + message, session);
}
@OnError
public void onError(Session session, Throwable error) {
log.error(" ");
error.printStackTrace();
}
/**
*
*/
private static void sendMessage(String message, Session toSession) {
try {
log.info(" [{}] {}", toSession.getId(), message);
toSession.getBasicRemote().sendText(message);
} catch (Exception e) {
log.error(" :{}", e);
}
}
public static AjaxResult sendMessage(String message, String sessionId){
Session session = SESSION_CACHE.getIfPresent(sessionId);
if(Objects.isNull(session)){
return AjaxResult.error("token ");
}
sendMessage(message,session);
return AjaxResult.success();
}
public static AjaxResult sendBroadcast(String message){
long size = SESSION_CACHE.size();
if(size <=0){
return AjaxResult.error(" , ");
}
ConcurrentMap<String, Session> sessionConcurrentMap = SESSION_CACHE.asMap();
Set<String> keys = sessionConcurrentMap.keySet();
for (String key : keys) {
Session session = SESSION_CACHE.getIfPresent(key);
DataTypePushWebSocket.sendMessage(message,session);
}
return AjaxResult.success();
}
}
이로써 웹 소켓 서버 코드 가 완료 되 었 습 니 다.stomp 프로 토 콜
전단 코드 입 니 다.이것 은 어떤 vue 프로젝트 에서 쓴 js 입 니 다.여러분 이 직접 고치 면 됩 니 다.그 중에서 Settings.wsPath 는 백 엔 드 에서 정 의 된 ws 주소 입 니 다.예 를 들 어 ws:/localhost:9003/ws
import Stomp from 'stompjs'
import Settings from '@/settings.js'
export default {
//
debug:true,
//
stompClient:{},
//
init(callBack){
this.stompClient = Stomp.client(Settings.wsPath)
this.stompClient.hasDebug = this.debug
this.stompClient.connect({},suce =>{
this.console(" , ↓")
this.console(this.stompClient)
if(callBack){
callBack()
}
},err => {
if(err) {
this.console(" , ↓")
this.console(err)
}
})
},
//
sub(address,callBack){
if(!this.stompClient.connected){
this.console(" , ")
return
}
// id
let timestamp= new Date().getTime() + address
this.console(" -> "+address)
this.stompClient.subscribe(address,message => {
this.console(address+" , ↓")
this.console(message)
let data = message.body
callBack(data)
},{
id: timestamp
})
},
unSub(address){
if(!this.stompClient.connected){
this.console(" , -> "+address)
return
}
let id = ""
for(let item in this.stompClient.subscriptions){
if(item.endsWith(address)){
id = item
break
}
}
this.stompClient.unsubscribe(id)
this.console(" -> id:"+ id + " address:"+address)
},
//
disconnect(callBack){
if(!this.stompClient.connected){
this.console(" , ")
return
}
this.stompClient.disconnect(() =>{
console.log(" ")
if(callBack){
callBack()
}
})
},
//
reconnect(time){
setInterval(() =>{
if(!this.stompClient.connected){
this.console(" ...")
this.init()
}
},time * 1000)
},
console(msg){
if(this.debug){
console.log(msg)
}
},
//
send(address,msg) {
this.stompClient.send(address,{},msg)
}
}
백 엔 드 stomp config,안에 주석 이 있 고 상세 하 게 쓰 여 있 으 며 저 는 전단 의 심장 박동 ping pong 을 넣 었 습 니 다.
package com.cn.scott.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
/**
* @ClassName: WebSocketStompConfig
* @Author: scott
* @Date: 2021/7/8
**/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
private static long HEART_BEAT=10000;
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// socketJs , webSocket,
//
//ws://127.0.0.1:port/ws WebSocket
registry.addEndpoint("/ws").setAllowedOrigins("*");
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
ThreadPoolTaskScheduler te = new ThreadPoolTaskScheduler();
te.setPoolSize(1);
te.setThreadNamePrefix("wss-heartbeat-thread-");
te.initialize();
// STOMP mq
// Broker ,/user ,/topic
//setHeartbeatValue
registry.enableSimpleBroker("/user", "/topic").setHeartbeatValue(new long[]{HEART_BEAT,HEART_BEAT}).setTaskScheduler(te);
// , , /user/
registry.setUserDestinationPrefix("/user/");
}
}
백 엔 드 stomp 프로 토 콜 수락,구독 등 동작 알림
package com.cn.scott.ws;
import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @ClassName StompSocketHandler
* @Author scott
* @Date 2021/6/30
* @Version V1.0
**/
@RestController
public class StompSocketHandler {
@Autowired
private SimpMessagingTemplate simpMessagingTemplate;
/**
* @MethodName: subscribeMapping
* @Description:
* @Param: [id]
* @Return: void
* @Author: scott
* @Date: 2021/6/30
**/
@SubscribeMapping("/user/{id}/listener")
public void subscribeMapping(@DestinationVariable("id") final long id) {
System.out.println(">>>>>> :"+id +", ");
SubscribeMsg param = new SubscribeMsg(id,String.format(" 【%s】 ", id));
sendToUser(param);
}
/**
* @MethodName: test
* @Description: topic
* @Param: [id, msg]
* @Return: void
* @Author: scott
* @Date: 2021/6/30
**/
@MessageMapping(value = "/user/{id}/listener")
public void UserSubListener(@DestinationVariable long id, String msg) {
System.out.println(" :" +id+", ");
SubscribeMsg param = new SubscribeMsg(id,String.format(" 【%s】 【%s】", id,msg));
sendToUser(param);
}
@GetMapping("/refresh/{userId}")
public void refresh(@PathVariable Long userId, String msg) {
StompSocketHandler.SubscribeMsg param = new StompSocketHandler.SubscribeMsg(userId,String.format(" 【%s】 【%s】", userId,msg));
sendToUser(param);
}
/**
* @MethodName: sendToUser
* @Description:
* @Param: [userId]
* @Return: void
* @Author: scott
* @Date: 2021/6/30
**/
public void sendToUser(SubscribeMsg screenChangeMsg){
// 。。。
simpMessagingTemplate.convertAndSendToUser(screenChangeMsg.getUserId().toString(),"/listener", JSON.toJSONString(screenChangeMsg));
}
/**
* @MethodName: sendBroadCast
* @Description: ,
* @Param: [topic, msg]
* @Return: void
* @Author: scott
* @Date: 2021/6/30
**/
public void sendBroadCast(String topic,String msg){
simpMessagingTemplate.convertAndSend(topic,msg);
}
/**
* @ClassName: SubMsg
* @Author: scott
* @Date: 2021/6/30
**/
public static class SubscribeMsg {
private Long userId;
private String msg;
public SubscribeMsg(Long UserId, String msg){
this.userId = UserId;
this.msg = msg;
}
public Long getUserId() {
return userId;
}
public String getMsg() {
return msg;
}
}
}
연결 전시연결 이 성공 적 으로 이 루어 졌 습 니 다.웹 소켓 프로 토 콜 기반 임 을 알 수 있 습 니 다.
연결 정보
ping pong
인 터 페 이 스 를 호출 하여 구독 자 1 에 게 메 시 지 를 보 냅 니 다.http://localhost:9003/refresh/1?msg=Hello Stomp,클 라 이언 트 콘 솔 에서 이미 받 은 메 시 지 를 볼 수 있 습 니 다.이 럴 때 서로 다른 사용 자 는 자신의 userId 를 통 해 구독 의 주 제 를 구분 할 수 있 고 userId 를 통 해 클 라 이언 트 에 정 보 를 정확하게 전달 할 수 있 습 니 다.
우리 가 백 엔 드 설정 을 할 때 방송의 구독 주제/topic 도 지정 한 것 을 기억 합 니 다.이때 우 리 는 전단 에서 js 를 통 해 이 주 제 를 구독 하면 백 엔 드 는 이 주제 로 메 시 지 를 보 낼 때 모든 구독 클 라 이언 트 를 받 을 수 있 습 니 다.관심 이 있 는 파트너 는 스스로 해 볼 수 있 습 니 다.api 는 제 가 다 썼 습 니 다.
이로써 실전 이 끝나 고 좋아 하 는 동료 들 은 관심 을 가 져 주 고 찬 사 를 보 냈 다.
springboot+stomp 백 엔 드 소스 주소:https://gitee.com/ErGouGeSiBaKe/stomp-server
Springboot Websocket Stomp 메시지 구독 푸 시 에 관 한 이 글 은 여기까지 소개 되 었 습 니 다.더 많은 Springboot Websocket Stomp 메시지 구독 푸 시 내용 은 우리 의 이전 글 을 검색 하거나 아래 의 관련 글 을 계속 조회 하 시기 바 랍 니 다.앞으로 도 많은 응원 부 탁 드 리 겠 습 니 다!
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
[MeU] Hashtag 기능 개발➡️ 기존 Tag 테이블에 존재하지 않는 해시태그라면 Tag , tagPostMapping 테이블에 모두 추가 ➡️ 기존에 존재하는 해시태그라면, tagPostMapping 테이블에만 추가 이후에 개발할 태그 기반 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.