Apache Camel의 Aggregator가 서버가 다운될 때 메시지 손실 방지

17393 단어 apache-camel
개시하다
Aggregator는 여러 개의 메시지를 한 편지에 집중할 수 있습니다.여러 정보를 분할하는 스플릿터와 반대되는 처리.

그림 출처: Enterprise Integration Patterns - Aggregator
Aggregator는 집합 메시지를 메모리에 저장합니다. 프로그램이 종료되면 집합 메시지를 잃어버립니다.
LeveldB를 창고로 사용하면 데이터 분실을 방지할 수 있습니다.
SEDA와 루트에서 흐르는 Exchange가 영구화되지 않았기 때문에 Aggregator를 영구화하려는 경우(?)는 드물다.어쩌면, 우연히 그런 용례를 만났기 때문에 조사해 봤어요.
LeveldB의 준비
우선, "cal-leveldb"의 프로그램 라이브러리를 추가합니다(이하 Maven의 경우).
pom.xml
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-leveldb</artifactId>
            <version>${camel.version}</version>
        </dependency>
그리고 LeveldB의 창고를 정의합니다.
· perrsistentFileName에서 LeveldB에 사용할 디렉토리를 지정합니다.
・repositoryName에서 창고의 이름을 지정합니다.
    <bean id="myRepo"
        class="org.apache.camel.component.leveldb.LevelDBAggregationRepository">
        <property name="persistentFileName" value="data/leveldb.dat" />
        <property name="repositoryName" value="myRepo" />
        <property name="sync" value="true" />
        <property name="useRecovery" value="true" />
        <property name="recoveryInterval" value="5000" />
    </bean>
테스트용 XML DSL 만들기
route는 다음과 같이 제작되었습니다.
• 1초마다 실행 날짜와 시간 문자열이 있는 정보를 루트로 전송합니다.
·aggregationRepository Ref를 통해 LevelDB를 창고로 지정한다.
·aggregate의compuletionSize에서'5'를 지정함으로써 정보는 최대 5개까지 컴파일됩니다.
●compuletionTimeout에서'5000'(5초)을 지정하여 마지막으로 메시지를 받은 후 5초 동안 컴파일을 완성합니다.
● 취합 소식은 "<camelContext xmlns="http://camel.apache.org/schema/spring"> <route id="parentRoute"> <from uri="timer://run?delay=1000&amp;period=1000" /> <setBody> <simple>${date:now:yyyy-MM-dd HH:mm:ss}</simple> </setBody> <log message="#### timer start" /> <aggregate strategyRef="MyAggregationStrategy" aggregationRepositoryRef="myRepo" forceCompletionOnStop="true" completionSize="5"> <correlationExpression> <constant>true</constant> </correlationExpression> <completionTimeout> <simple>5000</simple> </completionTimeout> <to uri="file://output?doneFileName=$simple{file:name}.done&amp;fileExist=Fail" /> <log message="#### aggregate end" /> </aggregate> </route> </camelContext> strategyRef에서 요약 방법(형식)을 지정합니다.
이번에는, 세그먼트(CSV)의 strategy가 다음과 같습니다.
package sample;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class MyAggregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            return newExchange;
        }

        String oldB = oldExchange.getIn().getBody(String.class);
        String newB = newExchange.getIn().getBody(String.class);
        oldExchange.getIn().setBody(oldB + "," + newB);
        return oldExchange;
    }
}
메시지를 잃어버리지 않도록 실행하기
실행 후 로그 출력은 5개(timer start)씩 다음과 같습니다.
[2018-10-04 20:29:21.635], [INFO ], parentRoute, Camel (camel-1) thread #3 - timer://run, parentRoute, #### timer start
[2018-10-04 20:29:22.636], [INFO ], parentRoute, Camel (camel-1) thread #3 - timer://run, parentRoute, #### timer start
[2018-10-04 20:29:23.637], [INFO ], parentRoute, Camel (camel-1) thread #3 - timer://run, parentRoute, #### timer start
[2018-10-04 20:29:24.638], [INFO ], parentRoute, Camel (camel-1) thread #3 - timer://run, parentRoute, #### timer start
[2018-10-04 20:29:25.638], [INFO ], parentRoute, Camel (camel-1) thread #3 - timer://run, parentRoute, #### timer start
[2018-10-04 20:29:25.656], [INFO ], parentRoute, Camel (camel-1) thread #3 - timer://run, parentRoute, #### aggregate end
출력된 파일은 다음과 같이 실행 시간을 ","구분자로 요약합니다.
2018-10-04 20:29:21,2018-10-04 20:29:22,2018-10-04 20:29:23,2018-10-04 20:29:24,2018-10-04 20:29:25
정상 작동을 확인한 후 중간에 KILL이 앱을 사용해도 정보를 잃어버리지 않는지 확인한다.
시행 1차는 3차 킬.5건 미만이라 취합이 안 돼 파일도 출력하지 않는다.
[2018-10-04 20:40:49.502], [INFO ], parentRoute, Camel (camel-1) thread #3 - timer://run, parentRoute, #### timer start
[2018-10-04 20:40:50.471], [INFO ], parentRoute, Camel (camel-1) thread #3 - timer://run, parentRoute, #### timer start
[2018-10-04 20:40:51.472], [INFO ], parentRoute, Camel (camel-1) thread #3 - timer://run, parentRoute, #### timer start

두 번째 실행 후 두 번째 건(1차와 총 5건)이 합쳐지고 파일이 출력됩니다.
[2018-10-04 20:41:38.047], [INFO ], parentRoute, Camel (camel-1) thread #3 - timer://run, parentRoute, #### timer start
[2018-10-04 20:41:39.041], [INFO ], parentRoute, Camel (camel-1) thread #3 - timer://run, parentRoute, #### timer start
[2018-10-04 20:41:39.076], [INFO ], parentRoute, Camel (camel-1) thread #3 - timer://run, parentRoute, #### aggregate end
서류를 확인한 결과 1차(20시 40분)와 2차(20시 41분) 5건이 빠짐없이 출력된 것을 확인했다.
2018-10-04 20:40:49,2018-10-04 20:40:50,2018-10-04 20:40:51,2018-10-04 20:41:38,2018-10-04 20:41:39
중도 KILL 애플리케이션에서도 정보가 손실되지 않음을 확인했습니다.
이런 영속화에 의존하지 않고 원상태로 회복하는 것이 좋을 것 같지만 순조롭지 못한 경우도 있기 때문에 Aggregator가 지속될 수 있는 것도 편리하다.
주의점
Camel의 LeveldB 접근은 배타 처리가 되어 여러 라인이 동시에 LeveldB에 접근할 수 없습니다.따라서 대량의 데이터를 고속으로 처리하는 것은 적합하지 않다.
XML DSL 전체
시험적으로 만들어진 XML DSL 전체는 다음과 같습니다.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:camel="http://camel.apache.org/schema/spring"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
          http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
          http://camel.apache.org/schema/spring
          http://camel.apache.org/schema/spring/camel-spring.xsd
          http://www.springframework.org/schema/util
          http://www.springframework.org/schema/util/spring-util-4.2.xsd">

    <bean id="myRepo"
        class="org.apache.camel.component.leveldb.LevelDBAggregationRepository">
        <property name="persistentFileName" value="./leveldb_data" />
        <property name="repositoryName" value="myRepo" />
        <property name="sync" value="true" />
        <property name="useRecovery" value="true" />
        <property name="recoveryInterval" value="5000" />
    </bean>
    <bean id="MyAggregationStrategy"
        class="sample.MyAggregationStrategy" />

    <camelContext
        xmlns="http://camel.apache.org/schema/spring">
        <route id="parentRoute">
            <from uri="timer://run?delay=1000&amp;period=1000" />
            <setBody>
                <simple>${date:now:yyyy-MM-dd HH:mm:ss}</simple>
            </setBody>
            <log message="#### timer start" />
            <aggregate strategyRef="MyAggregationStrategy" aggregationRepositoryRef="myRepo" forceCompletionOnStop="true"
                completionSize="5">
                <correlationExpression>
                    <constant>true</constant>
                </correlationExpression>
                <completionTimeout>
                    <simple>5000</simple>
                </completionTimeout>
                            <to
                uri="file://output?doneFileName=$simple{file:name}.done&amp;fileExist=Fail" />
                <log message="#### aggregate end" />
            </aggregate>
        </route>
    </camelContext>
</beans>
구성 요소의 주요 속성
마지막으로camel-leveldb 구성 요소의 주요 속성을 설명합니다.
속성은 아래 표를 제외하고 자세한 내용은 공식 페이지를 참조하십시오.
  • LevelDB
  • 속성 이름
    설명
    기본값
    타입
    repositoryName
    옵션이 필요합니다.LeveldB의 창고 이름을 지정합니다.공유된 LeveldBFile은 여러 저장소에서 사용할 수 있습니다.
    String
    persistentFileName
    영구적으로 저장되는 파일 이름을 지정합니다.시작할 때 파일이 존재하지 않으면 새 파일을 만듭니다.
    String
    sync
    LeveldBFile에 쓸 때 동기화 여부를 지정합니다.기본값은 비동기식입니다.쓰기와 동기화되므로 모든 쓰기가 완료되기 전에 대기하므로 데이터가 손실되지 않습니다.
    false
    boolean
    useRecovery
    복구 사용 여부를 지정합니다.기본값은 진짜입니다. (복구는 유효합니다.)활성화되면 Camel Aggregator는 Exchange 집계가 실패할 때 자동으로 복구하여 다시 보냅니다.
    true
    boolean
    recoveryInterval
    리스토어가 설정되어 있으면 실패한 Exchange를 검색하여 리스토어하고 다시 실행하는 백그라운드 작업을 수행합니다.기본적으로 5000밀리초 간격으로 실행됩니다.
    5000
    long
    maximumRedeliveries
    Exchange 재개를 위한 최대 시도 횟수를 지정합니다.이 옵션을 사용하면 최대 시도 횟수만 실패하면 Exchange가 패닉 채널로 이동합니다.기본적으로 유효하지 않습니다.이 옵션을 사용하는 경우 deadLetterUri 옵션도 지정합니다.
    int
    deadLetterUri
    Exchange를 다시 시도하여 이동할 사신 채널의 끝점 URI를 지정합니다.
    String
    참고 자료
  • Aggregator(공식 사이트)
  • Camel Persistent Aggregate-(공식 웹 사이트 Example)
  • Camel Component LeveldB(공식 사이트)
  • LevelDB
  • 좋은 웹페이지 즐겨찾기