Spring Websocket + SockJS + STOMP 실시 간 통신 실현 (5) - Channel Interceptor 와 Executor Channel Interceptor

44246 단어 SpringWebsocket
글 목록
  • ChannelInterceptor:
  • ExecutorChannelInterceptor:
  • 여 기 는 InboundChannel 에 Channel Interceptor 를 설정 하 는 것 을 예 로 들 면
  • ChannelInterceptor:
  • Message 는 스 레 드 탱크 에 전송 되 고 전송 동작 이 실행 되 기 전 (후) 차단 되 어 현재 스 레 드 에서 발생 합 니 다.

  • ExecutorChannelInterceptor:
  • Message 가 스 레 드 풀 에 전송 되면 온라인 스 레 드 풀 에 있 는 새 스 레 드 에서 Message Handler 처리 전 (후) 차단 합 니 다.

  • 여 기 는 인 바 운 드 채널 에 채널 인 터 셉 터 를 설정 하 는 것 을 예 로 들 면
  • 웹 소켓 Message BrokerConfigure 에 차단 기 를 설정 합 니 다:
  • WebSocketMessageBrokerConfigurer:
     @Override
    	
         
          public
          
         
          void
          
         
          configureClientInboundChannel
         (ChannelRegistration registration) {
    		registration.
         
          interceptors
         (new 
         
          ChannelInterceptor
         () {
    			 @Override
    			 
         
          public
          Message<?> 
         
          preSend
         (Message<?> message, MessageChannel channel) {
    			        StompHeaderAccessor accessor = MessageHeaderAccessor.
         
          getAccessor
         (message, StompHeaderAccessor.
         
          class
         );
    			        StompCommand command = accessor.
         
          getCommand
         ();
    			        log.
         
          info
         (command.
         
          toString
         ());
    			        log.
         
          info
         (JSON.
         
          toJSONString
         (message));
    			        if (StompCommand.CONNECT.
         
          equals
         (accessor.
         
          getCommand
         ())) {
    			        	//  :  roomId;  :  userId
    			        	String id = accessor.
         
          getFirstNativeHeader
         ("id");
    			        	accessor.
         
          setUser
         (new 
         
          IdPrincipal
         (id));
    			        }
    			        
         
          return
          message;
    			 }
    
    		});
    	}
    
  • 웹 소켓 Message BrokerConfigure 설정 에서 등 록 된 차단 기 를 읽 습 니 다
  • AbstractMessageBrokerConfiguration:
    	@Bean
    	
         
          public
          AbstractSubscribableChannel 
         
          clientInboundChannel
         () {
    		ExecutorSubscribableChannel channel = new 
         
          ExecutorSubscribableChannel
         (
         
          clientInboundChannelExecutor
         ());
    		ChannelRegistration reg = 
         
          getClientInboundChannelRegistration
         ();
    		if (reg.
         
          hasInterceptors
         ()) {
    			channel.
         
          setInterceptors
         (reg.
         
          getInterceptors
         ());
    		}
    		
         
          return
          channel;
    	}
    
  • 채널 차단 기 를 Executor SubscribableChannel 에 연결 하 는 동시에 차단기 분류 ChannelInterceptorExecutorChannelInterceptor 를 해당 집합 에 추가 합 니 다
  • ExecutorSubscribableChannel:
    	@Override
    	
         
          public
          
         
          void
          
         
          setInterceptors
         (List<
         
          ChannelInterceptor
         > interceptors) {
    		
         
          super
         .
         
          setInterceptors
         (interceptors);
    		
         
          this
         .executorInterceptors.
         
          clear
         ();
    		for (ChannelInterceptor interceptor : interceptors) {
    			if (interceptor 
         
          instanceof
          
         
          ExecutorChannelInterceptor
         ) {
    				
         
          this
         .executorInterceptors.add((ExecutorChannelInterceptor) interceptor);
    			}
    		}
    	}
    
  • 일반적인 Channel Interceptor 는 내부 클래스 AbstractMessageChannel$ChannelInterceptorChain 에 의 해 차단기 체인 으로 구성 되 어 있 으 며, 이 차단기 체인 의 모든 실현 방법 은 일부 AbstractMessage Channel 인 스 턴 스 가 가지 고 있 는 interceptors 집합 을 핵심 으로 하여 차단기 호출 을 한다.
  • 차단기 체인 의 호출 은 AbstractMessageChannel 인 스 턴 스 의 send 방법 에서 발생 합 니 다. sendInternal 방법 이 실 행 된 전후 동기 화 호출 은 현재 스 레 드 에서 발생 합 니 다.
  • AbstractMessageChannel:
    
    
         
          public
          
         
          abstract
          
         
          class
          
         
          AbstractMessageChannel
          
         
          implements
          
         
          MessageChannel
         , InterceptableChannel, BeanNameAware {
    
    	
         
          private
          
         
          final
          List<
         
          ChannelInterceptor
         > interceptors = new 
         
          ArrayList
         <
         
          ChannelInterceptor
         >(5);
    
    	@Override
    	
         
          public
          
         
          final
          
         
          boolean
          
         
          send
         (Message<?> message, 
         
          long
          timeout) {
    		Assert.
         
          notNull
         (message, "
         
          Message
          
         
          must
          
         
          not
          be 
         
          null
         ");
    		ChannelInterceptorChain chain = new 
         
          ChannelInterceptorChain
         ();
    		
         
          boolean
          sent = 
         
          false
         ;
    		try {
    			message = chain.
         
          applyPreSend
         (message, 
         
          this
         );
    			if (message == null) {
    				
         
          return
          
         
          false
         ;
    			}
    			sent = 
         
          sendInternal
         (message, timeout);
    			chain.
         
          applyPostSend
         (message, 
         
          this
         , sent);
    			chain.
         
          triggerAfterSendCompletion
         (message, 
         
          this
         , sent, null);
    			
         
          return
          sent;
    		}
    		
         
          catch
          (
         
          Exception
          ex) {
    			chain.
         
          triggerAfterSendCompletion
         (message, 
         
          this
         , sent, ex);
    			if (ex 
         
          instanceof
          
         
          MessagingException
         ) {
    				
         
          throw
          (MessagingException) ex;
    			}
    			
         
          throw
          new 
         
          MessageDeliveryException
         (message,"
         
          Failed
          to 
         
          send
          
         
          message
          to " + 
         
          this
         , ex);
    		}
    		
         
          catch
          (
         
          Throwable
          err) {
    			MessageDeliveryException ex2 =
    					new 
         
          MessageDeliveryException
         (message, "
         
          Failed
          to 
         
          send
          
         
          message
          to " + 
         
          this
         , err);
    			chain.
         
          triggerAfterSendCompletion
         (message, 
         
          this
         , sent, ex2);
    			
         
          throw
          ex2;
    		}
    	}
    
    	/**
    	 * 
         
          Assists
          
         
          with
          
         
          the
          
         
          invocation
          of 
         
          the
          
         
          configured
          
         
          channel
          
         
          interceptors
         .
    	 */
    	
         
          protected
          
         
          class
          
         
          ChannelInterceptorChain
          {
    		
         
          private
          int sendInterceptorIndex = -1;
    		
         
          private
          int receiveInterceptorIndex = -1;
    		
    		
         
          public
          Message<?> 
         
          applyPreSend
         (Message<?> message, MessageChannel channel) {
    			Message<?> messageToUse = message;
    			for (ChannelInterceptor interceptor : interceptors) {
    				Message<?> resolvedMessage = interceptor.
         
          preSend
         (messageToUse, channel);
    				if (resolvedMessage == null) {
    					String name = interceptor.
         
          getClass
         ().
         
          getSimpleName
         ();
    					if (logger.
         
          isDebugEnabled
         ()) {
    						logger.
         
          debug
         (name + " 
         
          returned
          
         
          null
          
         
          from
          
         
          preSend
         , i.e. 
         
          precluding
          
         
          the
          
         
          send
         .");
    					}
    					
         
          triggerAfterSendCompletion
         (messageToUse, channel, 
         
          false
         , null);
    					
         
          return
          null;
    				}
    				messageToUse = resolvedMessage;
    				
         
          this
         .sendInterceptorIndex++;
    			}
    			
         
          return
          messageToUse;
    		}
    		
         
          public
          
         
          void
          
         
          applyPostSend
         (Message<?> message, MessageChannel channel, 
         
          boolean
          sent) {
    			for (ChannelInterceptor interceptor : interceptors) {
    				interceptor.
         
          postSend
         (message, channel, sent);
    			}
    		}
          
            /**
             *    
             */
    	}
    
    }
    
  • InboundChannel 인 스 턴 스 는 sendInternal 방법 을 통 해 Message 와 Message Handler 를 SendTask 로 밀봉 하여 자신 을 지탱 하 는 Thread PoolTask Executor 스 레 드 탱크 에 보 내 고 새로운 스 레 드 에서 Message 를 비동기 로 처리 합 니 다.
  • 새로운 스 레 드 에서 SendTask 의 run 방법 에서 MessageHandler.handleMessage(Message) 방법 으로 Message 를 처리 하고 handleMessage 전후 호출 executorInterceptors 집합 을 통 해 차단 합 니 다.
  • ExecutorSubscribableChannel :
    
    
         
          public
          
         
          class
          
         
          ExecutorSubscribableChannel
          
         
          extends
          
         
          AbstractSubscribableChannel
          {
    	
         
          private
          
         
          final
          Executor executor;
    	
         
          private
          
         
          final
          List<
         
          ExecutorChannelInterceptor
         > executorInterceptors = new 
         
          ArrayList
         <
         
          ExecutorChannelInterceptor
         >(4);
    	
    	@Override
    	
         
          public
          
         
          boolean
          
         
          sendInternal
         (Message<?> message, 
         
          long
          timeout) {
    		for (MessageHandler handler : 
         
          getSubscribers
         ()) {
    			SendTask sendTask = new 
         
          SendTask
         (message, handler);
    			if (
         
          this
         .executor == null) {
    				sendTask.run();
    			}
    			
         
          else
          {
    				
         
          this
         .executor.
         
          execute
         (sendTask);
    			}
    		}
    		
         
          return
          
         
          true
         ;
    	}
    	/**
    	 * 
         
          Invoke
          a 
         
          MessageHandler
          
         
          with
          
         
          ExecutorChannelInterceptors
         .
    	 */
    	
         
          private
          
         
          class
          
         
          SendTask
          
         
          implements
          
         
          MessageHandlingRunnable
          {
    		@Override
    		
         
          public
          
         
          void
          run() {
    			Message<?> message = 
         
          this
         .inputMessage;
    			try {
    				message = 
         
          applyBeforeHandle
         (message);
    				if (message == null) {
    					
         
          return
         ;
    				}
    				
         
          this
         .messageHandler.
         
          handleMessage
         (message);
    				
         
          triggerAfterMessageHandled
         (message, null);
    			}
    			
         
          catch
          (
         
          Exception
          ex) {
    				
         
          triggerAfterMessageHandled
         (message, ex);
    				if (ex 
         
          instanceof
          
         
          MessagingException
         ) {
    					
         
          throw
          (MessagingException) ex;
    				}
    				String description = "Failed to handle " + message + " to " + 
         
          this
          + " in " + 
         
          this
         .messageHandler;
    				
         
          throw
          new 
         
          MessageDeliveryException
         (message, description, ex);
    			}
    			
         
          catch
          (
         
          Throwable
          err) {
    				String description = "Failed to handle " + message + " to " + 
         
          this
          + " in " + 
         
          this
         .messageHandler;
    				MessageDeliveryException ex2 = new 
         
          MessageDeliveryException
         (message, description, err);
    				
         
          triggerAfterMessageHandled
         (message, ex2);
    				
         
          throw
          ex2;
    			}
    		}
    
    		
         
          private
          Message<?> 
         
          applyBeforeHandle
         (Message<?> message) {
    			Message<?> messageToUse = message;
    			for (ExecutorChannelInterceptor interceptor : executorInterceptors) {
    				messageToUse = interceptor.
         
          beforeHandle
         (messageToUse, ExecutorSubscribableChannel.
         
          this
         , 
         
          this
         .messageHandler);
    				if (messageToUse == null) {
    					String name = interceptor.
         
          getClass
         ().
         
          getSimpleName
         ();
    					if (logger.
         
          isDebugEnabled
         ()) {
    						logger.
         
          debug
         (name + " 
         
          returned
          
         
          null
          
         
          from
          
         
          beforeHandle
         , i.e. 
         
          precluding
          
         
          the
          
         
          send
         .");
    					}
    					
         
          triggerAfterMessageHandled
         (message, null);
    					
         
          return
          null;
    				}
    				
         
          this
         .interceptorIndex++;
    			}
    			
         
          return
          messageToUse;
    		}
    
    		
         
          private
          
         
          void
          
         
          triggerAfterMessageHandled
         (Message<?> message, Exception ex) {
    			for (int i = 
         
          this
         .interceptorIndex; i >= 0; i--) {
    				ExecutorChannelInterceptor interceptor = executorInterceptors.get(i);
    				try {
    					interceptor.
         
          afterMessageHandled
         (message, ExecutorSubscribableChannel.
         
          this
         , 
         
          this
         .messageHandler, ex);
    				}
    				
         
          catch
          (
         
          Throwable
          ex2) {
    					logger.
         
          error
         ("
         
          Exception
          
         
          from
          
         
          afterMessageHandled
          in " + interceptor, ex2);
    				}
    			}
    		}
    	}
    
    }
    

    좋은 웹페이지 즐겨찾기