Spark 학습 노트 Spark Streaming 의 사용

8092 단어 SparkStreaming
1. Spark Streaming
Spark Streaming 은 Spark Core 를 바탕 으로 하 는 실시 간 계산 프레임 워 크 로 많은 데이터 소스 에서 데 이 터 를 소비 하고 데 이 터 를 처리 할 수 있다4.567917.Spark Stream 에서 가장 기본 적 인 추상 을 DStream(대리)이 라 고 하 는데 본질 적 으로 일련의 연속 적 인 RDD 이다.DStream 은 사실은 RDD 에 대한 포장 이다
  • DStream 은 RDD 의 공장 이 라 고 볼 수 있다.이 DStream 에서 생산 하 는 것 은 모두 같은 업무 논리 의 RDD 이 고 RDD 에서 데 이 터 를 읽 어야 하 는 것 과 다르다
  • 일괄 처리 시간 간격 에서 DStream 은 하나의 RDD 만 생 성 합 니 다
  • DStream 은 하나의'템 플 릿'에 해당 합 니 다.우 리 는 이'템 플 릿'에 따라 일정 시간 간격 으로 발생 하 는 이 rdd 를 처리 할 수 있 습 니 다.이 를 근거 로 rdd 의 DAG 를 구축 할 수 있 습 니 다.
    2.현재 유행 하 는 실시 간 계산 엔진
    물동량 프로 그래 밍 언어 처리 속도 생태
    Storm 이 낮 아 요.clojure 가 빨 라 요.(아 초)아 리(JStorm)
    Flink 높 음 scala 빠 름(아 초)국내 사용 이 적 음
    Spark Streaming 굉장히 높 은 scala 빠 른(밀리초)완벽 한 생태 권
    3.Spark 스 트 리밍 처리 네트워크 데이터
    
    //  StreamingContext                               
    val conf = new SparkConf().setAppName("Ops1").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Milliseconds(3000))
    val receiverDS: ReceiverInputDStream[String] = ssc.socketTextStream("uplooking01", 44444)
    val pairRetDS: DStream[(String, Int)] = receiverDS.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)
    pairRetDS.print()
    //     
    ssc.start()
    //     
    ssc.awaitTermination()
    
    4.Spark Streaming 이 데 이 터 를 수신 하 는 두 가지 방식(Kafka)
    Receiver
    4.567917.오프셋 은 zookeeper 가 유지 하 는 것 이다
  • Kafka 고급 API(소비자 의 API)를 사 용 했 습 니 다
  • 프로 그래 밍 이 간단 하 다.
    효율 이 낮다(데이터 의 안전성 을 확보 하기 위해 WAL 이 열 린 다)4.567917.kafka 0.10 버 전에 서 Receiver 를 완전히 버 렸 습 니 다.
    생산 환경 은 일반적으로 이런 방식 을 사용 하지 않 는 다.
    Direct
    4.567917.오프셋 은 우리 가 수 동 으로 유지 하 는 것 이다효율 이 높다.
    프로 그래 밍 이 비교적 복잡 하 다.
    생산 환경 은 일반적으로 이런 방식 을 사용한다.
    5.스파크 스 트 리밍 통합 카 프 카
    Receiver 기반 으로 kafka 통합(생산 환경 은 사용 을 권장 하지 않 으 며 0.10 에서 제거 되 었 습 니 다)
    
    //  StreamingContext                               
    val conf = new SparkConf().setAppName("Ops1").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Milliseconds(3000))
    val zkQuorum = "uplooking03:2181,uplooking04:2181,uplooking05:2181"
    val groupId = "myid"
    val topics = Map("hadoop" -> 3)
    val receiverDS: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
    receiverDS.flatMap(_._2.split(" ")).map((_,1)).reduceByKey(_+_).print()
    ssc.start()
    ssc.awaitTermination()
    
    Direct 기반 방식(생산 환경 사용)
    
    //  StreamingContext                               
    val conf = new SparkConf().setAppName("Ops1").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Milliseconds(3000))
    val kafkaParams = Map("metadata.broker.list" -> "uplooking03:9092,uplooking04:9092,uplooking05:9092")
    val topics = Set("hadoop")
    val inputDS: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    inputDS.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
    
    6.실시 간 흐름 계산의 구조

    1.로그 생 성(아 날로 그 사용자 가 웹 에 접근 하 는 로그)
    
    public class GenerateAccessLog {
      public static void main(String[] args) throws IOException, InterruptedException {
        //    
        int[] ips = {123, 18, 123, 112, 181, 16, 172, 183, 190, 191, 196, 120};
        String[] requesTypes = {"GET", "POST"};
        String[] cursors = {"/vip/112", "/vip/113", "/vip/114", "/vip/115", "/vip/116", "/vip/117", "/vip/118", "/vip/119", "/vip/120", "/vip/121", "/free/210", "/free/211", "/free/212", "/free/213", "/free/214", "/company/312", "/company/313", "/company/314", "/company/315"};
    
        String[] courseNames = {"   ", "python", "java", "c++", "c", "scala", "android", "spark", "hadoop", "redis"};
        String[] references = {"www.baidu.com/", "www.sougou.com/", "www.sou.com/", "www.google.com"};
        FileWriter fw = new FileWriter(args[0]);
        PrintWriter printWriter = new PrintWriter(fw);
        while (true) {
          //      Thread.sleep(1000);
          //    
          String date = new Date().toLocaleString();
          String method = requesTypes[getRandomNum(0, requesTypes.length)];
          String url = "/cursor" + cursors[getRandomNum(0, cursors.length)];
          String HTTPVERSION = "HTTP/1.1";
          String ip = ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)];
          String reference = references[getRandomNum(0, references.length)];
          String rowLog = date + " " + method + " " + url + " " + HTTPVERSION + " " + ip + " " + reference;
          printWriter.println(rowLog);
          printWriter.flush();
        }
      }
    
    
      //[start,end)
      public static int getRandomNum(int start, int end) {
        int i = new Random().nextInt(end - start) + start;
        return i;
      }
    }
    
    
    2.flume avro 를 사용 하여 웹 응용 서버 의 로그 데 이 터 를 수집 합 니 다.
    채집 명령 이 실 행 된 결 과 는 avro 에 있 습 니 다.
    
    # The configuration file needs to define the sources, 
    # the channels and the sinks.
    # Sources, channels and sinks are defined per agent, 
    # in this case called 'agent'
    f1.sources = r1
    f1.channels = c1
    f1.sinks = k1
    
    #define sources
    f1.sources.r1.type = exec
    f1.sources.r1.command =tail -F /logs/access.log
    
    #define channels
    f1.channels.c1.type = memory
    f1.channels.c1.capacity = 1000
    f1.channels.c1.transactionCapacity = 100
    
    #define sink      uplooking03
    f1.sinks.k1.type = avro
    f1.sinks.k1.hostname = uplooking03
    f1.sinks.k1.port = 44444
    
    #bind sources and sink to channel 
    f1.sources.r1.channels = c1
    f1.sinks.k1.channel = c1
     avro      
    # The configuration file needs to define the sources, 
    # the channels and the sinks.
    # Sources, channels and sinks are defined per agent, 
    # in this case called 'agent'
    f2.sources = r2
    f2.channels = c2
    f2.sinks = k2
    
    #define sources
    f2.sources.r2.type = avro
    f2.sources.r2.bind = uplooking03
    f2.sources.r2.port = 44444
    
    #define channels
    f2.channels.c2.type = memory
    f2.channels.c2.capacity = 1000
    f2.channels.c2.transactionCapacity = 100
    
    #define sink
    f2.sinks.k2.type = logger
    
    #bind sources and sink to channel 
    f2.sources.r2.channels = c2
    f2.sinks.k2.channel = c2
     avro   kafka 
    # The configuration file needs to define the sources, 
    # the channels and the sinks.
    # Sources, channels and sinks are defined per agent, 
    # in this case called 'agent'
    f2.sources = r2
    f2.channels = c2
    f2.sinks = k2
    
    #define sources
    f2.sources.r2.type = avro
    f2.sources.r2.bind = uplooking03
    f2.sources.r2.port = 44444
    
    #define channels
    f2.channels.c2.type = memory
    f2.channels.c2.capacity = 1000
    f2.channels.c2.transactionCapacity = 100
    
    #define sink
    f2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
    f2.sinks.k2.topic = hadoop
    f2.sinks.k2.brokerList = uplooking03:9092,uplooking04:9092,uplooking05:9092
    f2.sinks.k2.requiredAcks = 1
    
    
    이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.

    좋은 웹페이지 즐겨찾기