ETL com Apache Airflow、Web Scraping、AWS S3、Apache Spark e Redshift | Parte 1

irei 발표 후 아파치 기류의 이용률, 유량 제어, 추가 유량 자동 제어, 유량 추출, 전환과 탑재(ETL).프로젝트 실시 과정에서 환경 보호 부문은 공공 부문과 일치해야 한다.
기류를 이용하는 과정에서 가장 중요한 것은 통풍을 실현하는 것이다.사이트 구축AdoroCinema은 아름다운 사이트로 영화 사이트의 추가 정보를 제공한다.csv e enviar para o AWS S3.프로젝트의 일부에서 iremos Realizer는 통합된 기류 구성 요소이고 Apache Spark는 실현 과정이다. 이것은 Arquivos회사가 부러워하는 AWS S3와armazenaremos, 아마존 Redshift회사의 과정이다.
O código do projeto estádisponívelaqui.

공기 흐름


Aibnb 개발 프로젝트의 국제 환경 보호 설비 프로젝트 중 해결 방안과 문제를 해결해야 하는 프로젝트가 하나 있지만 환경 보호가 없다.O는 2014년 마지막 공기 흐름 조사no blog da Airbnb를 2015년 완료했으며, 이 조사의 목적지는 다도스의 엥그나리아 시가 아닌 톨노의 유행 지역이다.직능 지령 수첩에 따르면 이 프로젝트의 진도와 응답 수준은 프로젝트의 진도와 진도Apache Airflow에 달려 있다.

프로젝트 전용 구성


이 분야에서 우리의 목표는 다음과 같다.
  • docker e docker compose em sua máquina.
  • .
  • aws cli instalado em sua máquina.
  • 내가 뭘 할 수 있겠어.
  • docker utilizada foi Apuckel/docker-airflow의 사진은 아름다운 사진으로 명하 마쿠이나(minha máquina)의 사진이다.
    AWS 유틸리티의 크레덴시아스 구성으로 컨테이너 부두 노동자들은 아퀴보 현지에서 일한다.우리는 너의 명령을 집행할 시간도, 광희도 없다.
    $ aws configure
    

    Fluxo a ser implementado


    O 프로젝트는 두 단계를 포함하고 첫 번째 단계의 주요 공공시설은 순류한다.

    웹 캡처 웹 사이트 BeautifulSoup


    인터넷 형식의 자동 발표 정보 획득 과정.실현 과정에서 정확한 예측은 탐색적인 네트워크이고 중요한 과정에서 HTML 평균치는 건설적인 네트워크이다.기능화 개발 도구는 어떤 탐색도 필요 없고 HTML로 정보를 얻을 필요도 없는 실용적인 개발 도구를 제공한다.

    이런 정보는 오투로, 고넬로, 포토도 포스터, 중국 부흥, 란멘토 데이터, 중국 스포츠 종합 보도와 영화에 대한 대중의 관심을 포함한다.이것은 공공 관계에 관한 중흥 프로젝트이자 중흥 프로젝트이다.나비가 폴(vezes navegar por)이 몬테나주의 대응 조치를 연결했을 때 그는 복잡한 형식인 공공 서비스와 공공 서비스에서 유래한 정보의 출처를 발견했다.
    이 세 편의 주요 내용은 아바리아 영화, 레토레스 영화와 알칸 영화이다.파젤의 전략에 따르면 HTML 구성 요소를 선택하거나 EU 회원국을 분류하지 않아도 미래 정보의 완전성과 데이터 정리의 실용성을 확보할 수 있다.
    from bs4 import BeautifulSoup
    from urllib.request import urlopen, Request
    
    def scraping_avaliacao_adoro_cinema(page = 1):
        URL = f"http://www.adorocinema.com/filmes/todos-filmes/notas-espectadores/?page={page}"
    
        html_doc = urlopen(URL).read()
        soup = BeautifulSoup(html_doc, "html.parser")
        data = []
    
        for dataBox in soup.find_all("div", class_="data_box"):
            titleObj = dataBox.find("a", class_="no_underline")
            imgObj = dataBox.find(class_="img_side_content").find_all(class_="acLnk")[0]
            sinopseObj = dataBox.find("div", class_="content").find_all("p")[0]
            dateObj = dataBox.find("div", class_="content").find("div", class_="oflow_a")
            movieLinkObj = dataBox.find(class_="img_side_content").find_all("a")[0]
            generoObj = dataBox.find("div", class_="content").find_all('li')[3].find('div',class_="oflow_a")
            detailsLink = 'http://www.adorocinema.com' + movieLinkObj.attrs['href']
    
            avaliacoesMeios = dataBox.find("div", class_="margin_10v").find_all('span', class_="acLnk")
            avaliacoesNotas = dataBox.find("div", class_="margin_10v").find_all('span', class_="note")
    
            # tratar a lista de avaliações
            avaliadores = [ elem.text.strip() for elem in avaliacoesMeios if elem.text.strip() != "" ]
    
            # vincular com notas
            avaliacoesValores = {"AdoroCinema" : None, "Leitores" : None, "Imprensa" : None}
            for index, avaliador in enumerate(avaliadores):
              avaliacoesValores[avaliador] = avaliacoesNotas[index].text.strip()     
    
            #Carregar a sinopse completa 
            htmldocMovieDetail = urlopen(detailsLink).read()
            soupMovieDetail = BeautifulSoup(htmldocMovieDetail, "html.parser")
            fullSinopse = soupMovieDetail.find(class_="content-txt")     
            fullImgObj = soupMovieDetail.find("meta",  property="og:image")   
    
            data.append({'titulo': titleObj.text.strip(),
                        'genero': generoObj.text.replace('\n','').strip(),
                        'poster' : fullImgObj["content"], 
                        'sinopse' : sinopseObj.text.strip(),
                        'data' :  dateObj.text[0:11].strip(),
                        'link' : detailsLink,
                        'avaliacoes' : avaliacoesValores,
                        'sinopseFull': fullSinopse.text})
    
        return data
    
    Aqui 예술종합회의는 미의 실용성을 설명했다.

    다그스


    O 기류 이용률 uma estrutura chamada DAG - 유방향 무환도, 즉 관계에 따라 확정된 일련의 집행 방안의 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향 무방향기존의 설정 시스템은 설정 프로그램, 실행 프로그램의 운행 조건, 복구 속도 등을 포함한다.
    기본 Aifrflow ID는 Dag do projeto como todo arquivo python armazenado na Intent Dag이며 한 지방 파일에서 식별할 수 있도록 구성된 파일입니다.
    크리안도 우마다그
    부록 e에 근거하여 중정의 위치를 확정하였다.
  • dag_id:éo identicadorúnico do fluxo,aqui chamamos de'etl_movie_review'.
  • 계획 간격: 작업 수행 주기를 나타냅니다.
  • 기본 매개 변수: s ã o informa çes que ser ã o refletidos em todos os nós do DAGs.
  • from airflow import DAG
    from datetime import datetime, timedelta
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import PythonOperator
    
    from etl_scripts.load_to_s3 import local_to_s3
    from etl_scripts.web_scraping import collect_adoro_cinema_data
    from etl_scripts.remove_local_file import remove_local_file
    
    default_args = {
        "owner": "airflow",
        "depends_on_past": False,
        "start_date": datetime(2021, 1, 1),
        "retries": 0,
    }
    
    with DAG(
        "etl_movie_review", 
        schedule_interval=timedelta(minutes=1),
        catchup=False,
        default_args=default_args
    ) as dag:
        (...)
    
    다DAG 의 임무로 Criando
    비구조화 기류 이용 업체, 전자 봉인 클래스, 실현 가능한 클래스와 확정 가능한 클래스. 비구조화 기류 이용 프로젝트는 파이톤 Operator와 Dummy Operator, código python Executuço de código python, fim de fluxos이다.alguns outros 회사의 직원, 운영자 명단aqui.
    (...)
       # Inicio do Pipeline
        start_of_data_pipeline = DummyOperator(task_id='start_of_data_pipeline', dag=dag)
    
        # Definindo a tarefa para realização de web scrapping
        movie_review_web_scraping_stage = PythonOperator(
            task_id='movie_review_web_scraping_stage',
            python_callable=collect_adoro_cinema_data,
            op_kwargs={
                'quantidade_paginas': 1,
            },
        )
    
        # definindo a tarefa para enviar o arquivo csv para S3.
        movie_review_to_s3_stage = PythonOperator(
            task_id='movie_review_to_s3_stage',
            python_callable=local_to_s3,
            op_kwargs={
                'bucket_name': '<nome-do-seu-bucket>'
            },
        )
    
        movie_review_remove_local = PythonOperator(
            task_id='movie_review_remove_local',
            python_callable=remove_local_file,
        )
    
        # Fim da Pipeline
        end_of_data_pipeline = DummyOperator(task_id='end_of_data_pipeline', dag=dag)
    
    예를 들어 nós temos nossa DAG pronta,ela possui 2 tarefas Dummy,ouseja apenas para representa ço de in fim do fluxo,e3 tarefas que executam functionalidades em código Python.신분 식별 과정에서 가장 중요한 것은 신분 식별 과정에서 공기가 흐르는 방향이 잘못된 정보라는 것이다.
    마지막으로, nós precisamos criar a ligaço entre as tarefinar a ordem de execu ã o avirate algumas de declaration ço dessa ordem, a esse tutorial vamos seguir o formato mais simples.
    start_of_data_pipeline >> movie_review_web_scraping_stage >> movie_review_to_s3_stage >> movie_review_remove_local >> end_of_data_pipeline
    
    Nas nossas tasks Python Operator, nós realizamos a chamada de 3 funççes python: 영화 데이터를 수집하고 로컬 파일을 삭제합니다.가장 좋은 소포 하나, 그물 스크레이퍼 하나, 벽 치마 하나, 정자 하나.csv numa Pota Temporaária,jáa segunda e a terceira são as funões responseáveis por integrar com o S3 e envirar os arquivos,e então remover o arquivos.나는 디스코를 좋아한다.
    import os
    import glob
    from airflow.hooks.S3_hook import S3Hook
    
    def local_to_s3(bucket_name, filepath='./dags/data/*.csv'):
        s3 = S3Hook()
    
        for f in glob.glob(filepath):
            key = 'movie_review/'+f.split('/')[-1]
            s3.load_file(filename=f, bucket_name=bucket_name,
                        replace=True, key=key)
    
    def remove_local_file(filepath='./dags/data/*.csv'):
        files = glob.glob(filepath)
        for f in files:
            os.remove(f)
    
    집행자는 다그 페로 페넬로
    토다스 명세서에 따르면 나텔라 이레모스는 행정장관이다. 그는 이 명세서에서 나아플리카토 행정장관인 아플리카토 보토토의 폐쇄를 확인했다.
    Nós podemos는 시각화와 검증 모델을 하나의 임무로 삼지 않았고 상호작용 모델을 집행 상태 조화의 핵심으로 삼지 않았으며 최종 집행 모델도 성공적인 집행 모델도 없었다.조작원 식별이 필요 없고 fluxo가 필요 없습니다.

    O 기류 분포는 시각적이고 유창한 실행 형식으로 진행되며 패턴 트리 보기가 아니라 실행 정보의 형식으로 관리됩니다. vocètmém acesso des execuè O cada tarefas, e acesso ao adigo da DAG,파넬 바스탄트(painel bastante)가 타피스 크리아다스(tafegas criadas)에 대한 감시를 마쳤다.

    fim 매뉴얼에서 마지막으로 실행된 것은arquivo입니다.csv geradocom은 영화 정보의 전파자로서 환경에 대한 숭배가 없다.

    세인트폴


    fim의 규정에 따라 우리는 nosso projeto Concluda에서 프로젝트를 완성하고 agora 사이트에 영화와 인터넷에 관한 정보를 발표할 것이다.예를 들어 nós iremos Lers esse arquivo do S3 e Realizer um pequeno Processmento dessmento deses dados usando Apache Spark는 아마존 레드 데이터 창고의 일부입니다.
    에스페로 터넘 고스타도(tenham gostado), 콰콰비다(qualquer d 분짜 pode deixar nos comentários).아테아 프로시마!

    좋은 웹페이지 즐겨찾기