Docker의 Apache Flink SQL 클라이언트

Apache Flink는 흐름식과 일괄 처리 모드에서의 데이터 처리에 사용되는 소스 프레임워크이다.ApacheKafka와 모든 JDBC 데이터베이스를 포함한 다양한 데이터 플랫폼을 지원합니다.Flink의 초능력은 여러 언어에서 나온다. 더 전통적인 자바와 Scala에서 파이톤까지.파이톤의 경우 pyFlink 최고의 머신러닝 라이브러리와 함께 사용할 수 있다.
그러나 데이터 세계에서 가장 흔히 볼 수 있는 언어 중 하나는 SQL이다.데이터 분석가에서 과학자와 엔지니어에 이르기까지 SQL은 일반적으로 모든 데이터 종사자 패키지의 일부분이다.SQL은 데이터 액세스와 조작에 기술과 트렌드를 뛰어넘는 추상적인 기능을 제공합니다.밑바닥 기술이 무엇이든지 간에 그것으로 당신의 데이터를 조회할 수 있다.
ApacheFlink를 사용하면 itsSQL Client를 사용하여 순수 SQL에서 전체 데이터 파이프라인을 정의할 수 있습니다.이 블로그는 SQL 클라이언트를 포함한 docker 기반의 로컬 Apache Flink 플랫폼을 소개합니다.

Docker Compose를 사용하여 로컬에서 Apache Flink 설정


위에서 말한 바와 같이 Apache Flink는 매우 재미있는 기술로 시도해 볼 만하다.새로운 도구를 평가할 때, 로컬에서 실행하는 것은 양날의 검이다.한편, 너는 그것의 내재된 메커니즘을 더욱 잘 이해할 수 있다.다른 한편, 당신도 설치 단계의 고통을 겪을 것이다.
설정의 고통을 건너뛰려면 Docker를 시도하십시오.이 도구는 모든 게스트 운영체제에서 쉽게 이식할 수 있도록 미리 포장된 해결 방안을 제공합니다.
내가 Flink의 SQL 클라이언트의 내용을 사방에서 찾았을 때 a demo on Apache Flink's website 이것은 매우 좋지만 임의의 실험에 적합하지 않다는 것을 발견했다.이것이 바로 내가 더욱 가벼운 강좌를 만든 이유이다.본고와 related Github repository는 최소한의 구축 블록을 사용하고 기본적인 Flink 기능을 제공하여 용기 이외에 어떠한 데이터 파이프라인이나 목표를 사용할 수 있기를 희망한다.부팅 전에 dockerdocker-compose가 설치되어 있는지 확인합니다.
전체 코드는 aiven/flink-sql-cli-docker 저장소에 포함되어 있으며 터미널에서 다음 명령을 사용하여 저장소를 복제할 수 있습니다.
git clone https://github.com/aiven/flink-sql-cli-docker.git
이제 flink-sql-cli-docker 폴더를 열고 docker compose를 시작합니다.
cd flink-sql-cli-docker
docker-compose up -d
이것은 백그라운드에서 3개의 Apache Flink 노드를 시작합니다. ajobmanager, ataskmanagersql-client입니다.클러스터에 대한 자세한 내용은 다음과 같습니다.
docker-compose ps
이것은 세 컨테이너가 Up 상태임을 나타냅니다.
Name                             Command               State                Ports              
-------------------------------------------------------------------------------------------------------------------
flink-sql-cli-docker_jobmanager_1    /docker-entrypoint.sh jobm ...   Up      6123/tcp, 0.0.0.0:8081->8081/tcp
flink-sql-cli-docker_sql-client_1    /docker-entrypoint.sh            Up      6123/tcp, 8081/tcp              
flink-sql-cli-docker_taskmanager_1   /docker-entrypoint.sh task ...   Up      6123/tcp, 8081/tcp  
Flink의 웹 사용자 인터페이스는 현재 localhost:8081에서 얻을 수 있습니다.이것은 Flink 상태와 우리가 만들 데이터 파이프에 대한 정보를 탐색하는 데 유용한 도구입니다.

Docker Compose 설정에 대한 설명

docker-compose.yml에서 우리는 settings 하위 폴더를 jobmanagerdocker 용기/settings 폴더에 비추었다.이러한 방법으로 설정 파일을 호스트와 게스트 사이에 전달할 수 있으며 인증에 특정 호스트에서 생성된 파일(예를 들어 키 라이브러리)이 필요하다면 매우 유용할 것이다.data 하위 폴더도 taskmanagerjobmanager 용기에 비친다.이것은 순전히 내가 아래의 SQL 예시를 제공해야 하기 때문이지만, 로컬 파일 시스템의 파일을 대상으로 Apache Flink 행위를 테스트하고자 하는 상황에서 유용할 수 있습니다.data 하위 폴더에는 Apache Flink 테스트에 사용할 가상 데이터가 포함된 test.csv 파일이 있습니다.

SQL의 역량 강화


Flink의 SQL을 사용하려면 sql-client 컨테이너를 입력해야 합니다.터미널에서 다음 명령을 실행할 수 있습니다.
docker exec -it flink-sql-cli-docker_sql-client_1 /bin/bash
지금 우리가 들어갔으니 우리는 쓸 수 있다
./sql-client.sh
우리 도착했어!우리는 다양한 데이터 원본과 목표에 연결된 데이터 파이프라인을 만들 수 있는 기능이 완비된 SQL 클라이언트를 가지고 있다.SQL 클라이언트에서 연관된 Flink 테이블을 정의하여 test.csv 폴더의 flink-sql-cli-docker/data 파일을 조회할 수 있습니다.
create table
  people_job (
    id INT,
    name STRING,
    job STRING,
    salary BIGINT
  )
  WITH (
    'connector' = 'filesystem',
    'path' = 'file:///data/test.csv',
    'format' = 'csv',
    'csv.ignore-parse-errors' = 'true'
    );
우리는 표를 조회할 수 있다.
select * from people_job;
결과는 다음과 같습니다.
+/-                        id                      name                       job                    salary
  +                         1                       Ugo           Football Player                    200000
  +                         2                     Carlo    Crocodile domesticator                     30000
  +                         3                     Maria         Software Engineer                    210000
  +                         4                    Sandro               UX Designer                     70000
  +                         5                   Melissa         Software Engineer                     95000
Flink의 테이블 뷰를 종료하려면 Q를 누릅니다.

데이터 대상 만들기: PostgreSQL


업무 통계에 따른 평균 임금과 인원 총수를 PostgreSQL표에 전달하고 싶다고 가정해 봅시다.PostgreSQL 인스턴스가 없는 경우 다음Aiven's CLI 명령을 사용하여 빠르게 생성하고 새 터미널 창을 열 수 있습니다.
avn service create pg-flink   \
  -t pg                       \
  --cloud google-europe-west3 \
  --plan startup-4
이것은 -t pggoogle-europe-west3 계획이 있는 PostgreSQL 실례startup-4를 만들 것이다.서비스 준비가 완료될 때까지 기다리겠습니다.
avn service wait pg-flink
Flink에서 데이터를 푸시하는 데 사용할 타겟 테이블job_details을 만듭니다.같은 터미널 창에서 다음을 수행할 수 있습니다.
avn service cli pg-flink
그리고 나서
create table job_summary (
  job VARCHAR PRIMARY KEY,
  avg_salary BIGINT,
  nr_people BIGINT
  );

SQL 파이핑 생성


이제 새 터미널 창에서 다음 명령을 사용하여 PostgreSQL의 연결 매개변수를 읽어들입니다.
avn service get pg-flink \
  --format '{service_uri_params}'
출력은 다음과 유사해야 합니다.
{
  'dbname': 'defaultdb',
  'host': '<hostname>.aivencloud.com',
  'password': '<password>',
  'port': '13039',
  'sslmode': 'require',
  'user': 'avnadmin'
}
위의 host, port, user, dbnamepassword에 대한 상세한 정보를 주의하고 PostgreSQL을 가리키는 Flink표를 작성하세요.다음 SQL을 Flink의 SQL Cli에 붙여넣습니다.
create table
  job_summary_flink(
    job STRING,
    avg_salary BIGINT,
    nr_people BIGINT,
    PRIMARY KEY (job) NOT ENFORCED
  )
  WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://<host>:<port>/<dbname>?sslmode=require',
    'table-name' = 'job_summary',
    'username' = '<username>',
    'password' = '<password>'
    );
이제 Flink SQL Client에서 다음 명령을 사용하여 SQL 파이핑을 만듭니다.
insert into job_summary_flink
  select job,
    avg(salary),
    count(*)
  from people_job
  group by job;
다음과 같이 SQL 클라이언트의 출력을 확인해야 합니다.
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: b2d8b019c6c6e3dc5fe63902a14c13a9
이제 Flink의 테이블이 Flink의 CLI에서 다음 SQL을 올바르게 채우고 있는지 확인할 수 있습니다.
select * from job_summary_flink;
결실
+/-                       job                avg_salary                 nr_people
  +               UX Designer                     70000                         1
  +    Crocodile domesticator                     30000                         1
  +           Football Player                    200000                         1
  +         Software Engineer                    152500                         2
또한 PostgreSQL 클라이언트 창에서 다음 명령을 사용하여 PostgreSQL에서 동일한 결과를 얻을 수 있습니다.
select * from job_summary;
결실
         job           | avg_salary | nr_people
-----------------------------+------------+-----------
UX Designer            |      70000 |         1
Crocodile domesticator |      30000 |         1
Football Player        |     200000 |         1
Software Engineer      |     152500 |         2
(4 rows)

마무리


이 블로그는 Apache Flink의 SQL 클라이언트를 Docker 용기 그룹으로 만드는 방법을 제공합니다.컨테이너 외부에 추가 데이터 소스나 목표를 제공하여 Flink에 들어가는 학습 여정을 시작할 수 있도록 합니다.ql 파이프 예시에서 로컬 csv 파일과 PostgreSQL 사이의 통합을 보여 줍니다.이 글을 보면 앞으로 Apache Flink와 다른 Aiven이 관리하는 데이터 구조를 사용하는 파이프라인 예시를 더 많이 발표할 것입니다!
추가 자료:
  • Apache Flink Home page
  • Apache Flink SQL Client
  • Apache Flink SQL Client Demo
  • Apache Flink Docker GitHub Repository

  • Aiven.io 선택한 클라우드에 소스 데이터 플랫폼을 만들 수 있음
  • Apache Flink에 대한 자세한 내용과 기타 Aiven 데이터 서비스에 계속해서 관심을 가져 주십시오.

    좋은 웹페이지 즐겨찾기