AWS에서 서버리스 이벤트 기반 배치

AWS에서 배치를 구축할 때 운영 부하를 고려하면 서버리스를 검토하고 싶다고 생각합니다.
게다가 자원의 낭비를 없애기 위해서 이벤트 드리븐인 아키텍쳐에도 하고 싶네요.
AWS에서 서버리스로 이벤트 드리븐인 배치의 아키텍처로서 유명한 것이 S3를 이벤트 트리거로 한 Lambda 실행이라고 생각합니다.
(전제적으로, 파일의 업로드를 처리 실행의 트리거로 합니다.)


다만, Lambda에는 여러가지 제한 가 있어, Lambda에서는 실현할 수 없는 케이스도 많이 있을까 생각합니다.

이번에는 그러한 경우에 Fargate를 이용해 서버리스로 이벤트 드리븐인 배치를 구축해 나가고 싶습니다.
구축은 Terraform에서 확실히 구축하고 싶습니다.
코드는 여기

아키텍처



아키텍처는 다음과 같습니다.

Fargate를 태스크 실행을 위한 트리거로는 Cloud Watch Events도 있습니다만, 그 경우에는 태스크 정의의 컨테이너의 덮어쓰기가 지정할 수 없고, S3 트리거로부터의 이벤트 정보등을 취급할 수 없습니다 .
그래서 이번에는 Fargate의 태스크 실행은 Lambda에서 task_run을 실행하기로 결정합니다.

S3-람다



S3을 트리거로 람다의 실행은 다음과 같이 만들 수 있습니다.

우선 Lambda 함수를 만드는 것입니다. 실행 정책도 함께 만들어 버립니다.
resource "aws_iam_role" "default" {
  name               = var.service_name
  description        = "IAM Rolw for ${var.run_task}"
  assume_role_policy = file("${var.run_task}-role.json")
}

resource "aws_iam_policy" "default" {
  name        = var.service_name
  description = "IAM Policy for ${var.run_task}"
  policy      = file("${var.run_task}-policy.json")
}

resource "aws_iam_role_policy_attachment" "default" {
  role       = aws_iam_role.default.name
  policy_arn = aws_iam_policy.default.arn
}

resource "aws_cloudwatch_log_group" "default" {
  name              = "/aws/lambda/${var.run_task}"
  retention_in_days = 7
}

data archive_file "default" {
  type        = "zip"
  source_dir  = "../${var.run_task}"
  output_path = "${var.run_task}.zip"
}

resource "aws_lambda_function" "default" {
  filename         = "${var.run_task}.zip"
  function_name    = var.run_task
  role             = aws_iam_role.default.arn
  handler          = "main.main"
  source_code_hash = data.archive_file.default.output_base64sha256
  runtime          = "python3.7"
  environment {
    variables = {
      CLUSTER_NAME      = var.service_name
      SUBNET_ID         = var.subnet_id
      SECURITY_GROUP_ID = var.security_group_id
      TASK_DEFINITION   = aws_ecs_task_definition.default.revision
    }
  }
}

S3와 Lambda의 협력은 다음과 같습니다.
resource "aws_s3_bucket" "default" {
  bucket = var.run_task
}

resource "aws_s3_bucket_notification" "default" {
  bucket = aws_s3_bucket.default.id

  lambda_function {
    lambda_function_arn = aws_lambda_function.default.arn
    events              = ["s3:ObjectCreated:*"]
  }
}

resource "aws_lambda_permission" "default" {
  statement_id  = "AllowExecutionFromS3Bucket"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.default.arn
  principal     = "s3.amazonaws.com"
  source_arn    = aws_s3_bucket.default.arn
}

Fargate



여기에서 Fargate를 만드는 것입니다.
resource "aws_cloudwatch_log_group" "fargate" {
  name = "/ecs/${var.service_name}"
}

resource "aws_ecs_cluster" "fargate" {
  name = var.service_name
}

resource "aws_iam_role" "fargate" {
  name               = "fargate-${var.service_name}"
  description        = "IAM Rolw for ${var.service_name}"
  assume_role_policy = file("${var.service_name}-role.json")
}

resource "aws_iam_policy" "fargate" {
  name        = "fargate-${var.service_name}"
  description = "IAM Policy for ${var.service_name}"
  policy      = file("${var.service_name}-policy.json")
}

resource "aws_iam_role_policy_attachment" "fargate" {
  role       = aws_iam_role.fargate.name
  policy_arn = aws_iam_policy.fargate.arn
}

resource "aws_ecs_task_definition" "default" {
  family                = var.service_name
  container_definitions = <<EOF
[
  {
    "name": "${var.service_name}",
    "image": "${var.ecr_image_arn}",
    "essential": true,
    "memory": 256
  }
]
EOF
  network_mode = "awsvpc"
  execution_role_arn = aws_iam_role.fargate.arn
  cpu = 1024
  memory = 2048
  requires_compatibilities = ["FARGATE"]
}

중요한 것은 태스크 정의의 작성으로, Fargate를 이용하는 경우는 network_mode를 awsvpc에 지정할 필요가requires_compatibilities 에서 Fargate 호환성을 정의해야 합니다.
또, 그 때에는 cpumemory 의 지정도 필수가 되므로 지정하도록(듯이) 부탁합니다.

이러한 방식으로 인프라 구성이 완성되었습니다.

task_run



마지막으로 Fargate 작업 실행을위한 스크립트입니다.
Fargate의 컨테이너는 docker run イメージ名 python main.py hoge 와 같이 해, 최초의 인수를 Fargate의 컨테이너내에서 취급할 수 있도록 하고 있습니다.
import os
import boto3


def main(event, _):
    """
    hoge
    """
    filename = event['Records'][0]['s3']['bucket']['name']
    client = boto3.client('ecs')
    response = client.run_task(
        cluster=os.environ['CLUSTER_NAME'],
        count=1,
        launchType='FARGATE',
        networkConfiguration={
            'awsvpcConfiguration': {
                'subnets': [
                    os.environ['SUBNET_ID']
                ],
                'securityGroups': [
                    os.environ['SECURITY_GROUP_ID']
                ],
                'assignPublicIp': 'ENABLED'
            }
        },
        overrides={
            'containerOverrides': [
                {
                    'name': 'hogehoge',
                    'command': [
                        'python',
                        'main.py',
                        'ほげ{}'.format(filename)
                    ]
                },
            ]
        },
        taskDefinition=os.environ['TASK_DEFINITION']
    )
    print(response)

이렇게 하면 S3의 이벤트 정보를 Fargate에 가져올 수 있어 서버리스 이벤트 드리븐 아키텍처가 완성됩니다.

참고


  • Terraform Documentation
  • AWS Documentation
  • 좋은 웹페이지 즐겨찾기