데이터 공학 101: 첫 번째 데이터 추출 자동화

17244 단어 pythonsqldatabase
사진은 Dennis Kummer 에서 촬영되었다.
지난 몇 주 동안 우리는 Unsplash 세계의 몇 가지 중요한 화제를 토론했다.
우리는 이미 이해data engineering and automation를 위해 기초를 다졌다.이제 첫 번째 그룹 일괄 처리 작업을 구축할 때다.
이를 위해, 우리는 vocabulary and basic concepts a data engineer uses 라이브러리를 사용하여 우리의 업무를 자동화하는 것을 도울 것이다.
그 밖에 우리의 실시간 데이터 원본에 대해 우리는 Apache Airflow 데이터 집합을 사용할 것이다.언제든지 이러한 정보를 추출하고 311건의 보고서에 대한 최신 데이터를 얻을 수 있습니다.이것은 통상적으로 공공물을 파괴하고 주차를 위반하는 행위와 관련된다.
이 파이프에 대해 우리는 먼저 데이터를 원시 CSV로 추출한 다음에 MySQL 데이터베이스에 불러올 것이다.
본고에서, 우리는 이 조작을 실행하는 데 필요한 코드를 개술하고, 우리가 채택하고 있는 각종 절차의 일부 원인을 지적할 것이다.

sfgov 311 JSON 추출 함수 만들기


시작하기 전에 본고에서 논의한 모든 절차를 따를 수 있도록 기류 환경을 설정해야 합니다.만약 당신이 아직 이렇게 하지 않았다면, 우리는 이것을 찾을 것이다 .
데이터 파이프를 만들 때, 특히 일괄 처리를 위한 파이프는 데이터를 원시 데이터 층으로 추출하는 것이 유익하다.이렇게 하면 원시 데이터를 백업할 수 있다.
데이터에 오류가 있을 때, 원본 파일을 사용하면 데이터 오류가 원본 코드에 있는지 전체 과정에 있는지 확인하는 데 도움을 줄 수 있습니다.
우리의 예에서, 우리는 온라인의 JSON 데이터에서 데이터를 집중적으로 추출할 것이다.우리는 [read_json](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_json.html)라는 판다스 함수를 사용하여 이 점을 실현할 수 있다.이것은 파일이나 URL을 읽을 수 있습니다.
우리는 호출할 수 있는 함수를 만들어서 이 함수를 실현할 것이다. 이 함수는 URL이나 파일에서 JSON 기반 데이터를 추출할 것이다.
우리는 또한 파일 이름에 시간 스탬프를 추가해서 우리가 언제 데이터를 추출할지 알 수 있도록 할 수 있다.이것은 나중에 데이터의 모든 변경 사항을 추적하려고 시도할 때 사용할 수 있으며, 이러한 변경은 데이터를 업데이트하는 것이 아니라 스냅샷과 같을 수도 있다.
다음과 같이 DateTime 객체를 사용하여 수행할 수 있습니다.
이 파일 이름은 나중에 아래에서 만든 추출 함수와 함께 사용됩니다.

# Step 1: Import all necessary packages.

# For scheduling
import datetime as dt

# For function jsonToCsv
import pandas as pd

# For function csvToSql
import csv
import pymysql

# Backwards compatibility of pymysql to mysqldb
pymysql.install_as_MySQLdb()

# Importing MySQLdb now
import MySQLdb

# For Apache Airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator


# Step 2: Define functions for operators.

# A JSON string reader to .csv writer function.
def jsonToCsv(url, outputcsv):

    # Reads the JSON string into a pandas DataFrame object.
    data = pd.read_json(url)

    # Convert the object to a .csv file.
    # It is unnecessary to separate the JSON reading and the .csv writing.
    data.to_csv(outputcsv)

    return 'Read JSON and written to .csv'
이 임무는 여전히 실시해야 한다.그러나 불러오는 함수를 설정하면 마지막으로 보여 드리겠습니다.

글은 우리 개인의 가장 사랑 중의 하나가 될 것이다 Airflow를 사용하여 MySQL에 데이터 로드


일단 발췌가 생기면, 다음 단계는 데이터를 데이터 창고의 어떤 원시층에 불러오는 것이다.
이 단계의 관건은 데이터를 조작하지 않는 것이다.데이터가 데이터 원본에서 나오는 어떤 형식의 데이터 문제가 존재하면 트레이스하기 쉽기 때문이다.
모든 단계에서 데이터 품질 검사를 통해 이 점을 실현할 수 있습니다.원본 검사에서, 데이터 형식이 의미가 있는지 검사합니다.
예를 들어 모든 날짜 필드가 날짜입니까?모든 상태가 유효한 상태인가요?믿든 안 믿든 네가 결정해라. 우리는 이곳에서 문제에 부딪혔다.WE는 주의 약자가 아닙니다.
이것은 당신이 추출한 데이터가 정확하다는 것을 확보하기 위해 건전한 검사입니다.
이론적으로 응용 프로그램은 반드시 사용자의 입력을 자세하게 검사해야 한다.그러나 우리는 응용층을 믿지 않는다.
이외에 아래의 코드를 사용할 수 있습니다.먼저 MySQL을 사용하여 데이터베이스 연결을 구축한 다음 CSV를 한 줄씩 로드합니다.
def csvToSql():

    # Attempt connection to a database
    try:
        dbconnect = MySQLdb.connect(
                host='localhost',
                user='root',
                passwd='databasepwd',
                db='mydb'
                )
    except:
        print('Can\'t connect.')

    # Define a cursor iterator object to function and to traverse the database.
    cursor = dbconnect.cursor()
    # Open and read from the .csv file
    with open('./rogoben.csv') as csv_file:

        # Assign the .csv data that will be iterated by the cursor.
        csv_data = csv.reader(csv_file)

        # Insert data using SQL statements and Python
        for row in csv_data:
            cursor.execute(
            'INSERT INTO rogobenDB3(number, docusignid, publicurl, filingtype, \
                    cityagencyname, cityagencycontactname, \
                    cityagencycontacttelephone, cityagencycontactemail, \
                    bidrfpnumber, natureofcontract, datesigned, comments, \
                    filenumber, originalfilingdate, amendmentdescription, \
                    additionalnamesrequired, signername, signertitle) ' \
                    'VALUES("%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s", \
                    "%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s")',
                    row
                    )

    # Commit the changes
    dbconnect.commit()

    '''
    # Print all rows - FOR DEBUGGING ONLY
    cursor.execute("SELECT * FROM rogobenDB3")
    rows = cursor.fetchall()

    print(cursor.rowcount)
    for row in rows:
        print(row)
    '''

    # Close the connection
    cursor.close()

    # Confirm completion
    return 'Read .csv and written to the MySQL database'
더 튼튼한 시스템을 구축하고 있다면, 가져온 연결 문자열만 받아들일 수 있는 데이터베이스 관리자 클래스를 설정할 수 있습니다.
그러나 우리는 단지 시범을 위해 그것을 구축했기 때문에 코드를 모두 함수에 넣었다.
이 모든 기능을 설정하면 DAG를 정식으로 설정할 수 있습니다. , DAG의 역할은 프로세스 맵과 유사합니다.그것은 어떤 임무가 먼저 실행되고 어떤 임무가 다른 임무에 의존하는지 지도한다.
우리의 예에서, 우리는 데이터를 추출하는 임무가 있고, 다른 임무는 상기 데이터를 MySQL 테이블에 불러옵니다.이 두 기본 작업은 파이프를 시작하는 데 도움이 될 것입니다. 아래와 같습니다.

내가 이전 글에서 언급한 바와 같다 기류 파이프 설정


현재 이 모든 기능이 있어서 우리는 파이프를 설치할 수 있다.
기류에서 실제 파이프를 설정하려면 기본 매개변수 세트를 설정해야 합니다.소유자, 시작 날짜, 파이프 재시도 빈도 및 기타 매개변수를 설정할 수 있습니다.
# Step 3: Define the DAG, i.e. the workflow

# DAG's arguments
default_args = {
        'owner': 'rogoben',
        'start_date':dt.datetime(2020, 4, 16, 11, 00, 00),
        'concurrency': 1,
        'retries': 0
        }

# DAG's operators, or bones of the workflow
with DAG('parsing_govt_data',
        catchup=False, # To skip any intervals we didn't run
        default_args=default_args,
        schedule_interval='* 1 * * * *', # 's m h d mo y'; set to run every minute.
        ) as dag:

    opr_json_to_csv = PythonOperator(
            task_id='json_to_csv',
            python_callable=jsonToCsv,
            op_kwargs={
                'url':'https://data.sfgov.org/resource/pv99-gzft.json',
                'outputcsv':'./rogoben.csv'
                }
            )

    opr_csv_to_sql = PythonOperator(
            task_id='csv_to_sql',
            python_callable=csvToSql
            )

# The actual workflow
opr_json_to_csv >> opr_csv_to_sql
매개변수 외에도 특정 연산자를 실제로 설정해야 합니다.이런 상황에서 우리는 두 가지 함수가 있다. jsonToCSVcsvToSql.이것들은 PythonOperator에 사용될 것이다.이것은 당신이 우리가 칭하는 임무를 만들 수 있도록 합니다.
작업이 합리적인 순서로 실행될 수 있도록 의존항을 정의해야 합니다.
비트 이동 연산자를 사용하여 종속성을 정의할 수 있습니다.위치 이동 연산자에 익숙하지 않은 사람에게는 >> 또는 << 처럼 보인다.
이 예에서는 를 opr_json_to_csv >> opr_csv_to_sql 로 정의할 수 있습니다.
이것은 opr_json_to_csv 이전에 운행하는 것을 확보했다.
사실대로 말하면, 너는 중복된 데이터를 불러오는 방식이 있을 것이다.
중복 데이터를 처리하기 위해 원본 레이어를 로드한 다음 나중에 임시 레이어에 중복 데이터가 로드되지 않도록 검사할 수 있습니다.그래서 우리는 지금 이 걱정을 하지 않는다.
이로써 당신은 기본적으로 첫 번째 파이프를 완성했습니다.

그럼 지금 파이프를 어디에 놓으세요?


이 파이프를 실행하려면 설정한 opr_csv_to_sql 폴더에 저장해야 합니다.아직 설정하지 않았다면 저희 를 사용해야 합니다.
airflow                  # airflow root directory.
├── dags                 # the dag root folder
│   ├── first_dag.py        # where you put your first task
이 파이프가 저장되면, 백그라운드에서 기류가 운행하기만 하면, 당신의 DAG는 기류에 의해 자동으로 픽업됩니다.
로컬 호스트 8080에 액세스하여 이를 확인할 수 있습니다. 기본적으로 Airflow 대시보드는 여기서 실행됩니다.
거기서 당신의 DAG가 나와야 합니다.
일단 그것이 나타나면, 모든 작업이 설정되어 있는지 확인하기 위해 당신의 DAG를 볼 수 있습니다.
다음 그림과 같이 표시됩니다.
favorite Airflow setup guide

이제 파이프가 준비되었습니다.

첫 번째 데이터 파이프라인 완료


첫 번째 기류 데이터 파이프라인을 구축하고 자동화한 것을 축하합니다!이제 이 프레임워크를 사용하여 다른 ETL과 데이터 파이프라인에서 사용할 수 있습니다.
물론 데이터 플랫폼이나 데이터 창고의 1층일 뿐이다.여기서부터 생산층, 도량층, 그리고 특정한 데이터 시각화나 데이터 과학층을 만들어야 합니다.
그리고 너는 진정으로 너의 데이터에 영향을 미치기 시작할 수 있다.
데이터 과학, 클라우드 컴퓨팅, 기술에 관한 더 많은 글을 읽고 싶으시면 아래의 글을 보십시오!

Data Engineering 101: Writing Your First Pipeline
5 Great Libraries To Manage Big Data
What Are The Different Kinds Of Cloud Computing
4 Simple Python Ideas To Automate Your Workflow
4 Must Have Skills For Data Scientists
SQL Best Practices --- Designing An ETL Video
5 Great Libraries To Manage Big Data With Python

좋은 웹페이지 즐겨찾기