Flink Socket 데이터 실시간 처리
Apache Flink는 분산 데이터 흐름 처리와 대량 데이터 처리를 위한 소스 컴퓨팅 플랫폼으로 흐름 처리와 대량 처리 두 가지 유형의 응용을 지원하는 기능을 제공합니다. Flink는 Socket 데이터를 실시간으로 처리합니다.
Flink Socket 소스 GitHub
Maven Archetype을 통해 프로젝트 만들기
프로젝트 만들기
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-scala \
-DarchetypeVersion=1.9.0
상기 Maven 명령을 통해 프로젝트를 만드는 과정에서 명령은 사용자가 프로젝트의 그룹 Id,artifactId,version,package 등 정보를 상호작용적으로 정의하고 일부 옵션은 기본값이 있으면 바로 차로 돌아가면 된다.그림에서 프로젝트를 성공적으로 만들면 클라이언트가 알릴 것입니다.
그룹 Id,artifactId 정보를 각각 다음과 같이 지정하고 나머지 매개 변수는 기본값을 사용합니다
항목 검사
Maven을 사용하여 만든 프로젝트의 경우 다음과 같은 프로젝트 구조를 볼 수 있습니다.
위의 프로젝트 구조를 통해 알 수 있듯이 이 프로젝트는 Scala 코드의 프로젝트로 각각BatchJob이다.java 및 StreamingJob.자바 두 파일은 각각 Flink 대량 인터페이스 DataSet의 실례 코드와 흐르는 인터페이스의 실례 코드를 이겼다.
IDE로 항목 가져오기
Flink 홈페이지는 이러한 절차를 거쳐 프로젝트를 만든 후 Intellij IDEA를 사용하여 후속 프로젝트 개발을 진행하는 것을 추천합니다.
프로젝트 컴파일
프로젝트는 위의 절차를 거쳐 만들어진 후 Maven Command 명령 mvn clean package를 사용하여 프로젝트를 컴파일할 수 있으며, 컴파일링이 완료되면 프로젝트 동급 디렉터리에 target/-를 생성할 수 있습니다.jar 파일, 이jar 파일은 웹 클라이언트를 통해 그룹에 제출하여 실행할 수 있습니다.
개발 환경 구성
여기서는 홈페이지에서 추천하는 IntelliJ IDEA를 응용프로그램 개발 IDE로 사용합니다.
IntelliJ IDEA 다운로드
IntelliJ IDEA 공식 주소를 사용하여 설치 프로그램을 다운로드하고 운영 체제에 따라 해당 패키지를 선택하여 설치할 수 있습니다.
Scala Plugins 설치
IntelliJ IDEA를 설치하면 기본적으로 Scala 개발 환경이 지원되지 않으므로 Scala 플러그인을 설치해야 합니다.다음은 IDEA에서 Scala 플러그인 설치에 대한 설명입니다.IDEA IDE를 연 후 IntelliJ IDEA 메뉴 표시줄에서 Preferences 옵션을 선택하고 Plugins 하위 옵션을 선택한 다음 페이지에서 Marketplace를 선택하고 검색 상자에 Scala를 입력하여 검색하십시오 검색된 옵션 목록에서 Scala 플러그인을 선택하고 설치하십시오
설치 후 IDE를 다시 시작하면 Scala 프로그래밍 환경이 적용됩니다.
Flink 프로젝트 가져오기
IntelliJ IDEA를 시작하고 Import Project를 선택한 다음 파일 옵션 상자에서 작성된 항목을 선택하고 확인을 클릭합니다.[가져오기 프로젝트에서 Import project from external mode의 Maven 후속 옵션을 선택하면 기본값을 사용합니다.
Flink Socket 애플리케이션
Flink Socket 애플리케이션 코드 작성
import org.apache.flink.streaming.api.scala._
object StreamingJob {
def main(args: Array[String]) {
//
val env = StreamExecutionEnvironment.getExecutionEnvironment
// , socket
val socketStream = env.socketTextStream("localhost", 9000, '
')
//
val count = socketStream
.flatMap(_.toLowerCase.split("\\W+"))
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(0)
.sum(1)
//
count.print()
//
env.execute("Socket Stream")
}
}
IDE에서 코드 테스트
코드 파일에서 프로그램을 오른쪽 단추로 실행합니다
이 때 다음과 같은 오류를 보고합니다
IDEA의 Run/Debug Configuration에서 "Include dependencies with"Provided"scope"옵션을 선택해야 할 때 로컬 IDE에서 실행할 수 있습니다
로컬에서 코드 테스트
우선 명령줄에서 현재 터미널에서 감청 포트 9000을 열고 명령줄에서 아래 명령을 실행합니다
nc -l 9000
그리고 IDE에서 StreamingJob 클래스의main 방법을 오른쪽 단추로 실행하면 다음과 같습니다
웹 클라이언트에서 Job 실행
먼저 프로젝트가 있는 디렉터리에서 mvn clean package를 실행하여 패키지를 만들고 프로젝트의 target 디렉터리에 flink-socket-1.0-SNAPSHOT를 생성합니다.jar 파일은 명령줄에 있습니다. 현재 터미널에서 감청 포트 9000을 엽니다. 명령줄에서 아래 명령을 실행합니다.
nc -l 9000
브라우저에서 Flink 웹 모니터링 페이지를 열고 왼쪽에서 Submit New Job 옵션을 선택하고 오른쪽 상단에 있는 Add New를 누르면 우리가 컴파일한 flink-socket-1.0-SNAPSHOT를 선택합니다.jar 파일, Submit 버튼을 클릭하여 Job 제출
Task Managers 선택 목록에서 해당 Job을 선택하고 Stdout 옵션을 클릭하여 결과를 봅니다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.