스트리밍 API

18363 단어 streamingapijava
스트리밍 API는 클라이언트에 보낼 많은 양의 데이터가 있을 때 필요한 것입니다. 제가 생각할 수 있는 비유는 이것입니다. 제 정원에 물을 주고 싶다면 두 가지 옵션이 있습니다. 양동이에 물을 가져온 다음 정원에 가서 하나씩 물을 심습니다. 이것은 정원이 클라이언트 앱이고 내 버킷을 채우기 위해 서버(수원)에서 기다린 다음 채워진 데이터 세트를 클라이언트에 반환해야 하는 나머지 API와 유사합니다. 스트리밍 API를 사용하면 수원에서 정원으로 파이프를 연결하여 실시간으로 물을 얻을 수 있고 완전한 데이터가 검색될 때까지 기다릴 필요가 없습니다. 또한 메모리에 대용량 데이터를 저장할 필요가 없기 때문에 서버 메모리를 많이 절약할 수 있습니다.

이 예에서는 데이터베이스에서 데이터를 가져오려고 합니다. 데이터 볼륨이 크기 때문에 DB에 대한 후속 호출에서 데이터를 가져올 것입니다. DB를 여러 번 호출해야 하기 때문에 이 시나리오에서는 절충점이 있습니다.

나머지 API와 마찬가지로 스트리밍 컨트롤러를 만들 수 있습니다.

//Controller.java

@Autowired
private TaskExecutor taskExecutor;

//input query: SELECT * FROM TABLE1 WHERE ID_COLUMN > $INDEX LIMIT $BACTHSIZE
@PostMapping("/stream/query")
public ResponseBodyEmitter executeMdx(@RequestBody Input input) {

    final ResponseBodyEmitter emitter = new ResponseBodyEmitter();

    taskExecutor.execute(() -> {
        try {

            final String batchSize = "10";

            for (int i = 0; i < 10; i++) {
                input.setInputQuery(input.getInputQuery().replace("$INDEX", String.valueOf(i + 1))
                        .replace("$BATCHSIZE", batchSize));

                Connector c = _applicationContext.getBean(Connector.class, input);

                String[][] result = c.executeQuery(input);
                if (result == null)
                    break;

                for (String[] x : result) {
                    emitter.send(String.join(",", x));
                }
            }
            emitter.complete();
        } catch (Exception e) {
            emitter.completeWithError(e);
            e.printStackTrace();
        }
        emitter.complete();

    });

        return emitter;
}


우리의 사용 사례 클라이언트 애플리케이션은 열 쉼표로 구분된 한 번에 하나의 행을 요청합니다. 이것은 매우 조잡한 방법입니다. 이미 터를 사용하여 json의 바이너리도 보낼 수 있습니다. 또한 배치 크기 계산은 요구 사항에 따라 동적으로 조작될 수 있습니다.

Taskexecutor 빈은 다음과 같이 구성됩니다.

//ThreadConfig.java
@Configuration
public class ThreadConfig {
    @Bean
    public TaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(4);        
        executor.setThreadNamePrefix("task_executor_thread");
        executor.initialize();
        return executor;
    }
}


사용하는 빈이 스레드 준비 상태인지 확인하십시오. 이 경우 커넥터가 Runnable을 구현하므로 Task Executor를 통해 액세스할 수 있습니다.

//Connector.java
@Service
@Scope("prototype")
public class Connector implements Runnable {

    @Autowired
    private Logger log;

    @Autowired
    private Grid grid;

    private Input _input;

    public Connector(Input input) {
        _input = input;
    }

    public String[][] executeQuery(Input input) {
        try {
            return grid.executeQuery(input);

        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }

        return null;
    }

    @Override
    public void run() {        
        executeQuery(_input);
    }
}


스레드에 값을 전달하기 위해 생성자 인수를 사용합니다.

이 샘플은 스트리밍 API를 달성하는 데 필요한 사항을 최소한 안내할 수 있어야 합니다. 모든 사용 사례에 올바른 방법이 아닐 수도 있습니다.

이제 서버 측 스트리밍 API를 만들었으므로 UI에서 이러한 방식으로 사용해야 합니다.

async function postData(url = '', data = {}) {

    var request = {
        method: 'POST',
        mode: 'cors',
        headers: {
            'Content-Type': 'application/json'
        },
        body: JSON.stringify(data)
    }

    await fetch('http://localhost:8080/stream/query', request)
        .then(response => {
            const reader = response.body.getReader()
            const utf8Decoder = new TextDecoder("utf-8");
            const stream = new ReadableStream({
                start(controller) {
                    function push() {
                        reader.read().then(({
                            done,
                            value
                        }) => {
                            if (done) {
                                controller.close();
                                return;
                            }

                            controller.enqueue(value);
                            let listItem = document.createElement('li');
                            listItem.textContent = utf8Decoder.decode(value);
                            console.log(listItem.textContent);
                            document.body.appendChild(listItem);
                            push();
                        });
                    };

                    push();
                }
            });

            return new Response(stream, {
                headers: {
                    "Content-Type": "text/html"
                }
            });
        });
}

postData('http://localhost:8080/stream/query', {    
    "inputQuery": "SELECT {[AUGW120]:[AUGW420],[AUG20],[Q3FY20]} ON COLUMNS, NON EMPTY CrossJoin ({[Fulfillment Region].children}, CrossJoin ( SUBSET({Descendants ([Product_Org],3)},$INDEX,$BATCHSIZE), {[CSR_Unit_Sales],[UNIT_Ships]}))ON ROWS FROM [LIBASOD2.NEW_DM] WHERE ([Version].[&CVersion],[Site].[Site],[Customer].[Customer], [Geo].[Geo],[Channel].[Channel],[Config].[Config])"
}).then(data => {
    console.log(data);
});


  • 사진 작성자 Joshua Sortino
  • 원래 게시 위치 bitsmonkey

  • 참고문헌
  • baeldung
  • howtodoinjava
  • 좋은 웹페이지 즐겨찾기