RxJS 전역 속도 제한

13967 단어 rxjstypescriptfetch

RxJS 전역 속도 제한



직장에서 우리는 분당 200개의 요청 속도 제한이 있는 타사 API에 대한 통합을 구축하고 있습니다. API는 페이지가 매겨졌고 응답에 모든 데이터가 포함되지 않았으므로 각 항목에 대한 요청이 필요했습니다.

다음 코드는 rxjs를 사용하여 이 문제를 해결할 수 있는 방법을 보여줍니다.

시작하자!

import {
  BehaviorSubject,
  filter,
  map,
  mergeMap,
  MonoTypeOperatorFunction,
  Observable,
  take,
  timer,
} from 'rxjs'

// global variables
let tokens = 200
let slidingWindowTime = 60_000
let tokenChangedSubject = new BehaviorSubject(tokens)

/**
 * Rate limit observable to x number of requests withing the slidingWindowTime
 *
 * @param parallel number of requests to run in parallel
 * @param slidingWindowTime time in milliseconds for how many requests can be run within
 * @returns rate limited observable
 */
export function globalRateLimit<T>(setOptions?: {
  parallel: number
  slidingWindowTime: number
}): MonoTypeOperatorFunction<T> {
  // initialize at first or at options reset
  if (setOptions) {
    tokens = parallel ?? 200
    slidingWindowTime = setOptions.slidingWindowTime ?? 60_000
    tokenChangedSubject.next(tokens)
  }

  const consumeToken = () => tokenChangedSubject.next(--tokens)
  const renewToken = () => tokenChangedSubject.next(++tokens)
  const availableTokens = tokenChangedSubject.pipe(filter(() => tokens > 0))

  return mergeMap<T, Observable<T>>((value: T) =>
    availableTokens.pipe(
      take(1),
      map(() => {
        consumeToken()
        timer(slidingWindowTime).subscribe(renewToken)
        return value
      }),
    ),
  )
}



사용 방법




const source$ = of(1, 2, 3, 4, 5, 6)
source$
  .pipe(globalRateLimit({ parallel: 2, slidingWindowTime: 1000 }))
  .subscribe((v) => console.log(v))

// output
// 1
// 2
// wait for 1000ms
// 3
// 4
// wait for 1000ms
// 5
// 6
// complete



글로벌 속도 제한으로 사용하면 어떤 이점이 있습니까?



둘 다 동일한 제한을 수신해야 하는 관찰 가능한 여러 스트림이 있는 경우 첫 번째 호출에서 제한을 구성하고 후속 호출에서 파이프만 사용할 수 있기 때문입니다.

interface Page {
  page: number
  totalPages: number
  items: number[]
}
const pageRequest = (page: number): Observable<Page> => {
  return of({
    page,
    totalPages: 10,
    items: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
  }).pipe(
    // the page requests should be rate limited
    globalRateLimit({ parallel: 2, slidingWindowTime: 1000 }),
  )
}

// a stream of page responses, where we comply to the rate limit
const pages$ = pageRequest(1).pipe(
  expand((response) =>
    response.page < response.totalPages ? pageRequest(response.page + 1) : EMPTY,
  ),
)

// now we want a stream of all items in the pages
// so we flatten the page.items into a stream of items
const items$ = pages$.pipe(concatMap((page) => from(page.items)))

// now we want to call the api and get each item and still keep to the rate limit
interface Item {
  id: number
  name: string
}
const itemRequest = (item: number): Observable<Item> => {
  // fake api call that simulates taking 50ms to complete
  return of({
    id: item,
    name: `item ${item}`,
  }).pipe(delay(50))
}

const items$ = items$.pipe(
  globalRateLimit(),
  concatMap((item) => itemRequest(item)),
)

// will get all pages, then call itemRequest for each item in page.items
// and all of this will be rate limited to 200 requests per minute
items$.subscribe((item) => console.log(item))



결론



rxjs로 해결할 수 있는 복잡한 상황이 많이 있습니다. 이것이 같은 상황에 있는 누군가를 도울 수 있기를 바랍니다.

즐거운 코딩하세요!

좋은 웹페이지 즐겨찾기