Spark Streaming 통합 Kafka 의 Offset 관리 [데이터 제로 잃 어 버 린 MySQL 관리 Offset]
22962 단어 spark
Spark Streaming 을 사용 하여 Kafka 0.8 버 전 을 통합 할 때 spark - streaming - kafka - 0 - 8 은 offset 관 리 를 제공 하지 않 습 니 다.데이터 의 제로 손실 을 확보 하기 위해 서 우 리 는 스스로 이 오프셋 을 관리 해 야 한다.
참조:http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html
저 희 는 MySQL 에 편 이 량 을 저장 하여 관리 합 니 다.
빠 른 입문 은 scalikejdbc 를 사용 하여 MySQL 을 조작 합 니 다:
1. 가 져 오기 의존
org.scalikejdbc
scalikejdbc-config_2.11
2.5.0
2. resource 파일 에 새 application. conf 파일 을 만 듭 니 다. 설정 은 다음 과 같 습 니 다.
# MySQL example
db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf-8"
db.default.user="root"
db.default.password="123456"
3.Scalikejdbc Demo
package com.csylh.logAnalysis.scalikejdbc
import scalikejdbc.{DB, SQL}
import scalikejdbc.config.DBs
/**
* scala mysql, ScaLikeJdbc
*/
object ScaLikeJdbcApp {
def main(args: Array[String]): Unit = {
// application.conf
DBs.setup()
//DBs.setupAll()
DB.autoCommit {
implicit session =>
SQL("insert into people(name,age,fv) values(?,?,?)")
.bind(" ", 22, 88)
.update().apply()
}
}
def delete() = {
DB.autoCommit {
implicit session =>
SQL("delete from people where name = ?")
.bind(" ")
.update().apply()
}
}
def update() {
DB.autoCommit { implicit session =>
SQL("update people set age = ? where name = ?")
.bind(18, " ")
.update().apply()
}
}
/**
* select rs ,
*/
def select() = {
DB.readOnly {
implicit session =>
val sql = SQL("select * from people ").map(rs =>
(rs.string("name"), rs.int("age"))
).toList().apply()
}
}
}
제로 손실 데이터 솔 루 션 코드 는 다음 과 같 습 니 다.
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scalikejdbc.config.DBs
/**
* Description: Spark Streaming Kafka
* MySQL/zk/kafka/hbase/redis ... offset
* MySQL + scalikejdbc
*
* @Author: 36
* @Date: 2019/8/8 11:24
*/
object OffsetApp {
def main(args:Array[String]){
val conf= new SparkConf().setAppName("OffsetApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))
val kafkaParams = Map(
"metadata.broker.list"->"192.168.1.116:9092",
"group.id"->"liuge.group.id" ,//
"auto.offset.reset" -> "smallest"
)
val topics = "test".split(",").toSet
// : offset
import scalikejdbc._
/**
* tuple map
* ().list.apply().toMap
*/
DBs.setup()
val fromOffsets =
DB.readOnly {
implicit session => {
SQL("select * from offsets_storage ").map(rs =>
(TopicAndPartition(rs.string("topic"), rs.int("partitions")),rs.long("offset"))
).list().apply()
}
}.toMap
for (ele <- fromOffsets){
println(" MySQL : " + ele._1.topic + ":" + ele._1.partition +":"+ele._2)
}
// : Direct Kafka , InputDStream
val stream = if (fromOffsets.isEmpty){
// , Kafka ==> stream
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,kafkaParams,topics)
}else{ // , Kafka ==> stream
val messageHandler = (mm:MessageAndMetadata[String,String]) => (mm.key(), mm.message())
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
}
// : + offset
stream.foreachRDD(rdd => {
// TODO.. 3.1 , count()
println(" :" + rdd.count())
// 3.2 offset
var offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (o <- offsetRanges) {
println(" :"+s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
DB.autoCommit{
implicit session => {
SQL("replace into offsets_storage(topic,groupid,partitions,offset) values(?,?,?,?)")
.bind(o.topic, "liuge.group.id", o.partition, o.untilOffset).update().apply()
}
}
}
})
ssc.start()
ssc.awaitTermination()
}
}
여기까지 만 하면 우 리 는 카 프 카 의 데 이 터 를 비교적 잘 소비 할 수 있 을 것 이다.
어떤 잘못된 점 이 있 으 면 지 증 을 환영 합 니 다!감사합니다 ~ ~
KafkaProducer [자바 버 전] 시 뮬 레이 션 데이터
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark 팁: 컴퓨팅 집약적인 작업을 위해 병합 후 셔플 파티션 비활성화작은 입력에서 UDAF(사용자 정의 집계 함수) 내에서 컴퓨팅 집약적인 작업을 수행할 때 spark.sql.adaptive.coalescePartitions.enabled를 false로 설정합니다. Apache Sp...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.