spark 프로젝트 중 common pool 의 간단 한 사용
7452 단어 scala
배경:spark streaming 에서 계 산 된 결 과 는 es 에 떨 어 뜨 려 야 합 니 다.하나의 batch 의 데 이 터 는 유형 에 따라 서로 다른 index 에 저 장 됩 니 다.
현재 프레임 워 크:
elastic-hadop 이 프로젝트 는 단일 index 를 처리 하 는 장면 에서 쉽게 시작 할 수 있 습 니 다.그러나 이 프로젝트 는 두 가지 부족 합 니 다.첫 번 째 는 데이터 가 다시 group 한 후에 서로 다른 index 에 넣 어야 합 니 다.다른 문 제 는 이 프로젝트 가 9200 이라는 http 포트 를 사용 하 는 것 입 니 다.
개조 하 다.
spark 의 공식 문서 에서 좋 은 방법 을 추 천 했 습 니 다.바로 자원 탱크 를 이용 하 는 것 입 니 다.사실 자원 탱크 의 가장 큰 장점 은 대상 이 풀 화 되 고 불필요 한 gc 를 피 하 는 것 입 니 다.그러면 우 리 는 우리 의 코드 도 개조 할 수 있 습 니 다.우 리 는 common-pol 2 로 우리 의 sink 의 풀 화 를 실현 할 수 있 습 니 다.우리 sink 을 사용 할 때 풀 에서 가 져 오고 사용 하지 않 을 때 돌려 주면 됩 니 다.우리 sink 은 사실 dao 층 과 유사 하기 때문에 EsClient 에 대한 wrapper 입 니 다.가장 좋 은 방법 은 자바 의 Close 인 터 페 이 스 를 실현 한 다음 대상 을 소각 할 때 우리 의 esClient close 입 니 다.
관련 코드
sink 인터페이스
trait TimeSeriesSink extends Closeable with Serializable {
def write(ts: StructedTimeSeries)
}
sink 실현
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.{ElasticClient, ElasticsearchClientUri}
import com.ximalaya.spoor.stream.bean.{GaugeTimeSeries, StructedTimeSeries}
import com.ximalaya.spoor.stream.builder.IndexBuilder
import com.ximalaya.spoor.stream.sink.TimeSeriesSink
import org.elasticsearch.common.settings.Settings
import com.ximalaya.spoor.common.bean.MetricType._
/**
* @author todd.chen at 26/03/2017 08:01.
* email : todd.chen@ximalaya.com
*/
class TimeSeriesSinkImpl(node: String, port: Int, clusterName: String, indexPrefix: String) extends TimeSeriesSink {
private val logger = org.slf4j.LoggerFactory.getLogger(this.getClass)
logger.info("tsSinker init")
private val settings = Settings.builder().put("cluster.name", clusterName).build()
private val client = ElasticClient.transport(settings, ElasticsearchClientUri(node, port))
private val indexBuilder = new IndexBuilder(indexPrefix)
override def write(ts: StructedTimeSeries): Unit = {
ts match {
case gauge: GaugeTimeSeries ⇒
client.execute {
index into indexBuilder.getIndex(gauge.interval, Gauge) source gauge.toJson
}
case _ ⇒
//TODO other type sink
}
}
override def close(): Unit = {
client.close()
}
}
풀 코드:
import com.ximalaya.spoor.stream.bean.EsPoolConfig
import com.ximalaya.spoor.stream.sink.TimeSeriesSink
import com.ximalaya.spoor.stream.sink.impl.TimeSeriesSinkImpl
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}
import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
/**
* @author todd.chen at 26/03/2017 15:08.
* email : todd.chen@ximalaya.com
*/
class PooledSinkFactory extends BasePooledObjectFactory[TimeSeriesSink] with Serializable {
private val esConf = EsPoolConfig.getEsConf
private val clusterName = esConf.clusterName
private val indexPrefix = esConf.indexPrefix
private val node = esConf.node
private val port = esConf.port
override def create(): TimeSeriesSink = {
new TimeSeriesSinkImpl(node, port, clusterName, indexPrefix)
}
override def wrap(t: TimeSeriesSink): PooledObject[TimeSeriesSink] = {
new DefaultPooledObject[TimeSeriesSink](t)
}
override def destroyObject(p: PooledObject[TimeSeriesSink]): Unit = {
p.getObject.close()
super.destroyObject(p)
}
}
object SinkPool extends GenericObjectPool[TimeSeriesSink](new PooledSinkFactory) with Serializable {
private val logger = org.slf4j.LoggerFactory.getLogger(this.getClass)
private def initPool(): Unit = {
val conf = EsPoolConfig.getEsConf
logger.debug("es pool conf:{}", conf)
this.setMaxIdle(conf.maxIdle)
this.setMaxTotal(conf.maxConn)
logger.info("init es Pool")
}
initPool()
}
코드 사용
dStream.mapWithState(mapWithStateFunc).foreachRDD { rdd ⇒
rdd.foreachPartition { structedTs ⇒
val sink = SinkPool.borrowObject()
structedTs.flatten.foreach(sink.write)
SinkPool.returnObject(sink)
}
}
my github
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
JDK 11을 사용하여 NixOS에서 Play Framework 실행저는 NixOS로 전환하고 있으며 이에 대해 다소 기대하고 있습니다. 오늘 저는 sbt 설치 및 JDK 11로 다운그레이드를 포함하여 Play Framework 환경을 손쉽게 설치하고 실행할 수 있게 된 것을 축하합...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.