RxJS 전역 속도 제한
13967 단어 rxjstypescriptfetch
RxJS 전역 속도 제한
직장에서 우리는 분당 200개의 요청 속도 제한이 있는 타사 API에 대한 통합을 구축하고 있습니다. API는 페이지가 매겨졌고 응답에 모든 데이터가 포함되지 않았으므로 각 항목에 대한 요청이 필요했습니다.
다음 코드는 rxjs를 사용하여 이 문제를 해결할 수 있는 방법을 보여줍니다.
시작하자!
import {
BehaviorSubject,
filter,
map,
mergeMap,
MonoTypeOperatorFunction,
Observable,
take,
timer,
} from 'rxjs'
// global variables
let tokens = 200
let slidingWindowTime = 60_000
let tokenChangedSubject = new BehaviorSubject(tokens)
/**
* Rate limit observable to x number of requests withing the slidingWindowTime
*
* @param parallel number of requests to run in parallel
* @param slidingWindowTime time in milliseconds for how many requests can be run within
* @returns rate limited observable
*/
export function globalRateLimit<T>(setOptions?: {
parallel: number
slidingWindowTime: number
}): MonoTypeOperatorFunction<T> {
// initialize at first or at options reset
if (setOptions) {
tokens = parallel ?? 200
slidingWindowTime = setOptions.slidingWindowTime ?? 60_000
tokenChangedSubject.next(tokens)
}
const consumeToken = () => tokenChangedSubject.next(--tokens)
const renewToken = () => tokenChangedSubject.next(++tokens)
const availableTokens = tokenChangedSubject.pipe(filter(() => tokens > 0))
return mergeMap<T, Observable<T>>((value: T) =>
availableTokens.pipe(
take(1),
map(() => {
consumeToken()
timer(slidingWindowTime).subscribe(renewToken)
return value
}),
),
)
}
사용 방법
const source$ = of(1, 2, 3, 4, 5, 6)
source$
.pipe(globalRateLimit({ parallel: 2, slidingWindowTime: 1000 }))
.subscribe((v) => console.log(v))
// output
// 1
// 2
// wait for 1000ms
// 3
// 4
// wait for 1000ms
// 5
// 6
// complete
글로벌 속도 제한으로 사용하면 어떤 이점이 있습니까?
둘 다 동일한 제한을 수신해야 하는 관찰 가능한 여러 스트림이 있는 경우 첫 번째 호출에서 제한을 구성하고 후속 호출에서 파이프만 사용할 수 있기 때문입니다.
interface Page {
page: number
totalPages: number
items: number[]
}
const pageRequest = (page: number): Observable<Page> => {
return of({
page,
totalPages: 10,
items: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
}).pipe(
// the page requests should be rate limited
globalRateLimit({ parallel: 2, slidingWindowTime: 1000 }),
)
}
// a stream of page responses, where we comply to the rate limit
const pages$ = pageRequest(1).pipe(
expand((response) =>
response.page < response.totalPages ? pageRequest(response.page + 1) : EMPTY,
),
)
// now we want a stream of all items in the pages
// so we flatten the page.items into a stream of items
const items$ = pages$.pipe(concatMap((page) => from(page.items)))
// now we want to call the api and get each item and still keep to the rate limit
interface Item {
id: number
name: string
}
const itemRequest = (item: number): Observable<Item> => {
// fake api call that simulates taking 50ms to complete
return of({
id: item,
name: `item ${item}`,
}).pipe(delay(50))
}
const items$ = items$.pipe(
globalRateLimit(),
concatMap((item) => itemRequest(item)),
)
// will get all pages, then call itemRequest for each item in page.items
// and all of this will be rate limited to 200 requests per minute
items$.subscribe((item) => console.log(item))
결론
rxjs로 해결할 수 있는 복잡한 상황이 많이 있습니다. 이것이 같은 상황에 있는 누군가를 도울 수 있기를 바랍니다.
즐거운 코딩하세요!
Reference
이 문제에 관하여(RxJS 전역 속도 제한), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/leon/rxjs-global-rate-limit-4g2j텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)