Spark Streaming 사용자 정의 데이터 원본 - 사용자 정의 입력 DStream 및 수신 기 구현
18175 단어 sparkSparkStreamingSocket
참고 문서: SparkStreaming 프로 그래 밍 가이드 (공식 문서)http://spark.apache.org/docs/2.0.0-preview/streaming-programming-guide.html
본 고 는 코드 언어 Scala 를 실현 한다.
전체 절 차 는 다음 과 같은 몇 단계 로 나 뉜 다.
1. 사용자 정의 수신 기 구현 (receiver)
사용자 정의 수신 기 (receiver) 를 실현 하려 면 다음 과 같은 몇 가 지 를 주의해 야 한다.
1.1 Socket 소켓 에 해당 하 는 클 라 이언 트
사용자 정의 수신 기 (receiver) 는 사실 Socket 소켓 소켓 에 해당 하 는 클 라 이언 트 프로 그래 밍 으로 서버 의 특정 IP 와 포트 에서 보 낸 데 이 터 를 수신 하 는 데 사 용 됩 니 다. 이곳 의 IP 와 포트 는 특정한 Socket 서버 의 IP 와 포트 변화 에 따라 변화 해 야 합 니 다.
1.2 Receiver 클래스 를 계승 하여 onStart () 와 onStop () 방법 을 복사 합 니 다.
구체 적 인 패키지: org. apache. spark. streaming. receiver. Receiver
class CustomReceiver(host: String, port: Int, isTimeOut: Boolean, sec: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging{
매개 변수 형식: host: String 과 port: Int 를 포함해 야 합 니 다. ,그 중에서 host 는 데이터 원본 서버 의 IP, port 는 데이터 원본 서버 의 포트 번호 이 고 나머지 매개 변 수 는 선택 할 수 있 습 니 다. 그 중에서 isTimeOut 은 Boolean 형식 으로 시간 초과 설정 여 부 를 표시 합 니 다. true 는 시간 초과 설정, false 대 표 는 설정 하지 않 습 니 다. set 는 Int 형식 으로 시간 초과 형식 은 s 초 급 입 니 다.
복사 방법: 복사 가 필요 한 방법 은 두 가지 onStart () 와 onStop () 방법 이 있 습 니 다.코드 에서 onStart () 는 연결 을 통 해 데 이 터 를 받 는 방법 을 가 진 스 레 드 를 시작 합 니 다. 그 중에서 receiver () 방법 으로 데이터 원본 서버 에 연결 하여 서버 데 이 터 를 얻 고 Spark 에 푸 시 합 니 다.override def onStart(): Unit = { // Start the thread that receives data over a connection new Thread("Socket Receiver") { override def run() { receive() } }.start() } override def onStop(){ // There is nothing much to do as the thread calling receive() // is designed to stop by itself if isStopped() returns false }
onStop 은 코드 블록 을 실현 할 필요 가 없습니다.
1.3 방법 receive 수신 데이터 및 Spark 전송
onStart () 방법 은 receice () 방법 스 레 드 를 시작 하여 데이터 원본 서버 에서 보 낸 데 이 터 를 수신 하고 Spark 에 데 이 터 를 전송 합 니 다. 구체 적 인 방법 은 다음 과 같은 코드 를 실현 합 니 다.핵심:/** Create a socket connection and receive data until receiver is stopped */ private def receive() { val _pattern: String = "yyyy-MM-dd HH:mm:ss SSS" val format: SimpleDateFormat = new SimpleDateFormat(_pattern) val _isTimeOut: Boolean = isTimeOut val _sec :Int = sec var socket: Socket = null var userInput: String = null try { // Connect to host:port socket = new Socket(host, port) println(format.format(new Date())) println("
") if(_isTimeOut) socket.setSoTimeout(_sec * 1000) // Until stopped or connection broken continue reading val reader = new BufferedReader( new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8)) userInput = reader.readLine() //while(!isStopped && userInput != null) { while(!isStopped && userInput != null) { println(userInput) store(userInput) userInput = reader.readLine() } reader.close() socket.close() // Restart in an attempt to connect again when server is active again restart("Trying to connect again") } catch { case e: java.net.ConnectException => // restart if could not connect to server restart("Error connecting to " + host + ":" + port, e) case t: Throwable => // restart if there is any other error restart("Error receiving data", t) } }
그 중 핵심 은 그 아버 지 를 호출 하 는 것 이다.
클래스 Receiver 의 store (userInput) 는 데 이 터 를 Spark 로 보 내 는 것 을 의미 합 니 다. 그 중에서 userInput 은 사용자 정의 데이터 원본 서버 의 모든 줄 의 데 이 터 를 받 습 니 다.
나머지 코드 는 Socket 의 클 라 이언 트 프로 그래 밍 과 유사 합 니 다.
2. 아 날로 그 서버 사용자 정의 데이터 원본 실현
사용자 정의 수신 기 가 구현 되 었 지만 단순 한 수신 기 는 수신 데이터 만 클 라 이언 트 에 속 하기 때문에 데이터 원본 서버 가 있어 야 합 니 다. 그래 야 클 라 이언 트 가 해당 하 는 서버 에 연결 되 고 데이터 원본 서버 에서 보 낸 데 이 터 를 받 을 수 있 습 니 다.
여기 서 아 날로 그 데이터 소스 서버 에서 사용자 정의 데이터 소스 서버 를 개발 하고 콘 솔 입력 을 통 해 데 이 터 를 클 라 이언 트 로 전송 합 니 다. 관련 코드:/** * Socket , CustomReceiver */ class CustomServer(port: Int, isTimeOut: Boolean ,sec: Int) { val _pattern: String = "yyyy-MM-dd HH:mm:ss SSS" val format: SimpleDateFormat = new SimpleDateFormat(_pattern) val _isTimeOut = isTimeOut val _sec: Int = sec val _port = port def onStart(): Unit = { // Start the thread that receives data over a connection new Thread("Socket Receiver") { override def run() { sServer() } }.start() } def onStop(): Unit = { // There is nothing much to do as the thread calling receive() // is designed to stop by itself if isStopped() returns false } def sServer(): Unit = { println("----------Server----------") println(format.format(new Date())) var tryingCreateServer = 1 try { val server = new ServerSocket(_port) println("
") if(_isTimeOut) server.setSoTimeout(_sec) val socket = server.accept() println(format.format(new Date())) println(" ") val writer = new OutputStreamWriter(socket.getOutputStream) println(format.format(new Date())) val in = new Scanner(System.in) //
, in.useDelimiter("
") println(" ") var flag = in.hasNext while (flag){ val s = in.next() /** * :writer s ,
*/ writer.write(s + "
") Thread.sleep(1000) if(socket.isClosed){ println("socket is closed !") }else{ try{ writer.flush() }catch { case e: java.net.SocketException => println("Error !!!!!!!!!") flag = false writer.close() socket.close() server.close() onStart() return } } } System.out.println(format.format(new Date())) System.out.println("
") /** * */ if(tryingCreateServer < 5){ writer.close() socket.close() server.close() onStart() tryingCreateServer += 1 } } catch{ case e: SocketTimeoutException => System.out.println(format.format(new Date()) + "
" + _sec + "
"); e.printStackTrace() case e: SocketException => e.printStackTrace() case e: Exception => e.printStackTrace() } } } object CustomServer { def main(args: Array[String]): Unit = { new CustomServer(8888, false, 0).onStart() } }
매개 변수: 매개 변수 와 클 라 이언 트 의 매개 변수 가 일치 합 니 다.
실현 원리: 아 날로 그 데이터 원본 서버 의 데이터 전송, 콘 솔 을 통 해 데 이 터 를 입력 하고 각 줄 의 데 이 터 를 구분 하여 클 라 이언 트 에 보 냅 니 다.다른 사용자 정의 데이터 원본 을 스스로 실현 하려 면 Scanner 의 데이터 획득 방식 을 바 꾸 면 됩 니 다.
서버 배치: 이 서버 는 sh 방식 으로 Liux 에서 main 방법 으로 onStart () 방법 으로 Socket 서 비 스 를 시작 할 수 있 습 니 다. main 방법 은 입구 방법 입 니 다.
3. Spark Streaming 호출 관련 수신 기
StreamingContext 에서 DStream 을 만 들 때 receiverStream 을 호출 하여 사용자 정의 수신 기 클래스 의 인 스 턴 스 를 전달 합 니 다.val receiverInputDStream = ssc.receiverStream(new CustomReceiver("hadoop01", 8888, false, 0))
그 중에서 ssc 는 Streaming Context 의 인 스 턴 스 입 니 다.
그 중에서 구독 수신 기 클래스 인 스 턴 스 의 첫 번 째 매개 변수 인 'hadop 01' 은 사용자 정의 데이터 소스 서버 의 ip 또는 호스트 이름 을 대표 하고 두 번 째 매개 변 수 는 8888 데이터 소스 서버 의 포트 번 호 를 대표 하 며 세 번 째 매개 변 수 는 false 는 시간 초과 설정 을 시작 하지 않 고 네 번 째 매개 변 수 는 0 세대 표 시간 초과 입 니 다.
비고: SparkStreaming 분석 프로그램 을 시작 하기 전에 데이터 원본 서버 를 시작 해 야 합 니 다. 서버 의 콘 솔 을 통 해 관련 데 이 터 를 입력 하여 전체 프로 세 스 테스트 를 할 수 있 습 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark 프로그래밍 기본 사항(Python 버전)참조 웹사이트: Hadoop 환경이 있어야 합니다. 내 다른 블로그를 읽을 수 있습니다. 2.Spark 환경 변수 파일 수정 spark env SH 파일(vi ./conf/spark-env.sh)을 편집하고 첫 번째...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.