Simple Batch Pipeline
Create Fake Data
- Faker 모듈을 통해 가짜 데이터를 만들 수 있습니다.
import psycopg2 as db
from faker import Faker
fake=Faker()
data=[]
for i in range(1, 11):
data.append((i, fake.name(), fake.street_address(), fake.city(), fake.zipcode(), fake.longitude(), fake.latitude()))
data_for_db=tuple(data)
print(data_for_db)
conn_string="dbname='postgres' host='localhost' user='postgres' password='postgres'"
conn=db.connect(conn_string)
cur=conn.cursor()
query = "insert into users (id, name, street, city, zip, lng, lat) values(%s, %s, %s, %s, %s, %s, %s)"
print(cur.mogrify(query,data_for_db[1]))
cur.executemany(query,data_for_db)
conn.commit()
query2 = "select * from users"
cur.execute(query2)
print(cur.fetchall())
Create CSV file
- PostgreSQL을 통해서 CSVfile을 생성합니다.
COPY (
select name,
id,
street,
city,
zip,
lng,
lat
from faker_user_data
) TO '{file_path/file_name}' WITH (FORMAT CSV, HEADER);
- CSV : csv 파일형식으로 파일을 생성한다.
- HEADER : csv 파일 문서 상단에 헤더를 포함하도록 한다.
Amazon S3
Create
- us-east-1 외의 지역에 버킷을 생성할 때 사용합니다.
aws s3api create-bucket \
--bucket my-bucket \
--region eu-west-1 \
--create-bucket-configuration LocationConstraint=eu-west-1
list
- 모든 Amazon S3 버킷의 이름을 표시합니다.
aws s3api list-buckets --query "Buckets[].Name"
delete
- 버킷을 삭제합니다.
aws s3api delete-bucket --bucket my-bucket
Upload csv file
- csv 파일을 S3에 업로드 합니다.
op_kwargs={
"file_name": "/temp/faker_user_data.csv",
"key": "stage/{{ ds }}/faker_user_data.csv",
"bucket_name": BUCKET_NAME,
"remove_local": "true",
},
import os
from airflow.hooks.S3_hook import S3Hook
def _local_to_s3(
bucket_name: str, key: str, file_name: str, remove_local: bool = False
) -> None:
s3 = S3Hook()
s3.load_file(filename=file_name, bucket_name=bucket_name, replace=True, key=key)
if remove_local:
if os.path.isfile(file_name):
os.remove(file_name)
Amazon IAM
- 다른 AWS 리소스의 데이터에 액세스하는 작업의 경우 클러스터에 사용자를 대신해 리소스와 리소스의 데이터에 액세스할 권한이 필요합니다. AWS Identity and Access Management(IAM)을 사용하여 그러한 권한을 제공합니다.
Create IAM role
- IAM role을 생성합니다. 먼저 sts:AssumeRole 이라는 임시 자격 증명을 받을 수 있는 정책을 생성합니다.
echo '{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "redshift.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}' > ./Test-Role-Trust-Policy.json
aws iam create-role --role-name Test-Role --assume-role-policy-document file://Test-Role-Trust-Policy.json
attach role
- COPY를 사용하여 Amazon S3에 액세스하려면 AmazonS3ReadOnlyAccess를 추가해야 합니다.
aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess --role-name Test-Role
list IAM role
- IAM role을 보여줍니다.
aws iam list-roles
delete IAM role
- 먼저 지정된 역할을 제거해야 합니다. 그 후에 IAM role을 삭제할 수 있습니다.
aws iam detach-role-policy --role-name Test-Role --policy-arn arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess
aws iam delete-role --role-name Test-Role
Amazon Redshift
- Amazon Redshift는 AWS 클라우드에서 완벽하게 관리되는 페타바이트급 데이터 웨어하우스 서비스입니다. Amazon Redshift 데이터 웨어하우스는 노드라는 컴퓨팅 리소스의 모음으로, 노드는 클러스터라는 그룹을 구성합니다. 각 클러스터는 Amazon Redshift 엔진을 실행하며, 하나 이상의 데이터베이스를 포함합니다.
Create Redshift
- redshift cluster를 생성합니다.
aws redshift create-cluster --node-type dc2.large --number-of-nodes 2 --master-username adminuser --master-user-password TopSecret1 --cluster-identifier mycluster
list Redshift
- 생성된 redshift cluster를 보여줍니다.
aws redshift describe-clusters
delete Redshift
- redshift cluster를 삭제합니다.
aws redshift delete-cluster --cluster-identifier mycluster --final-cluster-snapshot-identifier myfinalsnapshot
upload redshift
- 테이블을 생성한 뒤 copy 명령어를 통해 S3의 데이터를 redshift에 넣는다.
create table {tablename} (
col1 datatype,
col2 datatype
);
op_kwargs={
"qry": "copy public.tablename "
+ " from 's3://" + BUCKET_NAME + "{filepath_filename}'"
+ " iam_role '" + IAM_ROLE + "'"
+ " csv "
+ " ignoreheader 1 ",
},
import psycopg2
from airflow.hooks.postgres_hook import PostgresHook
def _s3_to_redshift(qry: str) -> None:
rs_hook = PostgresHook(postgres_conn_id="redshift")
rs_conn = rs_hook.get_conn()
rs_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
rs_cursor = rs_conn.cursor()
rs_cursor.execute(qry)
rs_cursor.close()
rs_conn.commit()
Reference
import psycopg2 as db
from faker import Faker
fake=Faker()
data=[]
for i in range(1, 11):
data.append((i, fake.name(), fake.street_address(), fake.city(), fake.zipcode(), fake.longitude(), fake.latitude()))
data_for_db=tuple(data)
print(data_for_db)
conn_string="dbname='postgres' host='localhost' user='postgres' password='postgres'"
conn=db.connect(conn_string)
cur=conn.cursor()
query = "insert into users (id, name, street, city, zip, lng, lat) values(%s, %s, %s, %s, %s, %s, %s)"
print(cur.mogrify(query,data_for_db[1]))
cur.executemany(query,data_for_db)
conn.commit()
query2 = "select * from users"
cur.execute(query2)
print(cur.fetchall())
- PostgreSQL을 통해서 CSVfile을 생성합니다.
COPY (
select name,
id,
street,
city,
zip,
lng,
lat
from faker_user_data
) TO '{file_path/file_name}' WITH (FORMAT CSV, HEADER);
- CSV : csv 파일형식으로 파일을 생성한다.
- HEADER : csv 파일 문서 상단에 헤더를 포함하도록 한다.
Amazon S3
Create
- us-east-1 외의 지역에 버킷을 생성할 때 사용합니다.
aws s3api create-bucket \
--bucket my-bucket \
--region eu-west-1 \
--create-bucket-configuration LocationConstraint=eu-west-1
list
- 모든 Amazon S3 버킷의 이름을 표시합니다.
aws s3api list-buckets --query "Buckets[].Name"
delete
- 버킷을 삭제합니다.
aws s3api delete-bucket --bucket my-bucket
Upload csv file
- csv 파일을 S3에 업로드 합니다.
op_kwargs={
"file_name": "/temp/faker_user_data.csv",
"key": "stage/{{ ds }}/faker_user_data.csv",
"bucket_name": BUCKET_NAME,
"remove_local": "true",
},
import os
from airflow.hooks.S3_hook import S3Hook
def _local_to_s3(
bucket_name: str, key: str, file_name: str, remove_local: bool = False
) -> None:
s3 = S3Hook()
s3.load_file(filename=file_name, bucket_name=bucket_name, replace=True, key=key)
if remove_local:
if os.path.isfile(file_name):
os.remove(file_name)
Amazon IAM
- 다른 AWS 리소스의 데이터에 액세스하는 작업의 경우 클러스터에 사용자를 대신해 리소스와 리소스의 데이터에 액세스할 권한이 필요합니다. AWS Identity and Access Management(IAM)을 사용하여 그러한 권한을 제공합니다.
Create IAM role
- IAM role을 생성합니다. 먼저 sts:AssumeRole 이라는 임시 자격 증명을 받을 수 있는 정책을 생성합니다.
echo '{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "redshift.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}' > ./Test-Role-Trust-Policy.json
aws iam create-role --role-name Test-Role --assume-role-policy-document file://Test-Role-Trust-Policy.json
attach role
- COPY를 사용하여 Amazon S3에 액세스하려면 AmazonS3ReadOnlyAccess를 추가해야 합니다.
aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess --role-name Test-Role
list IAM role
- IAM role을 보여줍니다.
aws iam list-roles
delete IAM role
- 먼저 지정된 역할을 제거해야 합니다. 그 후에 IAM role을 삭제할 수 있습니다.
aws iam detach-role-policy --role-name Test-Role --policy-arn arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess
aws iam delete-role --role-name Test-Role
Amazon Redshift
- Amazon Redshift는 AWS 클라우드에서 완벽하게 관리되는 페타바이트급 데이터 웨어하우스 서비스입니다. Amazon Redshift 데이터 웨어하우스는 노드라는 컴퓨팅 리소스의 모음으로, 노드는 클러스터라는 그룹을 구성합니다. 각 클러스터는 Amazon Redshift 엔진을 실행하며, 하나 이상의 데이터베이스를 포함합니다.
Create Redshift
- redshift cluster를 생성합니다.
aws redshift create-cluster --node-type dc2.large --number-of-nodes 2 --master-username adminuser --master-user-password TopSecret1 --cluster-identifier mycluster
list Redshift
- 생성된 redshift cluster를 보여줍니다.
aws redshift describe-clusters
delete Redshift
- redshift cluster를 삭제합니다.
aws redshift delete-cluster --cluster-identifier mycluster --final-cluster-snapshot-identifier myfinalsnapshot
upload redshift
- 테이블을 생성한 뒤 copy 명령어를 통해 S3의 데이터를 redshift에 넣는다.
create table {tablename} (
col1 datatype,
col2 datatype
);
op_kwargs={
"qry": "copy public.tablename "
+ " from 's3://" + BUCKET_NAME + "{filepath_filename}'"
+ " iam_role '" + IAM_ROLE + "'"
+ " csv "
+ " ignoreheader 1 ",
},
import psycopg2
from airflow.hooks.postgres_hook import PostgresHook
def _s3_to_redshift(qry: str) -> None:
rs_hook = PostgresHook(postgres_conn_id="redshift")
rs_conn = rs_hook.get_conn()
rs_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
rs_cursor = rs_conn.cursor()
rs_cursor.execute(qry)
rs_cursor.close()
rs_conn.commit()
Reference
aws s3api create-bucket \
--bucket my-bucket \
--region eu-west-1 \
--create-bucket-configuration LocationConstraint=eu-west-1
aws s3api list-buckets --query "Buckets[].Name"
aws s3api delete-bucket --bucket my-bucket
op_kwargs={
"file_name": "/temp/faker_user_data.csv",
"key": "stage/{{ ds }}/faker_user_data.csv",
"bucket_name": BUCKET_NAME,
"remove_local": "true",
},
import os
from airflow.hooks.S3_hook import S3Hook
def _local_to_s3(
bucket_name: str, key: str, file_name: str, remove_local: bool = False
) -> None:
s3 = S3Hook()
s3.load_file(filename=file_name, bucket_name=bucket_name, replace=True, key=key)
if remove_local:
if os.path.isfile(file_name):
os.remove(file_name)
- 다른 AWS 리소스의 데이터에 액세스하는 작업의 경우 클러스터에 사용자를 대신해 리소스와 리소스의 데이터에 액세스할 권한이 필요합니다. AWS Identity and Access Management(IAM)을 사용하여 그러한 권한을 제공합니다.
Create IAM role
- IAM role을 생성합니다. 먼저 sts:AssumeRole 이라는 임시 자격 증명을 받을 수 있는 정책을 생성합니다.
echo '{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "redshift.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}' > ./Test-Role-Trust-Policy.json
aws iam create-role --role-name Test-Role --assume-role-policy-document file://Test-Role-Trust-Policy.json
attach role
- COPY를 사용하여 Amazon S3에 액세스하려면 AmazonS3ReadOnlyAccess를 추가해야 합니다.
aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess --role-name Test-Role
list IAM role
- IAM role을 보여줍니다.
aws iam list-roles
delete IAM role
- 먼저 지정된 역할을 제거해야 합니다. 그 후에 IAM role을 삭제할 수 있습니다.
aws iam detach-role-policy --role-name Test-Role --policy-arn arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess
aws iam delete-role --role-name Test-Role
Amazon Redshift
- Amazon Redshift는 AWS 클라우드에서 완벽하게 관리되는 페타바이트급 데이터 웨어하우스 서비스입니다. Amazon Redshift 데이터 웨어하우스는 노드라는 컴퓨팅 리소스의 모음으로, 노드는 클러스터라는 그룹을 구성합니다. 각 클러스터는 Amazon Redshift 엔진을 실행하며, 하나 이상의 데이터베이스를 포함합니다.
Create Redshift
- redshift cluster를 생성합니다.
aws redshift create-cluster --node-type dc2.large --number-of-nodes 2 --master-username adminuser --master-user-password TopSecret1 --cluster-identifier mycluster
list Redshift
- 생성된 redshift cluster를 보여줍니다.
aws redshift describe-clusters
delete Redshift
- redshift cluster를 삭제합니다.
aws redshift delete-cluster --cluster-identifier mycluster --final-cluster-snapshot-identifier myfinalsnapshot
upload redshift
- 테이블을 생성한 뒤 copy 명령어를 통해 S3의 데이터를 redshift에 넣는다.
create table {tablename} (
col1 datatype,
col2 datatype
);
op_kwargs={
"qry": "copy public.tablename "
+ " from 's3://" + BUCKET_NAME + "{filepath_filename}'"
+ " iam_role '" + IAM_ROLE + "'"
+ " csv "
+ " ignoreheader 1 ",
},
import psycopg2
from airflow.hooks.postgres_hook import PostgresHook
def _s3_to_redshift(qry: str) -> None:
rs_hook = PostgresHook(postgres_conn_id="redshift")
rs_conn = rs_hook.get_conn()
rs_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
rs_cursor = rs_conn.cursor()
rs_cursor.execute(qry)
rs_cursor.close()
rs_conn.commit()
Reference
aws redshift create-cluster --node-type dc2.large --number-of-nodes 2 --master-username adminuser --master-user-password TopSecret1 --cluster-identifier mycluster
aws redshift describe-clusters
aws redshift delete-cluster --cluster-identifier mycluster --final-cluster-snapshot-identifier myfinalsnapshot
create table {tablename} (
col1 datatype,
col2 datatype
);
op_kwargs={
"qry": "copy public.tablename "
+ " from 's3://" + BUCKET_NAME + "{filepath_filename}'"
+ " iam_role '" + IAM_ROLE + "'"
+ " csv "
+ " ignoreheader 1 ",
},
import psycopg2
from airflow.hooks.postgres_hook import PostgresHook
def _s3_to_redshift(qry: str) -> None:
rs_hook = PostgresHook(postgres_conn_id="redshift")
rs_conn = rs_hook.get_conn()
rs_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
rs_cursor = rs_conn.cursor()
rs_cursor.execute(qry)
rs_cursor.close()
rs_conn.commit()
S3 CLI List
boto3 Upload file
Airflow docker compose
RedShift CLI List
AWS IAM 생성하기
Author And Source
이 문제에 관하여(Simple Batch Pipeline), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://velog.io/@kero88/Simple-Batch-Pipeline저자 귀속: 원작자 정보가 원작자 URL에 포함되어 있으며 저작권은 원작자 소유입니다.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)