AWS Step Functions: 페이지가 매겨진 API 응답 처리

AWS Step Functionsadded support for AWS SDK integration 이후 AWS 서비스의 서버리스 통합에 매우 강력해졌습니다. 이전에는 이동 접근 방식이 간단한 Lambda 함수였지만 Step Functions에는 유지 관리가 훨씬 적습니다. 그러나 Step Functions에는 작업하거나 해결해야 하는 (대부분 의도적인) 제한이 있습니다.

일부 AWS API 작업은 페이지가 매겨진 응답을 반환합니다. 즉, 전체 응답의 청크를 얻고 원하는 경우 다음 청크를 요청해야 합니다. 따라서 150개의 항목이 있는 경우 한 번에 25개를 가져와야 할 수 있습니다. 각 상태가 한 번만 호출하는 Step Functions에서 이를 어떻게 처리합니까?

페이지 매김 처리의 핵심은 일반적으로 NextToken 라는 연속 토큰 항목의 존재를 기반으로 반복하는 것입니다. 페이지가 매겨진 응답이 반환되면 JSON에 이 토큰이 포함되며, 이는 (1) 검색할 추가 항목이 있음을 나타내고 (2) 다음 페이지를 원함을 나타내기 위해 후속 API 호출에서 제공되어야 합니다. 안타깝게도 토큰 이름이 일치하지 않습니다. Ian McKay의 전체 목록rules for AWS SDK pagination을 참조하세요.

다음은 구체적인 예입니다. 각각 가상 데스크톱을 나타내는 Amazon WorkSpaces 서비스의 모든 인스턴스에서 최신 이미지를 사용하도록 업데이트하는 것과 같은 작업을 수행하려고 한다고 가정해 보겠습니다. workspaces:DescribeWorkspaces는 한 번에 25개를 반환하지만 수백 개가 있습니다.


DescribeWorkspaces를 호출하여 첫 ​​번째 항목 집합을 가져오고 각 항목을 원하는 작업(이 경우에는 RebuildWorkspaces)에 매핑합니다. 그런 다음 Choice 상태는 작업 결과에 NextToken가 있는지 확인합니다. 그렇다면 이번에는 DescribeWorkspaces 매개변수를 사용하여 NextToken를 다시 호출하십시오. 결정적으로 이것은 첫 번째 호출의 작업 결과를 덮어쓰므로 다음에 Choice 상태에 도달하면 검색할 항목이 더 있는지 확인할 수 있습니다(NextToken가 여전히 존재함). 끝에 도달한 경우(NextToken가 없음) 워크플로를 계속 진행할 수 있습니다.

다음은 Python을 사용하는 AWS CDK의 이 Step Functions 상태 시스템에 대한 전체 정의입니다.

import aws_cdk.aws_stepfunctions as sfn
import aws_cdk.aws_stepfunctions_tasks as sfn_tasks

describe_workspaces = sfn_tasks.CallAwsService(
            self,
            id="DescribeWorkspaces",
            comment="Get workspaces",
            service="workspaces",
            action="describeWorkspaces",
            result_path="$.DescribeWorkspacesResult",
            iam_resources=["*"],
        )

        describe_more_workspaces = sfn_tasks.CallAwsService(
            self,
            id="DescribeMoreWorkspaces",
            comment="Get workspaces with NextToken",
            service="workspaces",
            action="describeWorkspaces",
            parameters={
                "NextToken": sfn.JsonPath.string_at(
                    "$.DescribeWorkspacesResult.NextToken"
                )
            },
            result_path="$.DescribeWorkspacesResult",
            iam_resources=["*"],
        )

        rebuild_workspaces = sfn_tasks.CallAwsService(
            self,
            id="RebuildWorkspaces",
            comment="Rebuild workspaces",
            service="workspaces",
            action="rebuildWorkspaces",
            parameters={
                "RebuildWorkspaceRequests": [
                    {"WorkspaceId": sfn.JsonPath.string_at("$.WorkspaceId")}
                ]
            },
            result_path="$.RebuildWorkspacesResult",
            iam_resources=["*"],
        )

        rebuild_each_workspace = sfn.Map(
            self,
            id="RebuildEachWorkspace",
            comment="Rebuild each workspace",
            items_path="$.DescribeWorkspacesResult.Workspaces",
            output_path=sfn.JsonPath.DISCARD,
        )
        rebuild_each_workspace.iterator(rebuild_workspaces)

        definition = describe_workspaces.next(rebuild_each_workspace).next(
            sfn.Choice(self, "ChoiceMoreWorkspaces")
            .when(
                sfn.Condition.is_present("$.DescribeWorkspacesResult.NextToken"),
                describe_more_workspaces.next(rebuild_each_workspace),
            )
            .otherwise(sfn.Succeed(self, "Done"))
        )

        state_machine = sfn.StateMachine(
            self,
            id="WorkSpacesRebuilderStateMachine",
            state_machine_type=sfn.StateMachineType.STANDARD,
            definition=definition,
        )

좋은 웹페이지 즐겨찾기