[Spring Integration] 서버 기능을 통해 생성 된 소켓을 사용하여 클라이언트 전송
13080 단어 spring-bootspring-integrationspring
하고 싶은 일
내가 참가하고 있는 이슈는 아래의 그림이라면 「Gateway」로 하고 있는 부분으로, 클라이언트로부터 HTTP 경유로 JSON 메세지를 받고, 고정 항목 길이 메세지로 변환하고 나서 TCP 경유로 서버에 송신할 필요가 있습니다 .
Spring Integration의 실현 가능성은?
현재 참가하고 있는 안건에서는 Spring Integration을 사용하고 있습니다만, 대략 조사한 느낌이라고・・・표준 기능의 조합만으로는 무리인 것 같았습니다.
표준 기능만으로는 무리인 것 같았습니다만, 이하와 같이 송신 컴퍼넌트를 호출하기 전에 「있는 일」을 실시하는 컴퍼넌트를 끼워 넣는 것으로 실현할 수 있을 것 같습니다.
"있는 것"= "이용하는 연결의 연결 ID를 메시지 헤더에 설정하는 것"입니다.
실제로 시도해보기
Bean 설정과 HeaderEnricher의 구현은 다음과 같은 느낌.
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean // サーバからの接続を受けるためのコネクションファクトリ
public AbstractServerConnectionFactory serverConnectionFactory() {
return Tcp.nioServer(9999).get();
}
@Bean // サーバからメッセージが送られてきた時の処理(※今回はとりあえずログ出力してメッセージを破棄するフローを定義しておく)
public IntegrationFlow receiver(AbstractConnectionFactory serverConnectionFactory) {
return IntegrationFlows.from(Tcp.inboundAdapter(serverConnectionFactory))
.<byte[], String>transform(String::new)
.log(LoggingHandler.Level.INFO, "client-res")
.nullChannel();
}
@Bean // サーバへメッセージを送信する処理
public IntegrationFlow sender(AbstractConnectionFactory serverConnectionFactory) {
return IntegrationFlows.from(MessageChannels.direct("senderChannel"))
// 【今回のポイント】 送信処理で利用するコネクションのIDをメッセージヘッダへ設定しておく
// TODO 利用するコネクションの決定方法(⇨実アプリだとラウンドロビン方式で使うコネクションを振り分ける必要あり)
.enrich(x -> x.headerFunction(IpHeaders.CONNECTION_ID,
m -> serverConnectionFactory.getOpenConnectionIds().get(0)))
.log(LoggingHandler.Level.INFO, "client-req")
.<String, byte[]>transform(String::getBytes)
.handle(Tcp.outboundAdapter(serverConnectionFactory))
.get();
}
@Bean // テストケース側でsenderにメッセージを送信する時に使う用
public MessagingTemplate messagingTemplate() {
return new MessagingTemplate();
}
}
테스트 케이스 클래스를 만들고 하고 싶은 일이 있는지 확인해 봅니다.
@SpringBootTest
class DemoApplicationTests {
@Autowired
private MessagingTemplate messagingTemplate;
@Test
void sending() throws IOException, InterruptedException {
// サーバからGatewayへ接続してソケットを開く
try (Socket socket = new Socket("localhost", 9999)) {
socket.setSoTimeout(2000);
TimeUnit.SECONDS.sleep(1);
// サーバへメッセージを送信
// 本来であればクライアントシステムから送信されたものを固定項目長のメッセージに変換してから送る
messagingTemplate.send("senderChannel", MessageBuilder.withPayload("abc").build());
messagingTemplate.send("senderChannel", MessageBuilder.withPayload("def").build());
// サーバでメッセージを受信
try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
{
String message = reader.readLine();
System.out.println("server-req : " + message);
Assertions.assertThat(message).isEqualTo("abc");
}
{
String message = reader.readLine();
System.out.println("server-req : " + message);
Assertions.assertThat(message).isEqualTo("def");
}
}
}
}
}
할 수 있었다! 같은
검증 버전
현재 참가하고 있는 안건에서는 Spring Integration을 사용하고 있습니다만, 대략 조사한 느낌이라고・・・표준 기능의 조합만으로는 무리인 것 같았습니다.
표준 기능만으로는 무리인 것 같았습니다만, 이하와 같이 송신 컴퍼넌트를 호출하기 전에 「있는 일」을 실시하는 컴퍼넌트를 끼워 넣는 것으로 실현할 수 있을 것 같습니다.
"있는 것"= "이용하는 연결의 연결 ID를 메시지 헤더에 설정하는 것"입니다.
실제로 시도해보기
Bean 설정과 HeaderEnricher의 구현은 다음과 같은 느낌.
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean // サーバからの接続を受けるためのコネクションファクトリ
public AbstractServerConnectionFactory serverConnectionFactory() {
return Tcp.nioServer(9999).get();
}
@Bean // サーバからメッセージが送られてきた時の処理(※今回はとりあえずログ出力してメッセージを破棄するフローを定義しておく)
public IntegrationFlow receiver(AbstractConnectionFactory serverConnectionFactory) {
return IntegrationFlows.from(Tcp.inboundAdapter(serverConnectionFactory))
.<byte[], String>transform(String::new)
.log(LoggingHandler.Level.INFO, "client-res")
.nullChannel();
}
@Bean // サーバへメッセージを送信する処理
public IntegrationFlow sender(AbstractConnectionFactory serverConnectionFactory) {
return IntegrationFlows.from(MessageChannels.direct("senderChannel"))
// 【今回のポイント】 送信処理で利用するコネクションのIDをメッセージヘッダへ設定しておく
// TODO 利用するコネクションの決定方法(⇨実アプリだとラウンドロビン方式で使うコネクションを振り分ける必要あり)
.enrich(x -> x.headerFunction(IpHeaders.CONNECTION_ID,
m -> serverConnectionFactory.getOpenConnectionIds().get(0)))
.log(LoggingHandler.Level.INFO, "client-req")
.<String, byte[]>transform(String::getBytes)
.handle(Tcp.outboundAdapter(serverConnectionFactory))
.get();
}
@Bean // テストケース側でsenderにメッセージを送信する時に使う用
public MessagingTemplate messagingTemplate() {
return new MessagingTemplate();
}
}
테스트 케이스 클래스를 만들고 하고 싶은 일이 있는지 확인해 봅니다.
@SpringBootTest
class DemoApplicationTests {
@Autowired
private MessagingTemplate messagingTemplate;
@Test
void sending() throws IOException, InterruptedException {
// サーバからGatewayへ接続してソケットを開く
try (Socket socket = new Socket("localhost", 9999)) {
socket.setSoTimeout(2000);
TimeUnit.SECONDS.sleep(1);
// サーバへメッセージを送信
// 本来であればクライアントシステムから送信されたものを固定項目長のメッセージに変換してから送る
messagingTemplate.send("senderChannel", MessageBuilder.withPayload("abc").build());
messagingTemplate.send("senderChannel", MessageBuilder.withPayload("def").build());
// サーバでメッセージを受信
try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
{
String message = reader.readLine();
System.out.println("server-req : " + message);
Assertions.assertThat(message).isEqualTo("abc");
}
{
String message = reader.readLine();
System.out.println("server-req : " + message);
Assertions.assertThat(message).isEqualTo("def");
}
}
}
}
}
할 수 있었다! 같은
검증 버전
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean // サーバからの接続を受けるためのコネクションファクトリ
public AbstractServerConnectionFactory serverConnectionFactory() {
return Tcp.nioServer(9999).get();
}
@Bean // サーバからメッセージが送られてきた時の処理(※今回はとりあえずログ出力してメッセージを破棄するフローを定義しておく)
public IntegrationFlow receiver(AbstractConnectionFactory serverConnectionFactory) {
return IntegrationFlows.from(Tcp.inboundAdapter(serverConnectionFactory))
.<byte[], String>transform(String::new)
.log(LoggingHandler.Level.INFO, "client-res")
.nullChannel();
}
@Bean // サーバへメッセージを送信する処理
public IntegrationFlow sender(AbstractConnectionFactory serverConnectionFactory) {
return IntegrationFlows.from(MessageChannels.direct("senderChannel"))
// 【今回のポイント】 送信処理で利用するコネクションのIDをメッセージヘッダへ設定しておく
// TODO 利用するコネクションの決定方法(⇨実アプリだとラウンドロビン方式で使うコネクションを振り分ける必要あり)
.enrich(x -> x.headerFunction(IpHeaders.CONNECTION_ID,
m -> serverConnectionFactory.getOpenConnectionIds().get(0)))
.log(LoggingHandler.Level.INFO, "client-req")
.<String, byte[]>transform(String::getBytes)
.handle(Tcp.outboundAdapter(serverConnectionFactory))
.get();
}
@Bean // テストケース側でsenderにメッセージを送信する時に使う用
public MessagingTemplate messagingTemplate() {
return new MessagingTemplate();
}
}
@SpringBootTest
class DemoApplicationTests {
@Autowired
private MessagingTemplate messagingTemplate;
@Test
void sending() throws IOException, InterruptedException {
// サーバからGatewayへ接続してソケットを開く
try (Socket socket = new Socket("localhost", 9999)) {
socket.setSoTimeout(2000);
TimeUnit.SECONDS.sleep(1);
// サーバへメッセージを送信
// 本来であればクライアントシステムから送信されたものを固定項目長のメッセージに変換してから送る
messagingTemplate.send("senderChannel", MessageBuilder.withPayload("abc").build());
messagingTemplate.send("senderChannel", MessageBuilder.withPayload("def").build());
// サーバでメッセージを受信
try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
{
String message = reader.readLine();
System.out.println("server-req : " + message);
Assertions.assertThat(message).isEqualTo("abc");
}
{
String message = reader.readLine();
System.out.println("server-req : " + message);
Assertions.assertThat(message).isEqualTo("def");
}
}
}
}
}
요약
Spring Integration은 다음과 같이 Gwateway에서 연결하여 소켓을 여는 패턴을 지원합니다.
이번과 같은 케이스는 특수한 접속 사양? 일지도 모르지만, 조금 궁리하는 것으로 실현할 수있을 것 같아서 좋았습니다.
참고 페이지
Reference
이 문제에 관하여([Spring Integration] 서버 기능을 통해 생성 된 소켓을 사용하여 클라이언트 전송), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/kazuki43zoo/items/faf9837b8d83a4787b8d텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)