스트리밍 API
이 예에서는 데이터베이스에서 데이터를 가져오려고 합니다. 데이터 볼륨이 크기 때문에 DB에 대한 후속 호출에서 데이터를 가져올 것입니다. DB를 여러 번 호출해야 하기 때문에 이 시나리오에서는 절충점이 있습니다.
나머지 API와 마찬가지로 스트리밍 컨트롤러를 만들 수 있습니다.
//Controller.java
@Autowired
private TaskExecutor taskExecutor;
//input query: SELECT * FROM TABLE1 WHERE ID_COLUMN > $INDEX LIMIT $BACTHSIZE
@PostMapping("/stream/query")
public ResponseBodyEmitter executeMdx(@RequestBody Input input) {
final ResponseBodyEmitter emitter = new ResponseBodyEmitter();
taskExecutor.execute(() -> {
try {
final String batchSize = "10";
for (int i = 0; i < 10; i++) {
input.setInputQuery(input.getInputQuery().replace("$INDEX", String.valueOf(i + 1))
.replace("$BATCHSIZE", batchSize));
Connector c = _applicationContext.getBean(Connector.class, input);
String[][] result = c.executeQuery(input);
if (result == null)
break;
for (String[] x : result) {
emitter.send(String.join(",", x));
}
}
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
e.printStackTrace();
}
emitter.complete();
});
return emitter;
}
우리의 사용 사례 클라이언트 애플리케이션은 열 쉼표로 구분된 한 번에 하나의 행을 요청합니다. 이것은 매우 조잡한 방법입니다. 이미 터를 사용하여 json의 바이너리도 보낼 수 있습니다. 또한 배치 크기 계산은 요구 사항에 따라 동적으로 조작될 수 있습니다.
Taskexecutor 빈은 다음과 같이 구성됩니다.
//ThreadConfig.java
@Configuration
public class ThreadConfig {
@Bean
public TaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(4);
executor.setThreadNamePrefix("task_executor_thread");
executor.initialize();
return executor;
}
}
사용하는 빈이 스레드 준비 상태인지 확인하십시오. 이 경우 커넥터가 Runnable을 구현하므로 Task Executor를 통해 액세스할 수 있습니다.
//Connector.java
@Service
@Scope("prototype")
public class Connector implements Runnable {
@Autowired
private Logger log;
@Autowired
private Grid grid;
private Input _input;
public Connector(Input input) {
_input = input;
}
public String[][] executeQuery(Input input) {
try {
return grid.executeQuery(input);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return null;
}
@Override
public void run() {
executeQuery(_input);
}
}
스레드에 값을 전달하기 위해 생성자 인수를 사용합니다.
이 샘플은 스트리밍 API를 달성하는 데 필요한 사항을 최소한 안내할 수 있어야 합니다. 모든 사용 사례에 올바른 방법이 아닐 수도 있습니다.
이제 서버 측 스트리밍 API를 만들었으므로 UI에서 이러한 방식으로 사용해야 합니다.
async function postData(url = '', data = {}) {
var request = {
method: 'POST',
mode: 'cors',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(data)
}
await fetch('http://localhost:8080/stream/query', request)
.then(response => {
const reader = response.body.getReader()
const utf8Decoder = new TextDecoder("utf-8");
const stream = new ReadableStream({
start(controller) {
function push() {
reader.read().then(({
done,
value
}) => {
if (done) {
controller.close();
return;
}
controller.enqueue(value);
let listItem = document.createElement('li');
listItem.textContent = utf8Decoder.decode(value);
console.log(listItem.textContent);
document.body.appendChild(listItem);
push();
});
};
push();
}
});
return new Response(stream, {
headers: {
"Content-Type": "text/html"
}
});
});
}
postData('http://localhost:8080/stream/query', {
"inputQuery": "SELECT {[AUGW120]:[AUGW420],[AUG20],[Q3FY20]} ON COLUMNS, NON EMPTY CrossJoin ({[Fulfillment Region].children}, CrossJoin ( SUBSET({Descendants ([Product_Org],3)},$INDEX,$BATCHSIZE), {[CSR_Unit_Sales],[UNIT_Ships]}))ON ROWS FROM [LIBASOD2.NEW_DM] WHERE ([Version].[&CVersion],[Site].[Site],[Customer].[Customer], [Geo].[Geo],[Channel].[Channel],[Config].[Config])"
}).then(data => {
console.log(data);
});
참고문헌
Reference
이 문제에 관하여(스트리밍 API), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/0xarjunshetty/streaming-api-a38텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)