[GAE/Java] 비동기 API로 Queue에 추가한 Task가 사라지는 문제(미해결)

Google App Engine(GAE)/Java에서 TaskQueue의 비동기 API를 트랜잭션 지정하여 호출했는데 Task의 Add가 사라진다(Add되지 않음)라는 현상이 발생했습니다.

GAE에서는 Datastore의 약한 트랜잭션 기능을 커버하기 위해 무한 재시도하는 TaskQueue를 병용하여 결과 무결성을 담보하는, 같은 것을 잘 합니다만, 이것으로는 결과 무결성을 유지할 수 없습니다( ᷄ὢ · ᷅)

사양인지 아니면 코드에 문제가 있는지도 아직 알 수 없습니다. 식자가 눈을 뗄 수 있을 것을 기대해 기사로 해 둡니다.

코드



그대로 배포할 수 있는 프로젝트를 github에 두고 있습니다.
htps : // 기주 b. 이 m / k에 gh ぉ / 아 syn ctq st

TestServlet.java
AsyncDatastoreService datastore = DatastoreServiceFactory
        .getAsyncDatastoreService();

List<Future<?>> futures = new ArrayList<Future<?>>();
for (int i = 0; i < 10; i++) {
    Transaction tx = null;
    try {
        tx = datastore.beginTransaction().get();

        String keyName = UUID.randomUUID().toString();
        Entity entity = new Entity("Task", keyName);
        entity.setProperty("status", "pending");
        entity.setProperty("createdAt", new Date());
        datastore.put(tx, entity);

        Queue queue = QueueFactory.getDefaultQueue();
        queue.addAsync(
                tx,
                TaskOptions.Builder.withUrl("/task")
                        .param("key", keyName).method(Method.GET));

        futures.add(tx.commitAsync());
    } catch (Throwable t) {
        logger.severe("error: " + t.getMessage());
        t.printStackTrace();
        if (tx != null && tx.isActive()) {
            tx.rollback();
        }
        if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        }
        throw new IllegalStateException(t);
    }
}

for (Future<?> f : futures) {
    try {
        f.get();
    } catch (InterruptedException | ExecutionException e) {
        throw new IllegalStateException(e);
    }
}

문제의 부분↓

TestServlet.java
        Queue queue = QueueFactory.getDefaultQueue();
        queue.addAsync(
                tx,
                TaskOptions.Builder.withUrl("/task")
                        .param("key", keyName).method(Method.GET));

루프 내에서 각각 Datastore에 "Task"라는 Kind의 엔티티를 1건 저장하고, 또한 TaskQueue에 Task를 1건 저장하고 있습니다.

Datastore, TaskQueue 두 API 모두 비동기 버전을 사용하고 있지만 Future를 처리하지 않고 트랜잭션 commitAsync의 결과 Future를 나중에 함께 동기화합니다.

기대하는 행동



각 루프 내에서 트랜잭션을 지정하고 Datastore에 put, TaskQueue에 Add를하고 있기 때문에, 둘 중 하나가 실패하면 롤백이 걸릴 것으로 기대합니다.

실제



Datastore의 엔티티만 put되고 Task가 Add되지 않는 케이스가 몇번인가(10회 루프중 3~5회) 발생합니다.
오류도 발생하지 않습니다.



Run in Last Minute가 10이 되지 않으면 안 되는데 5가 되고 있습니다. 로그를 보더라도 TQ를 통해 호출되어야 하는 서블릿은 5번만 호출됩니다.

의문



TaskQueue의 비동기 API는 Future#get하여 동기화를 취하지 않으면 결과가 보장되지 않을까요?

문서
htps : // c ぉ d. 오, ぇ. 코 m / 아 펜 기네 / 드 cs / 그럼 ぁ / data s 토레 / 아 syn c #
를 보면 "When you are using a transaction, calling Transaction.commit() blocks on the result of all async calls made since the transaction started before committing it"Datastore의 비동기 API는 동기화하지 않아도 제대로 저장되었습니다. 이 설명은 TaskQueue API에는 적용되지 않습니까?

(일단) 대책



TaskQueue API만 동기판으로 변경했는데, 무사히 모든 태스크가 Add되게 되었습니다.
하지만 그 부분에서 평행 처리가 없어져 버리기 때문에 매우 유감( ・᷄ὢ・᷅ )

덧붙여서 사족이지만 · ·



GAE/Go에서는 goroutine+channel을 사용하여 트랜잭션마다 완전히 병렬 처리할 수 있으므로 위의 문제는 두지 않습니다(`・ω・´)드

아, GAE/Java에서도 Thread 사용할 수 있을까. . 하지만 Thread 자체가 너무 무거워서 별로 매력을 느끼지 않는다. .

좋은 웹페이지 즐겨찾기