Spring WebSocket + SockJS + STOMP 실시간 통신 구현 (5) - ChannelInterceptor와 ExecutorChannelInterceptor
ExecutorChannelInterceptor:
여기서는 인바운드 채널에 채널 인터셉터를 설정하는 경우를 예로 들어 설명합니다.
WebSocketMessageBrokerConfigurer:
/**
 * Registers an interceptor on the client inbound channel.
 *
 * <p>The interceptor logs the STOMP command and the JSON form of every message and,
 * for a CONNECT frame, sets the user to an {@code IdPrincipal} built from the
 * {@code "id"} native header.
 *
 * @param registration the inbound channel registration that receives the interceptor
 */
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
    registration.interceptors(new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            // getAccessor may yield null when no StompHeaderAccessor is attached to the message.
            StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
            // getCommand() may return null (e.g. for heartbeat frames), so never dereference it directly.
            StompCommand command = (accessor != null ? accessor.getCommand() : null);
            log.info(String.valueOf(command));
            log.info(JSON.toJSONString(message));
            if (accessor != null && StompCommand.CONNECT.equals(command)) {
                // NOTE(review): the original comment mentioned roomId and userId; presumably the
                // "id" header carries one of them - confirm against the client code.
                String id = accessor.getFirstNativeHeader("id");
                accessor.setUser(new IdPrincipal(id));
            }
            return message;
        }
    });
}
AbstractMessageBrokerConfiguration:
/**
 * Creates the {@code clientInboundChannel} bean: an {@link ExecutorSubscribableChannel}
 * backed by {@code clientInboundChannelExecutor()}. When the inbound channel registration
 * has interceptors, they are installed on the channel before it is returned.
 *
 * @return the configured inbound channel
 */
@Bean
public AbstractSubscribableChannel clientInboundChannel() {
    ExecutorSubscribableChannel inboundChannel =
            new ExecutorSubscribableChannel(clientInboundChannelExecutor());
    ChannelRegistration registration = getClientInboundChannelRegistration();
    if (!registration.hasInterceptors()) {
        // Nothing registered: hand back the bare channel.
        return inboundChannel;
    }
    inboundChannel.setInterceptors(registration.getInterceptors());
    return inboundChannel;
}
ChannelInterceptor와 ExecutorChannelInterceptor를 각각 해당하는 컬렉션에 추가합니다.
ExecutorSubscribableChannel:
/**
 * Replaces the channel's interceptors and rebuilds {@code executorInterceptors} so that
 * it holds exactly those entries that are also {@link ExecutorChannelInterceptor}s.
 *
 * @param interceptors the full list of interceptors to install
 */
@Override
public void setInterceptors(List<ChannelInterceptor> interceptors) {
    super.setInterceptors(interceptors);
    this.executorInterceptors.clear();
    for (ChannelInterceptor candidate : interceptors) {
        if (!(candidate instanceof ExecutorChannelInterceptor)) {
            // Plain interceptors are tracked only by the superclass.
            continue;
        }
        this.executorInterceptors.add((ExecutorChannelInterceptor) candidate);
    }
}
인터셉터 체인은 AbstractMessageChannel$ChannelInterceptorChain으로 구성되며, 이 체인의 모든 메서드는 바깥 클래스인 AbstractMessageChannel 인스턴스가 보유한 interceptors 컬렉션을 순회하면서 인터셉터를 호출합니다.
AbstractMessageChannel:
public
abstract
class
AbstractMessageChannel
implements
MessageChannel
, InterceptableChannel, BeanNameAware {
private
final
List<
ChannelInterceptor
> interceptors = new
ArrayList
<
ChannelInterceptor
>(5);
@Override
public
final
boolean
send
(Message<?> message,
long
timeout) {
Assert.
notNull
(message, "
Message
must
not
be
null
");
ChannelInterceptorChain chain = new
ChannelInterceptorChain
();
boolean
sent =
false
;
try {
message = chain.
applyPreSend
(message,
this
);
if (message == null) {
return
false
;
}
sent =
sendInternal
(message, timeout);
chain.
applyPostSend
(message,
this
, sent);
chain.
triggerAfterSendCompletion
(message,
this
, sent, null);
return
sent;
}
catch
(
Exception
ex) {
chain.
triggerAfterSendCompletion
(message,
this
, sent, ex);
if (ex
instanceof
MessagingException
) {
throw
(MessagingException) ex;
}
throw
new
MessageDeliveryException
(message,"
Failed
to
send
message
to " +
this
, ex);
}
catch
(
Throwable
err) {
MessageDeliveryException ex2 =
new
MessageDeliveryException
(message, "
Failed
to
send
message
to " +
this
, err);
chain.
triggerAfterSendCompletion
(message,
this
, sent, ex2);
throw
ex2;
}
}
/**
*
Assists
with
the
invocation
of
the
configured
channel
interceptors
.
*/
protected
class
ChannelInterceptorChain
{
private
int sendInterceptorIndex = -1;
private
int receiveInterceptorIndex = -1;
public
Message<?>
applyPreSend
(Message<?> message, MessageChannel channel) {
Message<?> messageToUse = message;
for (ChannelInterceptor interceptor : interceptors) {
Message<?> resolvedMessage = interceptor.
preSend
(messageToUse, channel);
if (resolvedMessage == null) {
String name = interceptor.
getClass
().
getSimpleName
();
if (logger.
isDebugEnabled
()) {
logger.
debug
(name + "
returned
null
from
preSend
, i.e.
precluding
the
send
.");
}
triggerAfterSendCompletion
(messageToUse, channel,
false
, null);
return
null;
}
messageToUse = resolvedMessage;
this
.sendInterceptorIndex++;
}
return
messageToUse;
}
public
void
applyPostSend
(Message<?> message, MessageChannel channel,
boolean
sent) {
for (ChannelInterceptor interceptor : interceptors) {
interceptor.
postSend
(message, channel, sent);
}
}
/**
*
*/
}
}
MessageHandler.handleMessage(Message) 메서드로 Message를 처리하며, handleMessage 호출 전후에 executorInterceptors 컬렉션의 인터셉터를 호출하여 가로챕니다.
ExecutorSubscribableChannel:
public
class
ExecutorSubscribableChannel
extends
AbstractSubscribableChannel
{
private
final
Executor executor;
private
final
List<
ExecutorChannelInterceptor
> executorInterceptors = new
ArrayList
<
ExecutorChannelInterceptor
>(4);
@Override
public
boolean
sendInternal
(Message<?> message,
long
timeout) {
for (MessageHandler handler :
getSubscribers
()) {
SendTask sendTask = new
SendTask
(message, handler);
if (
this
.executor == null) {
sendTask.run();
}
else
{
this
.executor.
execute
(sendTask);
}
}
return
true
;
}
/**
*
Invoke
a
MessageHandler
with
ExecutorChannelInterceptors
.
*/
private
class
SendTask
implements
MessageHandlingRunnable
{
@Override
public
void
run() {
Message<?> message =
this
.inputMessage;
try {
message =
applyBeforeHandle
(message);
if (message == null) {
return
;
}
this
.messageHandler.
handleMessage
(message);
triggerAfterMessageHandled
(message, null);
}
catch
(
Exception
ex) {
triggerAfterMessageHandled
(message, ex);
if (ex
instanceof
MessagingException
) {
throw
(MessagingException) ex;
}
String description = "Failed to handle " + message + " to " +
this
+ " in " +
this
.messageHandler;
throw
new
MessageDeliveryException
(message, description, ex);
}
catch
(
Throwable
err) {
String description = "Failed to handle " + message + " to " +
this
+ " in " +
this
.messageHandler;
MessageDeliveryException ex2 = new
MessageDeliveryException
(message, description, err);
triggerAfterMessageHandled
(message, ex2);
throw
ex2;
}
}
private
Message<?>
applyBeforeHandle
(Message<?> message) {
Message<?> messageToUse = message;
for (ExecutorChannelInterceptor interceptor : executorInterceptors) {
messageToUse = interceptor.
beforeHandle
(messageToUse, ExecutorSubscribableChannel.
this
,
this
.messageHandler);
if (messageToUse == null) {
String name = interceptor.
getClass
().
getSimpleName
();
if (logger.
isDebugEnabled
()) {
logger.
debug
(name + "
returned
null
from
beforeHandle
, i.e.
precluding
the
send
.");
}
triggerAfterMessageHandled
(message, null);
return
null;
}
this
.interceptorIndex++;
}
return
messageToUse;
}
private
void
triggerAfterMessageHandled
(Message<?> message, Exception ex) {
for (int i =
this
.interceptorIndex; i >= 0; i--) {
ExecutorChannelInterceptor interceptor = executorInterceptors.get(i);
try {
interceptor.
afterMessageHandled
(message, ExecutorSubscribableChannel.
this
,
this
.messageHandler, ex);
}
catch
(
Throwable
ex2) {
logger.
error
("
Exception
from
afterMessageHandled
in " + interceptor, ex2);
}
}
}
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
[MeU] Hashtag 기능 개발➡️ 기존 Tag 테이블에 존재하지 않는 해시태그라면 Tag , tagPostMapping 테이블에 모두 추가 ➡️ 기존에 존재하는 해시태그라면, tagPostMapping 테이블에만 추가 이후에 개발할 태그 기반 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다. 하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.