C++라인 보안 을 위 한 주파수 제한 기(추천)

오래전 파 이 썬 의 deque 용 기 를 사용 하 는 것 을 배 울 때 나 는 글 python 3 deque 양 방향 대기 열 생 성 및 사용 방법 분석 을 썼 다.최근 에는 온라인 서비스의 성능 을 측정 해 야 하고 항상 QA 쪽 에서 줄 을 서 는 것 을 원 하지 않 기 때문에 스스로 테스트 용 클 라 이언 트 를 써 야 한다.그 중 하 나 는 QPS 규 제 를 실현 하 는 것 이다.
그래서 C++에서 스 레 드 안전 을 실현 하려 는 주파수 제한 기 를 피 할 수 없 었 다.
디자인 아이디어
주파수 제한 이란 한 시간 대(inteval)에서 조작 횟수(limit)를 제한 하 는 것 이다.이것 은 또 두 가지 강약 이 다른 표현 을 이 끌 어 낼 수 있다.
  • 강 표현:임의의 길이 가 설 정 된 시간 대(interval)의 미끄럼 창 에서 주파수 제한 기 가 실행 하 는 작업 횟수(count)는 제한 횟수(limit)보다 높 지 않 습 니 다.
  • 약 한 표현:한 그룹의 길이 가 설 정 된 시간 대(interval)와 밀접 하 게 연 결 된 고정 창 에서 모든 창 에서 주파수 제한 기 가 실행 하 는 작업 횟수(count)는 제한 횟수(limit)보다 높 지 않 습 니 다.
  • 강 한 표현 은'미끄럼 창'을 통 해 빈 도 를 제한 할 뿐만 아니 라 시간 적 인 균일 성 을 구 하 는 것 을 발견 하기 어렵 지 않다.전 작 의 주파수 제한 기 는 실제로 이곳 의 강 한 표현 에 대응 했다.그러나 실제 공정 에서 우 리 는 약 한 표현 을 실현 하 는 주파수 제한 기 만 있 으 면 충분히 사용 할 수 있다.
    약 한 표현 에 대해 실현 할 때 주요 한 사 고 는 다음 과 같다.
    조작 계수(count)가 제한(limit)보다 적 을 때 바로 놓 기;
    단일 스 레 드 구현
    스 레 드 안전 을 고려 하지 않 을 때 이런 실현 을 어렵 지 않다.
    
    struct ms_clock {
     using rep = std::chrono::milliseconds::rep;
     using period = std::chrono::milliseconds::period;
     using duration = std::chrono::duration<rep, period>;
     using time_point = std::chrono::time_point<ms_clock, duration>;
    
     static
     time_point now() noexcept {
     return time_point(std::chrono::duration_cast<duration>(
       std::chrono::steady_clock::now().time_since_epoch()));
     }
    };
    } // namespace __details
    
    class RateLimiter {
     public:
     using clock = __details::ms_clock; // 1.
    
     private:
     const uint64_t limit_;
     const clock::duration interval_;
     const clock::rep interval_count_;
    
     mutable uint64_t count_{std::numeric_limits<uint64_t>::max()}; // 2.a.
     mutable clock::rep timestamp_{0};     // 2.b.
    
     public:
     constexpr RateLimiter(uint64_t limit, clock::duration interval) :
     limit_(limit), interval_(interval), interval_count_(interval_.count()) {}
    
     RateLimiter(const RateLimiter&) = delete;  // 3.a.
     RateLimiter& operator=(const RateLimiter&) = delete; // 3.b.
     RateLimiter(RateLimiter&&) = delete;   // 3.c.
     RateLimiter& operator=(RateLimiter&&) = delete; // 3.d.
    
     bool operator()() const;
     double qps() const {
     return 1000.0 * this->limit_ / this->interval_count_;
     }
    };
    
    bool RateLimiter::operator()() const {
     auto orig_count = this->count_++;
     if (orig_count < this->limit_) { // 4.
     return true;
     } else {
     auto ts = this->timestamp_;
     auto now = clock::now().time_since_epoch().count();
     if (now - ts < this->interval_count_) { // 5.
     return false;
     }
     this->timestamp_ = now;
     this->count_ = 1;
     return true;
     }
    }
    여기
    (1)주파수 제한 기 사용 단 위 를 밀리초 로 표시 하 는 시계.실제 사용 할 때 도 필요 에 따라 미묘 하거나 심지어 나 초 로 바 꿀 수 있다.
    (2)mutable 을 사용 하여 내부 상 태 를 count_timestamp_ 으로 수식 한다.limit_interval_ 이 같은 주파수 제한 기 두 개가 논리 적 으로 등가 이지 만 내부 상 태 는 반드시 같 지 않 기 때문이다.따라서 const 구성원 함수 가 내부 상 태 를 수정 할 수 있 도록 내부 상태 데이터 구성원 에 게 mutable 수식 을 추가 해 야 합 니 다.
    (2.a)에서 count_ 을 유형 으로 표시 할 수 있 는 최대 값 으로 설정 한 것 은
    (4)판단 이 실 패 했 고 timestamp_ 을 초기 화 했다.
    (2b)에서 timestamp_0 으로 설정 한 것 은 같은 이유 로(5)의 판단 을 실패 하 게 한 것 이다.
    (2.a)와(2.b)처 는 교묘 한 초기 값 설 계 를 통 해 초기 화 상태 와 후속 정상 적 인 작업 상태의 논 리 를 통일 시 켰 고 추 한 판단 이 필요 없다.
    (3)대상 의 복사 와 이동 을 금지한다.이것 은 한 주파수 제한 기 가 한 조 의 조작 을 연결 해 야 하기 때문에 두 조 또는 더 많은 그룹 이 공유 해 서 는 안 되 거나 중간 에 효력 을 잃 거나(이동 하 는 경우).
    이렇게 되면 함수 호출 연산 자 는 상술 한 논 리 를 충 실 히 실현 하 였 다.
    다 중 스 레 드 개조
    일차 개조
    다 중 스 레 드 가 RateLimiter::operator() 을 동시에 호출 할 때 count_timestamp_ 에서 경쟁 이 생 길 수 있 음 을 알 수 있다.우 리 는 이 문 제 를 해결 하 는 두 가지 방법 이 있다.그렇지 않 으 면 자 물 쇠 를 추가 하거나 count_timestamp_ 을 원자 변수 로 설정 한 후에 원자 조작 으로 문 제 를 해결 해 야 한다.그래서 함수 호출 연산 자 에 대해 우 리 는 다음 과 같은 첫 번 째 개조 가 있 습 니 다.
    
    class RateLimiter {
     //       
     private:
     mutable std::atomic<uint64_t> count_{std::numeric_limits<uint64_t>::max()}; // 1.a.
     mutable std::atomic<clock::rep> timestamp_{0};     // 1.b.
     //       
    };
    
    bool RateLimiter::operator()() const {
     auto orig_count = this->count_.fetch_add(1UL); // 2.
     if (orig_count < this->limit_) {
     return true;
     } else {
     auto ts = this->timestamp_.load(); // 3.
     auto now = clock::now().time_since_epoch().count();
     if (now - ts < this->interval_count_) {
     return false;
     }
     this->timestamp_.store(now); // 4.
     this->count_.store(1UL); // 5.
     return true;
     }
    }
    여기
  • (1)은 count_timestamp_ 을 원자 로 성명 하여 후속 원자 조작 을 편리 하 게 한다.
  • (2)-(5)는 기 존 조작 을 각각 대응 하 는 원자 조작 으로 바 꾸 었 다.
  • 이렇게 하면 완벽 해 보이 지만 사실은 bug 가 있 습 니 다.우 리 는(4)여기 에 중점 을 두 었 다.4)본 의 는 다음 timestamp_ 시 에 판단 할 수 있 도록 orig_count >= this->limit_ 을 업데이트 하 는 것 이다.이 timestamp 을 정확하게 설정 하 는 것 은 주파수 제한 기의 정확 한 작업 의 초석 이다.그러나 두 개의(또는 더 많은)라인 이 있 으 면(4)까지 걸 어가 면 무슨 일이 일어 날 까요?
  • 원자 작업 의 존재 로 인해 두 스 레 드 가 연이어 실 행 될 것 이다(4).그래서 timestamp_ 의 값 이 무엇 인지 우 리 는 전혀 기대 할 수 없다.
  • 그 밖 에 두 개의 스 레 드 는 선후 로 집행 된다(5).즉,원자 지 는 count_1 으로 설치한다.그러나 주파수 제한 기 는 연이어 두 번 이나 조작 을 했 는데 왜 count_1 이 라 고 생각 합 니까?이것 은 단지 한 번 의 조작 이 아 닙 니까?
  • 이 를 위해 우 리 는 하나의 스 레 드 만 timestamp_ 을 진정 으로 설정 하고 똑 같은 위치 에 있 는 다른 스 레 드 의 조작 을 거절 하여 timestamp_ 을 중복 설정 하고 count_1 으로 잘못 설정 하지 않도록 해 야 한다.
    2 단계 개선
    그래서 다음 과 같은 개선 이 있다.
    
    bool RateLimiter::operator()() const {
     auto orig_count = this->count_.fetch_add(1UL); // 3.
     if (orig_count < this->limit_) { // 4.
     return true;
     } else {
     auto ts = this->timestamp_.load();
     auto now = clock::now().time_since_epoch().count();
     if (now - ts < this->interval_count_) { // 5.
     return false;
     }
     if (not this->timestamp_.compare_and_exchange_strong(ts, now)) { // 1.
     return false;
     }
     this->count_.store(1UL); // 2.
     return true;
     }
    }
    여기(1)는 CAS 원자 조작 이다.그것 은 timestamp_ts 의 값(Compare)을 원자 적 으로 비교 할 것 이다.만약 그들 이 같다 면 now 의 값 을 timestamp_(Swap)에 기록 하고 true 으로 되 돌려 준다.이들 이 같 지 않 으 면 timestamp_ 의 값 을 ts 에 기록 하고 false 으로 돌아간다.만약 다른 스 레 드 가 timestamp_ 의 값 을 미리 수정 하지 않 았 다 면 CAS 작업 은 성공 하고 true 으로 돌아 가 뒤의 코드 를 계속 실행 해 야 합 니 다.그렇지 않 으 면 다른 스 레 드 가 timestamp_ 을 먼저 수 정 했 고 현재 스 레 드 의 조작 은 이전 주기 에 기록 되 어 주파수 제한 기 에 의 해 거부 되 었 음 을 나타 낸다.
    지금 생각해 야 한다.만약 집행 이 끝 난 후 즉시 아무런 지연 도 없 이 집행 한다 면 당연히 모든 것 이 대길 하 다.그러나 이때 현재 스 레 드 가 잘 리 면 무슨 일이 일어 날 까?우 리 는 상황 을 나 누 어 토론 해 야 한다.
    만약 에 ts == 0,즉'현재 스 레 드'는 주파수 제한 기 가 처음으로 timestamp_ 을 수정 한 것 이다.따라서 현재 스 레 드 는(3)에서 count_(넘 치 는 곳)을 0 으로 증가 시 켜 다른 스 레 드 가 통과 할 수 있 습 니 다(4).현재 스 레 드 는 현재 스 레 드 에서 작업 을 거부 해 야 할 수도 있 습 니 다.이 를 위해 서 는(1)과(2)사이 에 추가 적 인 판단 이 필요 하 다.
    
    if (ts == 0) {
     auto orig_count = this->count.fetch_add(1UL);
     return (orig_count < this->limit_);
    }
    만약 에 ts != 0,즉'현재 스 레 드'는 주파수 제한 기 가 처음으로 timestamp_ 을 수정 한 것 이 아니다.그러면 다른 스 레 드 는(4)에서 반드시 실 패 를 판단 하지만(5)에서 의 판단 이 성공 할 수 있 고 계속 성공 적 으로 실 행 될 수 있다(1).그래서 두 번 연속 으로 실 행 될 수 있다(2).그러나 이 는 정확성 에 영향 을 주지 않 는 다.(5)를 통 해 현재 스 레 드 에 대한 timestamp_ 을 나타 내 고 한 시간 대(interval)가 지 났 기 때문에 이 시간 대 에 현재 스 레 드 의 한 번 만 받 아들 일 수 있 습 니 다.
    제3 차 개선
    이로부터 우 리 는 얻 을 수 있다.
    
    bool RateLimiter::operator()() const {
     auto orig_count = this->count_.fetch_add(1UL);
     if (orig_count < this->limit_) {
     return true;
     } else {
     auto ts = this->timestamp_.load();
     auto now = clock::now().time_since_epoch().count();
     if (now - ts < this->interval_count_) {
     return false;
     }
     if (not this->timestamp_.compare_and_exchange_strong(ts, now)) {
     return false;
     }
     if (ts == 0) {
     auto orig_count = this->count.fetch_add(1UL);
     return (orig_count < this->limit_);
     }
     this->count_.store(1UL);
     return true;
     }
    }
    이로써 우리 의 코드 는 논리 적 으로 이미 성립 되 었 고 그 다음 에 성능 방면 의 최 적 화 를 해 야 한다.
    원자 조작 은 기본적으로 std::memory_order_seq_cst 의 메모리 모델 을 사용한다.이것 은 C++에서 가장 엄격 한 메모리 모델 입 니 다.두 가지 보증 이 있 습 니 다.
  • 프로그램 명령 과 소스 코드 순서 가 일치 합 니 다.
  • 모든 스 레 드 의 모든 조작 은 일치 하 는 순서 가 있다.
  • 순서 일치 성(sequential consistency)을 실현 하기 위해 컴 파일 러 는 컴 파일 러 최적화 와 CPU 난 서 실행(Out-of-Order Execution)에 대항 하 는 수단 을 많이 사용 하기 때문에 성능 이 떨어진다.이곳 의 count_ 에 대해 우 리 는 순서 일치 모델 이 필요 없고 방출 모델(Aquire-Release)모델 만 얻 으 면 충분 하 다.
    네 번 째 개선
    그래서 네 번 째 개선 이 있 습 니 다.
    
    bool RateLimiter::operator()() const {
     auto orig_count = this->count_.fetch_add(1UL, std::memory_order_acq_rel);
     if (orig_count < this->limit_) {
     return true;
     } else {
     auto ts = this->timestamp_.load();
     auto now = clock::now().time_since_epoch().count();
     if (now - ts < this->interval_count_) {
     return false;
     }
     if (not this->timestamp_.compare_and_exchange_strong(ts, now)) {
     return false;
     }
     if (ts == 0) {
     auto orig_count = this->count.fetch_add(1UL, std::memory_order_acq_rel);
     return (orig_count < this->limit_);
     }
     this->count_.store(1UL, std::memory_order_release);
     return true;
     }
    }
    이로써 우 리 는 주파수 제한 기 를 완전 하 게 실현 하여 QPS 압력 측정 을 즐겁게 시작 할 수 있 습 니 다!
    총결산
    C++에서 스 레 드 안전 을 실현 하 는 주파수 제한 기 에 관 한 이 글 은 여기까지 소개 되 었 습 니 다.C++에서 스 레 드 안전 을 실현 하 는 주파수 제한 기 내용 은 예전 의 글 을 검색 하거나 아래 의 관련 글 을 계속 찾 아 보 세 요.앞으로 저 희 를 많이 지지 해 주 십시오!

    좋은 웹페이지 즐겨찾기