Flink Socket 데이터 실시간 처리

3395 단어
블로그 시작 링크:https://mhuig.github.io/posts/fc610c2d.html
Apache Flink는 분산 데이터 흐름 처리와 대량 데이터 처리를 위한 소스 컴퓨팅 플랫폼으로 흐름 처리와 대량 처리 두 가지 유형의 응용을 지원하는 기능을 제공합니다. Flink는 Socket 데이터를 실시간으로 처리합니다.
Flink Socket 소스 GitHub
Maven Archetype을 통해 프로젝트 만들기
프로젝트 만들기
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-scala \
-DarchetypeVersion=1.9.0

상기 Maven 명령을 통해 프로젝트를 만드는 과정에서 명령은 사용자가 프로젝트의 그룹 Id,artifactId,version,package 등 정보를 상호작용적으로 정의하고 일부 옵션은 기본값이 있으면 바로 차로 돌아가면 된다.그림에서 프로젝트를 성공적으로 만들면 클라이언트가 알릴 것입니다.
그룹 Id,artifactId 정보를 각각 다음과 같이 지정하고 나머지 매개 변수는 기본값을 사용합니다
  • groupId:com.qst
  • artifactId:flink-socket

  • 항목 검사
    Maven을 사용하여 만든 프로젝트의 경우 다음과 같은 프로젝트 구조를 볼 수 있습니다.
    위의 프로젝트 구조를 통해 알 수 있듯이 이 프로젝트는 Scala 코드의 프로젝트로 각각BatchJob이다.java 및 StreamingJob.자바 두 파일은 각각 Flink 대량 인터페이스 DataSet의 실례 코드와 흐르는 인터페이스의 실례 코드를 이겼다.
    IDE로 항목 가져오기
    Flink 홈페이지는 이러한 절차를 거쳐 프로젝트를 만든 후 Intellij IDEA를 사용하여 후속 프로젝트 개발을 진행하는 것을 추천합니다.
    프로젝트 컴파일
    프로젝트는 위의 절차를 거쳐 만들어진 후 Maven Command 명령 mvn clean package를 사용하여 프로젝트를 컴파일할 수 있으며, 컴파일링이 완료되면 프로젝트 동급 디렉터리에 target/-를 생성할 수 있습니다.jar 파일, 이jar 파일은 웹 클라이언트를 통해 그룹에 제출하여 실행할 수 있습니다.
    개발 환경 구성
    여기서는 홈페이지에서 추천하는 IntelliJ IDEA를 응용프로그램 개발 IDE로 사용합니다.
    IntelliJ IDEA 다운로드
    IntelliJ IDEA 공식 주소를 사용하여 설치 프로그램을 다운로드하고 운영 체제에 따라 해당 패키지를 선택하여 설치할 수 있습니다.
    Scala Plugins 설치
    IntelliJ IDEA를 설치하면 기본적으로 Scala 개발 환경이 지원되지 않으므로 Scala 플러그인을 설치해야 합니다.다음은 IDEA에서 Scala 플러그인 설치에 대한 설명입니다.IDEA IDE를 연 후 IntelliJ IDEA 메뉴 표시줄에서 Preferences 옵션을 선택하고 Plugins 하위 옵션을 선택한 다음 페이지에서 Marketplace를 선택하고 검색 상자에 Scala를 입력하여 검색하십시오 검색된 옵션 목록에서 Scala 플러그인을 선택하고 설치하십시오
     설치 후 IDE를 다시 시작하면 Scala 프로그래밍 환경이 적용됩니다.
    Flink 프로젝트 가져오기
    IntelliJ IDEA를 시작하고 Import Project를 선택한 다음 파일 옵션 상자에서 작성된 항목을 선택하고 확인을 클릭합니다.[가져오기 프로젝트에서 Import project from external mode의 Maven 후속 옵션을 선택하면 기본값을 사용합니다.
    Flink Socket 애플리케이션
    Flink Socket 애플리케이션 코드 작성
    import org.apache.flink.streaming.api.scala._
    
    object StreamingJob {
      def main(args: Array[String]) {
        //      
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        //     ,  socket
        val socketStream = env.socketTextStream("localhost", 9000, '
    ') // val count = socketStream .flatMap(_.toLowerCase.split("\\W+")) .filter(_.nonEmpty) .map((_, 1)) .keyBy(0) .sum(1) // count.print() // env.execute("Socket Stream") } }

    IDE에서 코드 테스트
    코드 파일에서 프로그램을 오른쪽 단추로 실행합니다
    이 때 다음과 같은 오류를 보고합니다
    IDEA의 Run/Debug Configuration에서 "Include dependencies with"Provided"scope"옵션을 선택해야 할 때 로컬 IDE에서 실행할 수 있습니다
    로컬에서 코드 테스트
    우선 명령줄에서 현재 터미널에서 감청 포트 9000을 열고 명령줄에서 아래 명령을 실행합니다
    nc -l 9000
    

    그리고 IDE에서 StreamingJob 클래스의main 방법을 오른쪽 단추로 실행하면 다음과 같습니다
    웹 클라이언트에서 Job 실행
    먼저 프로젝트가 있는 디렉터리에서 mvn clean package를 실행하여 패키지를 만들고 프로젝트의 target 디렉터리에 flink-socket-1.0-SNAPSHOT를 생성합니다.jar 파일은 명령줄에 있습니다. 현재 터미널에서 감청 포트 9000을 엽니다. 명령줄에서 아래 명령을 실행합니다.
    nc -l 9000
    

    브라우저에서 Flink 웹 모니터링 페이지를 열고 왼쪽에서 Submit New Job 옵션을 선택하고 오른쪽 상단에 있는 Add New를 누르면 우리가 컴파일한 flink-socket-1.0-SNAPSHOT를 선택합니다.jar 파일, Submit 버튼을 클릭하여 Job 제출
    Task Managers 선택 목록에서 해당 Job을 선택하고 Stdout 옵션을 클릭하여 결과를 봅니다.

    좋은 웹페이지 즐겨찾기