spark 프로젝트 중 common pool 의 간단 한 사용

7452 단어 scala
spark 프로젝트 중 common pool 의 간단 한 사용
배경:spark streaming 에서 계 산 된 결 과 는 es 에 떨 어 뜨 려 야 합 니 다.하나의 batch 의 데 이 터 는 유형 에 따라 서로 다른 index 에 저 장 됩 니 다.
현재 프레임 워 크:
elastic-hadop 이 프로젝트 는 단일 index 를 처리 하 는 장면 에서 쉽게 시작 할 수 있 습 니 다.그러나 이 프로젝트 는 두 가지 부족 합 니 다.첫 번 째 는 데이터 가 다시 group 한 후에 서로 다른 index 에 넣 어야 합 니 다.다른 문 제 는 이 프로젝트 가 9200 이라는 http 포트 를 사용 하 는 것 입 니 다.
개조 하 다.
spark 의 공식 문서 에서 좋 은 방법 을 추 천 했 습 니 다.바로 자원 탱크 를 이용 하 는 것 입 니 다.사실 자원 탱크 의 가장 큰 장점 은 대상 이 풀 화 되 고 불필요 한 gc 를 피 하 는 것 입 니 다.그러면 우 리 는 우리 의 코드 도 개조 할 수 있 습 니 다.우 리 는 common-pol 2 로 우리 의 sink 의 풀 화 를 실현 할 수 있 습 니 다.우리 sink 을 사용 할 때 풀 에서 가 져 오고 사용 하지 않 을 때 돌려 주면 됩 니 다.우리 sink 은 사실 dao 층 과 유사 하기 때문에 EsClient 에 대한 wrapper 입 니 다.가장 좋 은 방법 은 자바 의 Close 인 터 페 이 스 를 실현 한 다음 대상 을 소각 할 때 우리 의 esClient close 입 니 다.
관련 코드
sink 인터페이스
trait TimeSeriesSink extends Closeable with Serializable {
  def write(ts: StructedTimeSeries)
}

sink 실현
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.{ElasticClient, ElasticsearchClientUri}
import com.ximalaya.spoor.stream.bean.{GaugeTimeSeries, StructedTimeSeries}
import com.ximalaya.spoor.stream.builder.IndexBuilder
import com.ximalaya.spoor.stream.sink.TimeSeriesSink
import org.elasticsearch.common.settings.Settings
import com.ximalaya.spoor.common.bean.MetricType._
/**
  * @author todd.chen at 26/03/2017 08:01.
  *         email : todd.chen@ximalaya.com
  */
class TimeSeriesSinkImpl(node: String, port: Int, clusterName: String, indexPrefix: String) extends TimeSeriesSink {
  private val logger = org.slf4j.LoggerFactory.getLogger(this.getClass)
  logger.info("tsSinker init")
  private val settings = Settings.builder().put("cluster.name", clusterName).build()
  private val client = ElasticClient.transport(settings, ElasticsearchClientUri(node, port))
  private val indexBuilder = new IndexBuilder(indexPrefix)
  override def write(ts: StructedTimeSeries): Unit = {
    ts match {
      case gauge: GaugeTimeSeries ⇒
        client.execute {
          index into indexBuilder.getIndex(gauge.interval, Gauge) source gauge.toJson
        }
      case _ ⇒ 
        //TODO other type sink
    }
  }
  override def close(): Unit = {
    client.close()
  }
}

풀 코드:
import com.ximalaya.spoor.stream.bean.EsPoolConfig
import com.ximalaya.spoor.stream.sink.TimeSeriesSink
import com.ximalaya.spoor.stream.sink.impl.TimeSeriesSinkImpl
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}
import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}

/**
  * @author todd.chen at 26/03/2017 15:08.
  *         email : todd.chen@ximalaya.com
  */
class PooledSinkFactory extends BasePooledObjectFactory[TimeSeriesSink] with Serializable {


  private val esConf = EsPoolConfig.getEsConf
  private val clusterName = esConf.clusterName
  private val indexPrefix = esConf.indexPrefix
  private val node = esConf.node
  private val port = esConf.port

  override def create(): TimeSeriesSink = {
    new TimeSeriesSinkImpl(node, port, clusterName, indexPrefix)
  }

  override def wrap(t: TimeSeriesSink): PooledObject[TimeSeriesSink] = {
    new DefaultPooledObject[TimeSeriesSink](t)
  }

  override def destroyObject(p: PooledObject[TimeSeriesSink]): Unit = {
    p.getObject.close()
    super.destroyObject(p)
  }
}

object SinkPool extends GenericObjectPool[TimeSeriesSink](new PooledSinkFactory) with Serializable {
  private val logger = org.slf4j.LoggerFactory.getLogger(this.getClass)

  private def initPool(): Unit = {
    val conf = EsPoolConfig.getEsConf
    logger.debug("es pool conf:{}", conf)
    this.setMaxIdle(conf.maxIdle)
    this.setMaxTotal(conf.maxConn)
    logger.info("init es Pool")
  }

  initPool()
}

코드 사용

    dStream.mapWithState(mapWithStateFunc).foreachRDD { rdd ⇒
      rdd.foreachPartition { structedTs ⇒
        val sink = SinkPool.borrowObject()
        structedTs.flatten.foreach(sink.write)
        SinkPool.returnObject(sink)
      }
    }

my github

좋은 웹페이지 즐겨찾기