Springboot Websocket Stomp 메시지 구독 푸 시

필요 배경
잡담 은 그만 두 고 본론 으로 달려가다.웹 프론트 엔 드 와 긴 링크 를 만들어 서로 실시 간 으로 통신 해 야 하기 때문에 웹 소켓 을 생각 했 습 니 다.그 다음 에 수요 에 따라 사용자 가 테 마 를 구독 하고 메시지 의 정확 한 푸 시,구독 게시 등 을 실현 해 야 하기 때문에 STOMP(Simple Text-Orientated Messaging Protocol)가 메 시 지 를 대상 으로 하 는 간단 한 텍스트 프로 토 콜 을 생각 했 습 니 다.
웹 소켓 프로 토 콜
이전에 쓴 웹 소켓 의 긴 링크 를 생각 한 demo 도 참고 할 수 있 도록 코드 를 붙 였 습 니 다.
pom 파일
spring-boot-starter-websocket 을 직접 도입 하면 됩 니 다.

    	<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

웹 소켓 엔 드 포인트 설명

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @ClassName WebSocketConfig
 * @Author scott
 * @Date 2021/6/16
 * @Version V1.0
 **/
@Configuration
public class WebSocketConfig {

    /**
     *     ServerEndpointExporter, Bean       @ServerEndpoint     websocket endpoint
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}
웹 소켓 구현 클래스,그 중에서 주 해 를 통 해 각종 사건 을 감청 하고 푸 시 메시지 등 관련 논 리 를 실현 합 니 다.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.ruoyi.common.core.domain.AjaxResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @ClassName: DataTypePushWebSocket
 * @Author: scott
 * @Date: 2021/6/16
**/
@ServerEndpoint(value = "/ws/dataType/push/{token}")
@Component
public class DataTypePushWebSocket {

    private static final Logger log = LoggerFactory.getLogger(DataTypePushWebSocket.class);

    /**
     *          
     */
    private static AtomicInteger onlineCount = new AtomicInteger(0);

    private static Cache<String, Session> SESSION_CACHE = CacheBuilder.newBuilder()
            .initialCapacity(10)
            .maximumSize(300)
            .expireAfterWrite(10, TimeUnit.MINUTES)
            .build();

    /**
     *            
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("token")String token) {
        String sessionId = session.getId();
        onlineCount.incrementAndGet(); //     1
        this.sendMessage("sessionId:" + sessionId +",   server    ", session);
        SESSION_CACHE.put(sessionId,session);
        log.info("      :{},        :{}", session.getId(), onlineCount.get());
    }

    /**
     *          
     */
    @OnClose
    public void onClose(Session session,@PathParam("token")String token) {
        onlineCount.decrementAndGet(); //     1
        SESSION_CACHE.invalidate(session.getId());
        log.info("      :{},        :{}", session.getId(), onlineCount.get());
    }

    /**
     *              
     *
     * @param message           
     */
    @OnMessage
    public void onMessage(String message, Session session,@PathParam("token")String token) {
        log.info("        [{}]   :{}", session.getId(), message);
        this.sendMessage("          :" + message, session);
    }

    @OnError
    public void onError(Session session, Throwable error) {
        log.error("    ");
        error.printStackTrace();
    }

    /**
     *            
     */
    private static void sendMessage(String message, Session toSession) {
        try {
            log.info("       [{}]    {}", toSession.getId(), message);
            toSession.getBasicRemote().sendText(message);
        } catch (Exception e) {
            log.error("             :{}", e);
        }
    }

    public static AjaxResult sendMessage(String message, String sessionId){
        Session session = SESSION_CACHE.getIfPresent(sessionId);
        if(Objects.isNull(session)){
            return AjaxResult.error("token   ");
        }
        sendMessage(message,session);
        return AjaxResult.success();
    }

    public static AjaxResult sendBroadcast(String message){
        long size = SESSION_CACHE.size();
        if(size <=0){
            return AjaxResult.error("         ,      ");
        }
        ConcurrentMap<String, Session> sessionConcurrentMap = SESSION_CACHE.asMap();
        Set<String> keys = sessionConcurrentMap.keySet();
        for (String key : keys) {
            Session session = SESSION_CACHE.getIfPresent(key);
            DataTypePushWebSocket.sendMessage(message,session);
        }

        return AjaxResult.success();

    }

}
이로써 웹 소켓 서버 코드 가 완료 되 었 습 니 다.
stomp 프로 토 콜
전단 코드 입 니 다.이것 은 어떤 vue 프로젝트 에서 쓴 js 입 니 다.여러분 이 직접 고치 면 됩 니 다.그 중에서 Settings.wsPath 는 백 엔 드 에서 정 의 된 ws 주소 입 니 다.예 를 들 어 ws:/localhost:9003/ws

import Stomp from 'stompjs'
import Settings from '@/settings.js'

export default {
  //            
  debug:true,
  //        
  stompClient:{},
  //    
  init(callBack){
    this.stompClient = Stomp.client(Settings.wsPath)
    this.stompClient.hasDebug = this.debug
    this.stompClient.connect({},suce =>{
      this.console("    ,     ↓")
      this.console(this.stompClient)
      if(callBack){
        callBack()
      }
    },err => {
      if(err) {
        this.console("    ,     ↓")
        this.console(err)
      }
    })
  },
  //   
  sub(address,callBack){
    if(!this.stompClient.connected){
      this.console("    ,    ")
      return
    }
    //    id
    let timestamp= new Date().getTime() + address
    this.console("     -> "+address)
    this.stompClient.subscribe(address,message => {
      this.console(address+"       ,     ↓")
      this.console(message)
      let data = message.body
      callBack(data)
    },{
      id: timestamp
    })
  },
  unSub(address){
    if(!this.stompClient.connected){
      this.console("    ,       -> "+address)
      return
    }
    let id = ""
    for(let item in this.stompClient.subscriptions){
      if(item.endsWith(address)){
        id = item
        break
      }
    }
    this.stompClient.unsubscribe(id)
    this.console("       -> id:"+ id + " address:"+address)
  },
  //     
  disconnect(callBack){
    if(!this.stompClient.connected){
      this.console("    ,      ")
      return
    }
    this.stompClient.disconnect(() =>{
      console.log("    ")
      if(callBack){
        callBack()
      }
    })
  },
  //     
  reconnect(time){
    setInterval(() =>{
      if(!this.stompClient.connected){
        this.console("     ...")
        this.init()
      }
    },time * 1000)
  },
  console(msg){
    if(this.debug){
      console.log(msg)
    }
  },
  //        
  send(address,msg) {
    this.stompClient.send(address,{},msg)
  }
}
백 엔 드 stomp config,안에 주석 이 있 고 상세 하 게 쓰 여 있 으 며 저 는 전단 의 심장 박동 ping pong 을 넣 었 습 니 다.

package com.cn.scott.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/**
 * @ClassName: WebSocketStompConfig
 * @Author: scott
 * @Date: 2021/7/8
**/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {

    private static long HEART_BEAT=10000;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        //    socketJs    ,    webSocket,    
        //               
        //ws://127.0.0.1:port/ws      WebSocket  
        registry.addEndpoint("/ws").setAllowedOrigins("*");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        ThreadPoolTaskScheduler te = new ThreadPoolTaskScheduler();
        te.setPoolSize(1);
        te.setThreadNamePrefix("wss-heartbeat-thread-");
        te.initialize();
        //     STOMP       mq     
        //  Broker  ,/user           ,/topic         
        //setHeartbeatValue          
        registry.enableSimpleBroker("/user", "/topic").setHeartbeatValue(new long[]{HEART_BEAT,HEART_BEAT}).setTaskScheduler(te);
        //          ,     ,    /user/
        registry.setUserDestinationPrefix("/user/");
    }
}
백 엔 드 stomp 프로 토 콜 수락,구독 등 동작 알림

package com.cn.scott.ws;

import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName StompSocketHandler
 * @Author scott
 * @Date 2021/6/30
 * @Version V1.0
 **/
@RestController
public class StompSocketHandler {

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    /**
    * @MethodName: subscribeMapping
     * @Description:       
     * @Param: [id]
     * @Return: void
     * @Author: scott
     * @Date: 2021/6/30
    **/
    @SubscribeMapping("/user/{id}/listener")
    public void subscribeMapping(@DestinationVariable("id") final long id) {
        System.out.println(">>>>>>  :"+id +",   ");
        SubscribeMsg param = new SubscribeMsg(id,String.format("  【%s】     ", id));
        sendToUser(param);
    }


    /**
    * @MethodName: test
     * @Description:     topic  
     * @Param: [id, msg]
     * @Return: void
     * @Author: scott
     * @Date: 2021/6/30
    **/
    @MessageMapping(value = "/user/{id}/listener")
    public void UserSubListener(@DestinationVariable long  id, String msg) {
        System.out.println("     :" +id+",   ");
        SubscribeMsg param = new SubscribeMsg(id,String.format("     【%s】    【%s】", id,msg));
        sendToUser(param);
    }
    
     @GetMapping("/refresh/{userId}")
    public void refresh(@PathVariable Long userId, String msg) {
        StompSocketHandler.SubscribeMsg param = new StompSocketHandler.SubscribeMsg(userId,String.format("      【%s】    【%s】", userId,msg));
        sendToUser(param);
    }

    /**
    * @MethodName: sendToUser
     * @Description:          
     * @Param: [userId]
     * @Return: void
     * @Author: scott
     * @Date: 2021/6/30
    **/
    public void sendToUser(SubscribeMsg screenChangeMsg){
        //         。。。
        simpMessagingTemplate.convertAndSendToUser(screenChangeMsg.getUserId().toString(),"/listener", JSON.toJSONString(screenChangeMsg));
    }

    /**
    * @MethodName: sendBroadCast
     * @Description:     ,          
     * @Param: [topic, msg]
     * @Return: void
     * @Author: scott
     * @Date: 2021/6/30
    **/
    public void sendBroadCast(String topic,String msg){
        simpMessagingTemplate.convertAndSend(topic,msg);
    }


    /**
     * @ClassName: SubMsg
     * @Author: scott
     * @Date: 2021/6/30
    **/
    public static class SubscribeMsg {
        private Long userId;
        private String msg;
        public SubscribeMsg(Long UserId, String msg){
            this.userId = UserId;
            this.msg = msg;
        }
        public Long getUserId() {
            return userId;
        }
        public String getMsg() {
            return msg;
        }
    }
}
연결 전시
연결 이 성공 적 으로 이 루어 졌 습 니 다.웹 소켓 프로 토 콜 기반 임 을 알 수 있 습 니 다.
在这里插入图片描述
연결 정보
在这里插入图片描述
ping pong
在这里插入图片描述
인 터 페 이 스 를 호출 하여 구독 자 1 에 게 메 시 지 를 보 냅 니 다.http://localhost:9003/refresh/1?msg=Hello Stomp,클 라 이언 트 콘 솔 에서 이미 받 은 메 시 지 를 볼 수 있 습 니 다.이 럴 때 서로 다른 사용 자 는 자신의 userId 를 통 해 구독 의 주 제 를 구분 할 수 있 고 userId 를 통 해 클 라 이언 트 에 정 보 를 정확하게 전달 할 수 있 습 니 다.
在这里插入图片描述
우리 가 백 엔 드 설정 을 할 때 방송의 구독 주제/topic 도 지정 한 것 을 기억 합 니 다.이때 우 리 는 전단 에서 js 를 통 해 이 주 제 를 구독 하면 백 엔 드 는 이 주제 로 메 시 지 를 보 낼 때 모든 구독 클 라 이언 트 를 받 을 수 있 습 니 다.관심 이 있 는 파트너 는 스스로 해 볼 수 있 습 니 다.api 는 제 가 다 썼 습 니 다.
在这里插入图片描述
이로써 실전 이 끝나 고 좋아 하 는 동료 들 은 관심 을 가 져 주 고 찬 사 를 보 냈 다.
springboot+stomp 백 엔 드 소스 주소:https://gitee.com/ErGouGeSiBaKe/stomp-server
Springboot Websocket Stomp 메시지 구독 푸 시 에 관 한 이 글 은 여기까지 소개 되 었 습 니 다.더 많은 Springboot Websocket Stomp 메시지 구독 푸 시 내용 은 우리 의 이전 글 을 검색 하거나 아래 의 관련 글 을 계속 조회 하 시기 바 랍 니 다.앞으로 도 많은 응원 부 탁 드 리 겠 습 니 다!

좋은 웹페이지 즐겨찾기