Azure 함수에 세션이 있는 질서정연한 대기열 처리

주문 얘기 좀 합시다.이것은 내가 가장 좋아하는 화제 중의 하나이다I've blogged about extensively before.이전에 Azure 함수에서의 질서정연한 처리는 Azure 이벤트 센터와 같은 이벤트 흐름에만 적용되었지만, 오늘은 서비스 버스 대기열과 테마의 순서를 어떻게 유지하는지 보여주고 싶습니다.
표면적으로 보면 이것은 매우 간단한 것 같다. 나는 소식을 받은 정확한 순서에 따라 대열의 소식을 처리할 수 있기를 바란다.기계에서 운행하는 간단한 서비스에 대해 말하자면, 이것은 매우 실현하기 쉽다.그러나 내가 대규모로 처리하고 싶을 때, 어떻게 대기열 메시지의 순서를 유지합니까?Azure와 유사한 기능이 있습니다. 저는 수십 개의 이벤트 실례를 뛰어넘어 메시지를 처리할 수 있습니다. 저는 어떻게 순서를 유지합니까?
병원의 환자를 처리하는 간단한 메시지 전달 시스템의 예시를 사용합시다.상상해 보세요. 저는 모든 환자를 위해 몇 가지 활동을 안배했습니다.
  • 환자 도착
  • 환자가 방 하나를 분배했다
  • 환자 치료
  • 환자 퇴원
  • 나는 내가 환자의 치료를 끝내기 전에 무질서한 소식을 처리하지 않을 뿐만 아니라 잠재적으로 환자를 퇴원시키지 않을 것을 확보하고 싶다.
    무슨 일이 일어날지 빠른 실험을 해 봅시다.이를 위해 나는 1000명의 환자를 모의하여 환자마다 이 네 가지 메시지(순서대로)를 보내고 처리(이상적인 상황에서도 순서대로)를 할 것이다.

    기본 및 무질서


    대기열에서만 트리거되는 간단한 Azure 함수를 사용해 보겠습니다.나는 어떤 특별한 일도 하지 않을 것이다. 대기열에서 터치하고 처리한 동작을 Redis 캐시의 목록으로 미루기만 하면 된다.
    public async Task Run(
        [ServiceBusTrigger("queue", Connection = "ServiceBusConnectionString")]Message message, 
        ILogger log)
    {
        log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(message.Body)}");
        await _client.PushData((string)message.UserProperties["patientId"], Encoding.UTF8.GetString(message.Body));
    }
    
    1000명의 환자 데이터 (메시지당 4개) 를 이 대기열로 보낸 후 Redis 캐시는 어떻게 처리됩니까?응, 어떤 환자들은 보기에 매우 좋아 보인다.내가 환자 #4를 찾았을 때, 나는 다음과 같이 보았다.
    >lrange Patient-$4 0 -1
    1) "Message-0"
    2) "Message-1"
    3) "Message-2"
    4) "Message-3"
    
    너무 좋아요.모든 4개의 사건은 환자 4에게 발송되고 순서대로 처리된다.하지만 환자 2:
    >lrange Patient-$2 0 -1
    1) "Message-1"
    2) "Message-2"
    3) "Message-0"
    4) "Message-3"
    
    이런 상황에서 다른 두 가지 소식을 처리한 후에야 비로소 환자 도착 소식에 대한 처리를 끝냈다.그럼 여기서 무슨 일이 일어났어요?Azure Service Bus가 주문을 보증했는데 왜 내 메시지에 문제가 생겼습니까?
    기본적으로 대기열 트리거는 일을 합니다.우선, 모든 시작 사례에 대해 하나의 메시지를 동시에 처리합니다.By default 32개의 메시지를 동시에 처리하는 실례.이는 환자의 모든 4개의 메시지를 동시에 처리할 수 있고 완료 순서와 발송 순서가 다르다는 것을 의미한다.이것은 복구하기 쉬울 것 같으니, 병발 제한을 1로 합시다.

    역모드: 확장 및 병렬 제한


    이것은 아마도 내가 본 상술한 문제에서 가장 흔히 볼 수 있는 해결 방안일 것이다.병발성은 32개가 아니라 한 번에 1개만 처리하는 것으로 제한합시다.이를 위해 host.json 파일을 수정하고 maxConcurrentCalls 을 1로 설정했습니다.현재 모든 실례는 한 번에 하나의 메시지만 처리한다.나는 다시 같은 테스트를 실행한다.
    우선, 그것은 매우 느리다.나는 실례마다 한 번에 한 개만 처리하기 때문에 4000개의 대열 메시지를 자세히 읽는 데 오랜 시간이 걸렸다.더 나쁜 것은?내가 사후 검사 결과를 냈을 때, 몇몇 환자들은 여전히 정상이 아니었다.이게 어떻게 된 일입니까?비록 나는 실례의 병발성을 1로 제한했지만, Azure 함수는 이미 나를 여러 실례로 확장시켰다.따라서 만약 내가 20개의 확장된 기능 응용 프로그램의 실례를 가지고 있다면, 나는 20개의 메시지 (실례마다 1개) 를 동시에 처리할 것이다.이것은 내가 여전히 같은 시간에 같은 환자로부터 온 정보를 처리할 수 있다는 것을 의미한다. 단지 다른 상황에서.나는 여전히 질서정연한 처리를 보장할 수 없다.
    여기 복구?많은 사람들이 Azure 기능의 확장을 제한하기를 원한다.비록 it's technically possible 하지만 그것은 나의 흡수량에 더 큰 상처를 줄 것이다.현재 한 번에 전 세계의 한 가지 소식만 처리할 수 있다는 것은 고유량 기간에 나는 대량으로 쌓인 환자 사건을 받게 되고 나의 기능이 따라가지 못할 수도 있다는 것을 의미한다.

    구조 회의


    만약 내가 여기서 이 박문을 끝낸다면, 설마 매우 슬프지 않겠는가?더 좋은 방법이 있다!이전에 내가 말했듯이, 당신이 이곳에서 가장 좋은 선택은 이벤트 센터를 사용하는 것입니다. 왜냐하면 구역과 일괄 처리 you can guarantee ordering 때문입니다.그러나 이곳의 도전은 대기열의 사무성 (예를 들어 재시도와 사신) 을 고려하면, 때로는 이 작업에 적합한 메시지 에이전트이기도 하다.이제 대기열을 사용하고 서비스 버스 세션을 통해 주문을 받을 수 있습니다🎉.
    그럼 세션이 뭐예요?세션을 사용하면 메시지 그룹에 대한 식별자를 설정할 수 있습니다.세션에서 온 메시지를 처리하려면 먼저 세션을 잠가야 합니다.그런 다음 세션의 각 메시지를 개별적으로 처리하기 시작할 수 있습니다(일반 대기열과 같은 잠금/전체 의미 사용).세션의 장점은 여러 개의 실례를 뛰어넘어 대규모로 처리할 때도 순서를 유지할 수 있다는 것이다.이전에 생각해 보면, 우리는 20개의 Azure Function 응용 프로그램의 실례를 가지고 있는데, 그것들은 모두 같은 대기열을 경쟁하고 있었다.이제 모든 20개의 인스턴스는 사용 가능한 세션을 잠그고 세션의 이벤트만 처리하며 20개로 확장하지 않습니다.세션은 세션에서 오는 메시지를 순서대로 처리해야 합니다.
    언제든지 동적으로 세션을 생성할 수 있습니다.Azure 함수의 인스턴스가 나타나면 먼저 "세션 ID가 잠기지 않은 메시지가 있습니까?"라고 묻습니다.이렇게 하면 세션이 잠기고 순서대로 처리됩니다.세션에 더 이상 사용할 수 있는 메시지가 없을 때, Azure 기능은 잠금을 해제하고 다음 사용 가능한 세션으로 이동합니다.메시지가 속한 세션을 먼저 잠그지 않으면 아무런 메시지도 처리되지 않습니다.
    위의 예에 대해 같은 4000건의 메시지를 보냅니다.이 경우 환자 ID를 세션 ID로 설정합니다. 모든 Azure 기능 실례는 세션 (환자) 의 자물쇠를 가져와 사용할 수 있는 모든 메시지를 처리한 다음 사용할 수 있는 다른 환자로 옮깁니다.

    Azure 함수에서 세션 사용하기


    세션은 현재 Microsoft.Azure.WebJobs.Extensions.ServiceBus 확장에서 사용할 수 있으며, 버전 >=3.1.0을 사용하며, 본문을 작성할 때 미리 보기 중입니다.우선, 나는 분기기를 도입할 것이다.
    Install-Package Microsoft.Azure.WebJobs.Extensions.ServiceBus -Pre
    
    그리고 세션을 활성화하기 위해 기능 코드를 최소한으로 변경합니다(isSessionsEnabled = true.
    public async Task Run(
        [ServiceBusTrigger("queue", Connection = "ServiceBusConnectionString", IsSessionsEnabled = true)]Message message, 
        ILogger log)
    {
        log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(message.Body)}");
        await _client.PushData(message.SessionId, Encoding.UTF8.GetString(message.Body));
    }
    
    세션이 활성화된 대기열이나 테마를 사용해야 합니다.

    내가 메시지를 대기열로 보낼 때, 나는 내가 보낸 모든 환자의 메시지에 정확한 값을 설정할 것이다sessionId.
    함수를 발표한 후, 나는 4000개의 메시지를 보냈다.대기열이 곧 소모될 것이다. 왜냐하면 나는 확장 실례를 뛰어넘어 여러 개의 세션을 동시에 처리할 수 있기 때문이다.테스트를 실행한 후 Redis 캐시를 확인합니다.예상한 바와 같이 나는 모든 소식이 처리된 것을 보았다. 모든 환자에게 나는 그것들이 순서대로 처리된 것을 보았다.
    >lrange Patient-$10 0 -1
    1) "Message-0"
    2) "Message-1"
    3) "Message-2"
    4) "Message-3"
    
    >lrange Patient-$872 0 -1
    1) "Message-0"
    2) "Message-1"
    3) "Message-2"
    4) "Message-3"
    
    따라서 세션에 대한 새로운 Azure 기능이 지원되어 서비스 버스 대기열이나 테마에서 온 메시지를 순서대로 처리할 수 있으며 전체 처리량을 희생할 필요가 없습니다.나는 새로운 세션이나 기존 세션에 동적 으로 메시지를 추가할 수 있으며, 세션의 메시지는 서비스 버스가 수신한 순서에 따라 처리될 것이라고 확신할 수 있다.
    my GitHub repo에서 메시지를 테스트하고 불러오는 데 사용되는 전체 예시를 볼 수 있습니다.master지점은 모두 정상입니다. out-of-order지점은 기본적인 무질서한 실험입니다.

    좋은 웹페이지 즐겨찾기