Spark Streaming 연산 자 개발 사례
transform 연산 자 개발
transform 작업 은 DStream 에 적 용 될 때 임의의 RDD 에서 RDD 로 전환 하 는 작업 을 수행 할 수 있 고 DStream API 에서 제공 되 지 않 은 작업 을 실현 할 수 있 습 니 다.예 를 들 어 DStream API 에서 하나의 DStream 중의 모든 batch 를 제공 하지 않 고 특정한 RDD 와 join 하 는 작업 을 수행 할 수 있 습 니 다.DStream 의 join 연산 자 는 join 다른 DStream 만 할 수 있 습 니 다.그러나 우 리 는 스스로 transform 조작 을 사용 하여 이 기능 을 실현 할 수 있다.
인 스 턴 스:블랙리스트 사용자 실시 간 필터
package StreamingDemo
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
*
*/
object TransformDemo {
def main(args: Array[String]): Unit = {
//
Logger.getLogger("org").setLevel(Level.WARN)
val conf = new SparkConf()
.setAppName(this.getClass.getSimpleName)
.setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))
// RDD
val blackRDD =
ssc.sparkContext.parallelize(Array(("zs", true), ("lisi", true)))
// socket nc
val linesDStream = ssc.socketTextStream("Hadoop01", 6666)
/**
*
* zs sb sb sb sb
* lisi fuck fuck fuck
* jack hello
*/
linesDStream
.map(x => {
val info = x.split(" ")
(info(0), info.toList.tail.mkString(" "))
})
.transform(rdd => { //transform RDD->RDD , RDD
/**
* leftouterjoin , :
* (zs,(sb sb sb sb),Some(true)))
* (lisi,(fuck fuck fuck),some(true)))
* (jack,(hello,None))
*/
val joinRDD = rdd.leftOuterJoin(blackRDD)
// Some(true) , , None , ,
val filterRDD = joinRDD.filter(x => x._2._2.isEmpty)
filterRDD
})
.map(x=>(x._1,x._2._1)).print()
ssc.start()
ssc.awaitTermination()
}
}
테스트nc 시작,사용자 및 발언 정보 전송
프로그램 이 블랙리스트 에 있 는 사용자 의 발언 을 실시 간 으로 걸 러 내 는 것 을 볼 수 있다.
updateStateByKey 연산 자 개발
updateStateByKey 연산 자 는 임의의 상 태 를 유지 할 수 있 으 며,동시에 새로운 정 보 를 계속 업데이트 할 수 있 습 니 다.이 연산 자 는 모든 key 에 state 를 유지 하고,끊임없이 state 를 업데이트 할 수 있 습 니 다.모든 batch 에 있어 Spark 는 이전에 존 재 했 던 key 에 State 업데이트 함 수 를 한 번 씩 적용 합 니 다.이 key 가 batch 에 새로운 값 이 있 든 없 든 State 업데이트 함수 가 되 돌아 오 는 값 이 none 이면 이 key 에 대응 하 는 state 는 삭 제 됩 니 다.새로 생 긴 키 에 대해 서도 state 업데이트 함 수 를 실행 합 니 다.
이 계산 자 를 사용 하려 면 반드시 두 가지 절 차 를 진행 해 야 한다.
인 스 턴 스:캐 시 기반 실시 간 WordCount
package StreamingDemo
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* WordCount,
*/
object UpdateStateByKeyDemo {
def main(args: Array[String]): Unit = {
//
Logger.getLogger("org").setLevel(Level.WARN)
/**
* Kerberos null, HADOOP_USER_NAME ,
* Hadoop hadoop username
* , ,
*/
//System.setProperty("HADOOP_USER_NAME","Setsuna")
val conf = new SparkConf()
.setAppName(this.getClass.getSimpleName)
.setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))
// Checkpoint
ssc.checkpoint("hdfs://Hadoop01:9000/checkpoint")
// DStream
val lineDStream = ssc.socketTextStream("Hadoop01", 6666)
val wordDStream = lineDStream.flatMap(_.split(" "))
val pairsDStream = wordDStream.map((_, 1))
/**
* state:
* values: batch key values
*/
val resultDStream =
pairsDStream.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
// state none, , 0 count
var count = state.getOrElse(0)
// values, value
for (value <- values) {
count += value
}
// key state,
Option(count)
})
//
resultDStream.print()
ssc.start()
ssc.awaitTermination()
}
}
테스트nc 오픈,단어 입력
콘 솔 실시 간 출력 결과
window 슬라이딩 창 연산 자 개발
Spark Streaming 은 미끄럼 창 동작 을 지원 합 니 다.미끄럼 창 에 있 는 데 이 터 를 계산 할 수 있 습 니 다.
미끄럼 창 에는 일괄 처리 간격,창 간격,미끄럼 간격 이 포함 되 어 있 습 니 다.
창 작업 의 경우 창 내부 에 N 개의 일괄 처리 데이터 가 있 습 니 다4.567917.일괄 처리 데이터 의 크기 는 창 간격 에 의 해 결정 되 고 창 간격 은 창의 지속 시간,즉 창의 길 이 를 말 합 니 다4.567917.미끄럼 시간 간격 은 얼마나 걸 려 서 창 이 한 번 미 끄 러 지고 새로운 창 이 형성 되 는 지 를 말 합 니 다.미끄럼 간격 은 기본 적 인 상황 에서 일괄 처리 시간 간격 과 같 습 니 다메모:미끄럼 시간 간격 과 창 시간 간격의 크기 는 일괄 처리 간격의 정수 배로 설정 해 야 합 니 다
공식 적 인 그림 으로 설명 하 다.
일괄 처리 간격 은 1 시간 단위 이 고 창 간격 은 3 시간 단위 이 며 미끄럼 간격 은 2 시간 단위 입 니 다.초기 창 time1-time 3 에 대해 서 는 창 간격 이 만족 해야만 데 이 터 를 처리 할 수 있 습 니 다.따라서 미끄럼 창 작업 은 두 개의 인자,창 길이 와 미끄럼 시간 간격 을 지정 해 야 합 니 다.Spark Streaming 에서 미끄럼 창 에 대한 지원 은 Storm 보다 더 완벽 합 니 다.
Window 슬라이딩 연산 자 조작
계산 하 다
묘사 하 다.
window()
모든 슬라이딩 창의 데이터 에 대해 사용자 정의 계산 을 실행 합 니 다.
countByWindow()
모든 미끄럼 창 데이터 에 count 작업 을 수행 합 니 다.
reduceByWindow()
모든 슬라이딩 창의 데이터 에 대해 reduce 작업 을 실행 합 니 다.
reduceByKeyAndWindow()
모든 슬라이딩 창의 데이터 에 대해 reduceByKey 동작 을 실행 합 니 다.
countByValueAndWindow()
모든 슬라이딩 창의 데이터 에 대해 countByValue 작업 을 실행 합 니 다.
reduceByKeyAndWindow 연산 자 개발
인 스 턴 스:온라인 핫 이 슈 검색 어 실시 간 미끄럼 통계
2 초 간격 으로 최근 5 초 간 검색 어 중 가장 높 은 3 개 검색 어 와 출현 횟수 를 집계 한다.
package StreamingDemo
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* : 2 , 5 3
*/
object ReduceByKeyAndWindowDemo {
def main(args: Array[String]): Unit = {
//
Logger.getLogger("org").setLevel(Level.WARN)
//
val conf = new SparkConf()
.setAppName(this.getClass.getSimpleName)
.setMaster("local[2]")
// 1s
val ssc = new StreamingContext(conf, Seconds(1))
val linesDStream = ssc.socketTextStream("Hadoop01", 6666)
linesDStream
.flatMap(_.split(" ")) //
.map((_, 1)) // (word,1)
.reduceByKeyAndWindow(
//
//x ,y Key
(x: Int, y: Int) => x + y,
// 5
Seconds(5),
// 2
Seconds(2)
)
.transform(rdd => { //transform rdd , rdd
// Key , , 3
val info: Array[(String, Int)] = rdd.sortBy(_._2, false).take(3)
// Array resultRDD
val resultRDD = ssc.sparkContext.parallelize(info)
resultRDD
})
.map(x => s"${x._1} :${x._2}")
.print()
ssc.start()
ssc.awaitTermination()
}
}
테스트 결과DStream 출력 조작 개관
Spark Streaming 은 DStream 의 데 이 터 를 외부 시스템 으로 출력 할 수 있 습 니 다.DSteram 의 모든 계산 은 output 작업 에 의 해 실 행 됩 니 다.foreachRDD 출력 작업 도 안에서 RDD 에 action 작업 을 수행 해 야 모든 batch 에 대한 계산 논 리 를 촉발 할 수 있 습 니 다.
바꾸다
묘사 하 다.
print()
Driver 에서 DStream 의 데이터 의 10 개 요 소 를 출력 합 니 다.주로 테스트 에 사용 되 거나 output 작업 이 필요 없 을 때 job 를 간단하게 터치 하 는 데 사 용 됩 니 다.
saveAsTextFiles(prefix,
[suffix])
DStream 의 내용 을 텍스트 파일 로 저장 합 니 다.그 중에서 일괄 처리 간격 에서 발생 하 는 파일 은 prefix-TIME 입 니 다.IN_MS[.suffix]방식 으로 명명 합 니 다.
saveAsObjectFiles(prefix
, [suffix])
DStream 의 내용 을 대상 에 따라 정렬 하고 SequenceFile 형식 으로 저장 합 니 다.그 중 매번 일괄 처리 간격 으로 생 성 되 는 파일 은 prefix-TIMEIN_MS[.suffix]방식 으로 명명 합 니 다.
saveAsHadoopFiles(pref
ix, [suffix])
DStream 의 내용 을 텍스트 로 Hadoop 파일 로 저장 합 니 다.일괄 처리 간격 에서 발생 하 는 파일 입 니 다.
prefix-TIME 로IN_MS[.suffix]방식 으로 명명 합 니 다.
foreachRDD(func)
가장 기본 적 인 출력 작업 은 func 함 수 를 DStream 의 RDD 에 적용 합 니 다.이 작업 은 외부 시스템 으로 데 이 터 를 출력 합 니 다.
예 를 들 어 RDD 를 파일 이나 네트워크 데이터베이스 에 저장 하 는 등 입 니 다.주의해 야 할 것 은 func 함수 가 이 streaming 을 실행 하고 있 습 니 다.
응용 드라이버 프로 세 스에 서 실 행 됩 니 다.
foreachRDD 연산 자 개발
foreachRDD 는 가장 자주 사용 하 는 output 작업 으로 DStream 에서 발생 하 는 모든 RDD 를 옮 겨 다 니 며 처리 한 다음 에 모든 RDD 의 데 이 터 를 외부 저장 소 에 기록 할 수 있 습 니 다.예 를 들 어 파일,데이터 베이스,캐 시 등 은 보통 RDD 에 대해 action 작업 을 수행 합 니 다.예 를 들 어 foreach.
foreachRDD 작업 데이터베이스 사용
일반적으로 foreachRDD 에 서 는 JDBC Connection 과 같은 Connection 을 만 든 다음 Connection 을 통 해 데 이 터 를 외부 저장 소 에 기록 합 니 다.
오류 1:RDD 의 foreach 작업 외부 에 Connection 만 들 기
dstream.foreachRDD { rdd =>
val connection=createNewConnection()
rdd.foreach { record => connection.send(record)
}
}
이러한 방식 은 잘못된 것 입 니 다.이러한 방식 은 Connection 대상 이 직렬 화 된 후에 모든 task 에 전 송 될 수 있 습 니 다.그러나 Connection 대상 은 직렬 화 를 지원 하지 않 기 때문에 전송 할 수 없습니다.오류 2:RDD 의 foreach 작업 내부 에 Connection 만 들 기
dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}
이 방식 은 가능 하지만 실행 효율 이 낮 습 니 다.RDD 의 모든 데이터 에 대해 Connection 대상 을 만 들 수 있 기 때문에 보통 Connection 대상 의 생 성 은 성능 을 소모 합 니 다.합 리 적 인 방식
MySQL 데이터베이스 작성 표 문 구 는 다음 과 같 습 니 다.
CREATE TABLE wordcount (
word varchar(100) CHARACTER SET utf8 NOT NULL,
count int(10) NOT NULL,
PRIMARY KEY (word)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
IDEA 에 mysql-connector-java-5.1.40-bin.jar 추가코드 는 다음 과 같다.
연못 을 연결 하 는 코드 는 처음에 정적 블록 으로 연못 을 써 서 직접 얻 으 려 고 했 지만 연못 의 너비 가 부족 한 문 제 를 고려 하면 이런 방식 이 더 좋 았 습 니 다.처음에 연결 풀 을 예화 하고 호출 되 어 연결 을 얻 었 습 니 다.연결 이 모두 가 져 왔 을 때 연못 이 비 었 으 면 연못 을 다시 예화 해서 나 왔 습 니 다.
package StreamingDemo
import java.sql.{Connection, DriverManager, SQLException}
import java.util
object JDBCManager {
var connectionQue: java.util.LinkedList[Connection] = null
/**
*
* @return
*/
def getConnection(): Connection = {
synchronized({
try {
// , Connection
if (connectionQue == null) {
connectionQue = new util.LinkedList[Connection]()
for (i <- 0 until (10)) {
// 10 ,
val connection = DriverManager.getConnection(
"jdbc:mysql://Hadoop01:3306/test?characterEncoding=utf-8",
"root",
"root")
// push
connectionQue.push(connection)
}
}
} catch {
//
case e: SQLException => e.printStackTrace()
}
// , ,
return connectionQue.poll()
})
}
/**
* ,
* @param connection
*/
def returnConnection(connection: Connection) = {
//
connectionQue.push(connection)
}
def main(args: Array[String]): Unit = {
//main
getConnection()
println(connectionQue.size())
}
}
wordcount 코드
package StreamingDemo
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, streaming}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object ForeachRDDDemo {
def main(args: Array[String]): Unit = {
// , INFO
Logger.getLogger("org").setLevel(Level.WARN)
// Hadoop ,
System.setProperty("HADOOP_USER_NAME", "Setsuna")
//Spark
val conf = new SparkConf()
.setAppName(this.getClass.getSimpleName)
.setMaster("local[2]")
val ssc = new StreamingContext(conf, streaming.Seconds(2))
// updateStateByKey, checkpoint
ssc.checkpoint("hdfs://Hadoop01:9000/checkpoint")
// socket, nc
val linesDStream = ssc.socketTextStream("Hadoop01", 6666)
val wordCountDStream = linesDStream
.flatMap(_.split(" ")) //
.map((_, 1)) // (word,1)
.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
//
var count = state.getOrElse(0)
for (value <- values) {
count += value
}
Option(count)
})
wordCountDStream.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
rdd.foreachPartition(part => {
//
val connection = JDBCManager.getConnection()
part.foreach(data => {
val sql = // wordcount wordcount ,on duplicate key update
s"insert into wordcount (word,count) " +
s"values ('${data._1}',${data._2}) on duplicate key update count=${data._2}"
// prepareStatement sql
val pstmt = connection.prepareStatement(sql)
pstmt.executeUpdate()
})
// ,
JDBCManager.returnConnection(connection)
})
}
})
ssc.start()
ssc.awaitTermination()
}
}
nc 를 열 고 데 이 터 를 입력 하 십시오.다른 터미널 에서 wordcount 의 결 과 를 조회 하면 실시 간 으로 변화 가 발생 한 것 을 발견 할 수 있다
이상 이 본문의 전부 입 니 다.여러분 의 학습 에 도움 이 되 기 를 바 랍 니 다.그리고 여러분 의 많은 성원 을 바 랍 니 다.\#niming{position:fixed; bottom:0; z-index:9999;right:0;width:100%;background:#e5e5e5;}.fengyu{width:99%;padding:0.5%;text-align:center;}a{text-decoration: none;color:#000;}.tuiguangbiaoji{font-size:10px;margin:0px 10px;}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark Streaming의 통계 소켓 단어 수1. socket 단어 수 통계 TCP 소켓의 데이터 서버에서 수신한 텍스트 데이터의 단어 수입니다. 2. maven 설정 3. 프로그래밍 코드 입력 내용 결과 내보내기...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.