Spark Streaming 사용자 정의 데이터 원본 - 사용자 정의 입력 DStream 및 수신 기 구현

Spark Streaming 사용자 정의 데이터 원본 - 사용자 정의 입력 DStream 및 수신 기 구현
참고 문서: SparkStreaming 프로 그래 밍 가이드 (공식 문서)http://spark.apache.org/docs/2.0.0-preview/streaming-programming-guide.html
본 고 는 코드 언어 Scala 를 실현 한다.
전체 절 차 는 다음 과 같은 몇 단계 로 나 뉜 다.
1. 사용자 정의 수신 기 구현 (receiver)
사용자 정의 수신 기 (receiver) 를 실현 하려 면 다음 과 같은 몇 가 지 를 주의해 야 한다.
  1.1  Socket 소켓 에 해당 하 는 클 라 이언 트
       사용자 정의 수신 기 (receiver) 는 사실 Socket 소켓 소켓 에 해당 하 는 클 라 이언 트 프로 그래 밍 으로 서버 의 특정 IP 와 포트 에서 보 낸 데 이 터 를 수신 하 는 데 사 용 됩 니 다. 이곳 의 IP 와 포트 는 특정한 Socket 서버 의 IP 와 포트 변화 에 따라 변화 해 야 합 니 다.
 1.2 Receiver 클래스 를 계승 하여 onStart () 와 onStop () 방법 을 복사 합 니 다.
     구체 적 인 패키지: org. apache. spark. streaming. receiver. Receiver
 
  
class CustomReceiver(host: String, port: Int,  isTimeOut: Boolean, sec: Int)
         extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging{

    매개 변수 형식: host: String 과 port: Int 를 포함해 야 합 니 다. ,그 중에서 host 는 데이터 원본 서버 의 IP, port 는 데이터 원본 서버 의 포트 번호 이 고 나머지 매개 변 수 는 선택 할 수 있 습 니 다. 그 중에서 isTimeOut 은 Boolean 형식 으로 시간 초과 설정 여 부 를 표시 합 니 다. true 는 시간 초과 설정, false 대 표 는 설정 하지 않 습 니 다. set 는 Int 형식 으로 시간 초과 형식 은 s 초 급 입 니 다.
   복사 방법: 복사 가 필요 한 방법 은 두 가지 onStart () 와 onStop () 방법 이 있 습 니 다.
override def onStart(): Unit = {
  // Start the thread that receives data over a connection
  new Thread("Socket Receiver") {
    override def run() { receive() }
  }.start()
}

override def onStop(){
  // There is nothing much to do as the thread calling receive()
  // is designed to stop by itself if isStopped() returns false
}
코드 에서 onStart () 는 연결 을 통 해 데 이 터 를 받 는 방법 을 가 진 스 레 드 를 시작 합 니 다. 그 중에서 receiver () 방법 으로 데이터 원본 서버 에 연결 하여 서버 데 이 터 를 얻 고 Spark 에 푸 시 합 니 다.
onStop 은 코드 블록 을 실현 할 필요 가 없습니다.
1.3 방법 receive 수신 데이터 및 Spark 전송
 onStart () 방법 은 receice () 방법 스 레 드 를 시작 하여 데이터 원본 서버 에서 보 낸 데 이 터 를 수신 하고 Spark 에 데 이 터 를 전송 합 니 다. 구체 적 인 방법 은 다음 과 같은 코드 를 실현 합 니 다.
/** Create a socket connection and receive data until receiver is stopped */
private def receive() {
  val _pattern: String = "yyyy-MM-dd HH:mm:ss SSS"
  val format: SimpleDateFormat = new SimpleDateFormat(_pattern)
  val _isTimeOut: Boolean = isTimeOut
  val _sec :Int = sec

  var socket: Socket = null
  var userInput: String = null
  try {
    // Connect to host:port
    socket = new Socket(host, port)
    println(format.format(new Date()))
    println("     
") if(_isTimeOut) socket.setSoTimeout(_sec * 1000) // Until stopped or connection broken continue reading val reader = new BufferedReader( new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8)) userInput = reader.readLine() //while(!isStopped && userInput != null) { while(!isStopped && userInput != null) { println(userInput) store(userInput) userInput = reader.readLine() } reader.close() socket.close() // Restart in an attempt to connect again when server is active again restart("Trying to connect again") } catch { case e: java.net.ConnectException => // restart if could not connect to server restart("Error connecting to " + host + ":" + port, e) case t: Throwable => // restart if there is any other error restart("Error receiving data", t) } }
핵심:
그 중 핵심 은 그 아버 지 를 호출 하 는 것 이다.
클래스 Receiver 의 store (userInput) 는 데 이 터 를 Spark 로 보 내 는 것 을 의미 합 니 다. 그 중에서 userInput 은 사용자 정의 데이터 원본 서버 의 모든 줄 의 데 이 터 를 받 습 니 다.
나머지 코드 는 Socket 의 클 라 이언 트 프로 그래 밍 과 유사 합 니 다.
2. 아 날로 그 서버 사용자 정의 데이터 원본 실현
      사용자 정의 수신 기 가 구현 되 었 지만 단순 한 수신 기 는 수신 데이터 만 클 라 이언 트 에 속 하기 때문에 데이터 원본 서버 가 있어 야 합 니 다. 그래 야 클 라 이언 트 가 해당 하 는 서버 에 연결 되 고 데이터 원본 서버 에서 보 낸 데 이 터 를 받 을 수 있 습 니 다.
     
여기 서 아 날로 그 데이터 소스 서버 에서 사용자 정의 데이터 소스 서버 를 개발 하고 콘 솔 입력 을 통 해 데 이 터 를 클 라 이언 트 로 전송 합 니 다. 관련 코드:
/**
  *    Socket   ,     CustomReceiver
  */
class CustomServer(port: Int, isTimeOut: Boolean ,sec: Int) {
  val _pattern: String = "yyyy-MM-dd HH:mm:ss SSS"
  val format: SimpleDateFormat = new SimpleDateFormat(_pattern)
  val _isTimeOut = isTimeOut
  val _sec: Int = sec
  val _port = port

  def onStart(): Unit = {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run() {
        sServer()
      }
    }.start()
  }

  def onStop(): Unit = {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself if isStopped() returns false
  }

  def sServer(): Unit = {
    println("----------Server----------")
    println(format.format(new Date()))
    var tryingCreateServer = 1
    try {
      val server = new ServerSocket(_port)
      println("         
") if(_isTimeOut) server.setSoTimeout(_sec) val socket = server.accept() println(format.format(new Date())) println(" ") val writer = new OutputStreamWriter(socket.getOutputStream) println(format.format(new Date())) val in = new Scanner(System.in) //
in.useDelimiter("
") println(" ") var flag = in.hasNext while (flag){ val s = in.next() /** * :writer s ,
*/ writer.write(s + "
") Thread.sleep(1000) if(socket.isClosed){ println("socket is closed !") }else{ try{ writer.flush() }catch { case e: java.net.SocketException => println("Error !!!!!!!!!") flag = false writer.close() socket.close() server.close() onStart() return } } } System.out.println(format.format(new Date())) System.out.println("




") /** * */ if(tryingCreateServer < 5){ writer.close() socket.close() server.close() onStart() tryingCreateServer += 1 } } catch{ case e: SocketTimeoutException => System.out.println(format.format(new Date()) + "
" + _sec + "




"); e.printStackTrace() case e: SocketException => e.printStackTrace() case e: Exception => e.printStackTrace() } } } object CustomServer { def main(args: Array[String]): Unit = { new CustomServer(8888, false, 0).onStart() } }

매개 변수: 매개 변수 와 클 라 이언 트 의 매개 변수 가 일치 합 니 다.
실현 원리: 아 날로 그 데이터 원본 서버 의 데이터 전송, 콘 솔 을 통 해 데 이 터 를 입력 하고 각 줄 의 데 이 터 를 구분 하여 클 라 이언 트 에 보 냅 니 다.다른 사용자 정의 데이터 원본 을 스스로 실현 하려 면 Scanner 의 데이터 획득 방식 을 바 꾸 면 됩 니 다.
서버 배치: 이 서버 는 sh 방식 으로 Liux 에서 main 방법 으로 onStart () 방법 으로 Socket 서 비 스 를 시작 할 수 있 습 니 다. main 방법 은 입구 방법 입 니 다.
3. Spark Streaming 호출 관련 수신 기
         
StreamingContext 에서 DStream 을 만 들 때 receiverStream 을 호출 하여 사용자 정의 수신 기 클래스 의 인 스 턴 스 를 전달 합 니 다.
val receiverInputDStream = ssc.receiverStream(new CustomReceiver("hadoop01", 8888, false, 0))

그 중에서 ssc 는 Streaming Context 의 인 스 턴 스 입 니 다.
       그 중에서 구독 수신 기 클래스 인 스 턴 스 의 첫 번 째 매개 변수 인 'hadop 01' 은 사용자 정의 데이터 소스 서버 의 ip 또는 호스트 이름 을 대표 하고 두 번 째 매개 변 수 는 8888 데이터 소스 서버 의 포트 번 호 를 대표 하 며 세 번 째 매개 변 수 는 false 는 시간 초과 설정 을 시작 하지 않 고 네 번 째 매개 변 수 는 0 세대 표 시간 초과 입 니 다.
  
비고: SparkStreaming 분석 프로그램 을 시작 하기 전에 데이터 원본 서버 를 시작 해 야 합 니 다. 서버 의 콘 솔 을 통 해 관련 데 이 터 를 입력 하여 전체 프로 세 스 테스트 를 할 수 있 습 니 다.

좋은 웹페이지 즐겨찾기