Simple Batch Pipeline

Create Fake Data

  • Faker 모듈을 통해 가짜 데이터를 만들 수 있습니다.
import psycopg2 as db
from faker import Faker
fake=Faker()
data=[]
for i in range(1, 11):
    data.append((i, fake.name(), fake.street_address(), fake.city(), fake.zipcode(), fake.longitude(), fake.latitude()))

data_for_db=tuple(data)
print(data_for_db)
conn_string="dbname='postgres' host='localhost' user='postgres' password='postgres'"
conn=db.connect(conn_string)
cur=conn.cursor()
query = "insert into users (id, name, street, city, zip, lng, lat) values(%s, %s, %s, %s, %s, %s, %s)"
print(cur.mogrify(query,data_for_db[1]))
cur.executemany(query,data_for_db)
conn.commit()
query2 = "select * from users"

cur.execute(query2)
print(cur.fetchall())

Create CSV file

  • PostgreSQL을 통해서 CSVfile을 생성합니다.
COPY (
       select name,
              id,
              street,
              city,
              zip,
              lng,
              lat
       from faker_user_data
) TO '{file_path/file_name}' WITH (FORMAT CSV, HEADER);
  • CSV : csv 파일형식으로 파일을 생성한다.
  • HEADER : csv 파일 문서 상단에 헤더를 포함하도록 한다.

Amazon S3

Create

  • us-east-1 외의 지역에 버킷을 생성할 때 사용합니다.
aws s3api create-bucket \
    --bucket my-bucket \
    --region eu-west-1 \
    --create-bucket-configuration LocationConstraint=eu-west-1

list

  • 모든 Amazon S3 버킷의 이름을 표시합니다.
aws s3api list-buckets --query "Buckets[].Name"

delete

  • 버킷을 삭제합니다.
aws s3api delete-bucket --bucket my-bucket

Upload csv file

  • csv 파일을 S3에 업로드 합니다.
op_kwargs={
        "file_name": "/temp/faker_user_data.csv",
        "key": "stage/{{ ds }}/faker_user_data.csv",
        "bucket_name": BUCKET_NAME,
        "remove_local": "true",
    },
import os
from airflow.hooks.S3_hook import S3Hook

def _local_to_s3(
    bucket_name: str, key: str, file_name: str, remove_local: bool = False
) -> None:
    s3 = S3Hook()
    s3.load_file(filename=file_name, bucket_name=bucket_name, replace=True, key=key)
    if remove_local:
        if os.path.isfile(file_name):
            os.remove(file_name)

Amazon IAM

  • 다른 AWS 리소스의 데이터에 액세스하는 작업의 경우 클러스터에 사용자를 대신해 리소스와 리소스의 데이터에 액세스할 권한이 필요합니다. AWS Identity and Access Management(IAM)을 사용하여 그러한 권한을 제공합니다.

Create IAM role

  • IAM role을 생성합니다. 먼저 sts:AssumeRole 이라는 임시 자격 증명을 받을 수 있는 정책을 생성합니다.
echo '{
     "Version": "2012-10-17",
     "Statement": [
         {
         "Effect": "Allow",
         "Principal": {
             "Service": "redshift.amazonaws.com"
         },
         "Action": "sts:AssumeRole"
         }
     ]
}' > ./Test-Role-Trust-Policy.json
aws iam create-role --role-name Test-Role --assume-role-policy-document file://Test-Role-Trust-Policy.json

attach role

  • COPY를 사용하여 Amazon S3에 액세스하려면 AmazonS3ReadOnlyAccess를 추가해야 합니다.
aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess --role-name Test-Role

list IAM role

  • IAM role을 보여줍니다.
aws iam list-roles

delete IAM role

  • 먼저 지정된 역할을 제거해야 합니다. 그 후에 IAM role을 삭제할 수 있습니다.
aws iam detach-role-policy --role-name Test-Role  --policy-arn arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess
aws iam delete-role --role-name Test-Role

Amazon Redshift

  • Amazon Redshift는 AWS 클라우드에서 완벽하게 관리되는 페타바이트급 데이터 웨어하우스 서비스입니다. Amazon Redshift 데이터 웨어하우스는 노드라는 컴퓨팅 리소스의 모음으로, 노드는 클러스터라는 그룹을 구성합니다. 각 클러스터는 Amazon Redshift 엔진을 실행하며, 하나 이상의 데이터베이스를 포함합니다.

Create Redshift

  • redshift cluster를 생성합니다.
aws redshift create-cluster --node-type dc2.large --number-of-nodes 2 --master-username adminuser --master-user-password TopSecret1 --cluster-identifier mycluster

list Redshift

  • 생성된 redshift cluster를 보여줍니다.
aws redshift describe-clusters

delete Redshift

  • redshift cluster를 삭제합니다.
aws redshift delete-cluster --cluster-identifier mycluster --final-cluster-snapshot-identifier myfinalsnapshot

upload redshift

  • 테이블을 생성한 뒤 copy 명령어를 통해 S3의 데이터를 redshift에 넣는다.
create table {tablename} (
	col1 datatype,
    col2 datatype
);
op_kwargs={
        "qry": "copy public.tablename "
               + " from 's3://" + BUCKET_NAME + "{filepath_filename}'"
               + " iam_role '" + IAM_ROLE + "'"
               + " csv "
               + " ignoreheader 1 ",
    },
import psycopg2
from airflow.hooks.postgres_hook import PostgresHook
def _s3_to_redshift(qry: str) -> None:
    rs_hook = PostgresHook(postgres_conn_id="redshift")
    rs_conn = rs_hook.get_conn()
    rs_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    rs_cursor = rs_conn.cursor()
    rs_cursor.execute(qry)
    rs_cursor.close()
    rs_conn.commit()

Reference

S3 CLI List
boto3 Upload file
Airflow docker compose
RedShift CLI List
AWS IAM 생성하기

좋은 웹페이지 즐겨찾기