scala 언어 로 병렬 정렬 (top k)

프로젝트 는 대량의 데 이 터 를 정렬 하여 top k 를 계산 해 야 하기 때문에 병렬 컴 퓨 팅 프레임 워 크 를 이해 하기 시 작 했 고 spark 를 접 했 습 니 다. spark 는 모두 scala 로 썼 기 때문에 spark 를 이해 하기 위해 scala 언어 를 악 보 했 습 니 다.이것 은 매우 세련 된 함수 식 언어 입 니 다. 가장 흥미 로 운 것 은 바로 천연 지원 병행 계산 입 니 다. 그리고 생 성 된 목표 코드 는 자바 가상 클 라 스 이기 때문에 자바 와 천연 친화력 을 가지 고 있 습 니 다.자바 코드 와 자 유 롭 게 호출 할 수 있 습 니 다.spark 구 조 를 통 해 빅 데이터 의 빠 른 정렬 (top k 실현) 을 실현 하려 고 했 으 나 spark 를 자세히 연구 한 결과 어려움 이 있 음 을 발견 하고 이 방안 을 잠시 포기 했다.그러나 새로운 해결 방법 은 scala (spark 의 부산물 연구) 의 병행 특성 을 이용 하여 빅 데이터 의 빠 른 정렬 모듈 을 실현 하고 시스템 에 가입 하여 자바 코드 호출 을 제공 하 는 것 이다.아래 의 코드 는 바로 이 모듈 의 핵심 정렬 알고리즘 이다.전체적인 절 차 는:
topmutable_par 방법 에서 정렬 할 데 이 터 를 세그먼트 로 나 눈 다음 scala 의 병렬 특성 을 이용 하여 병렬 방식 으로 sort 를 호출 합 니 다.range 는 각 단락 의 데 이 터 를 세그먼트 로 정렬 한 후 모든 세그먼트 정렬 결 과 를 reduce 합 니 다.
import scala.collection.mutable
import scala.collection.JavaConversions
/** *           * @author guyadong * @param <A> * @param <B> * @param <S> */
class HeapSort[A,B,S<:Iterable[A]](f:A=>B)(implicit ord: Ordering[B]){
  /** *  l        Seq * @param l           * @param desc  /  (   true,  ) * @return */
def sort(l: S,desc:Boolean=true)=HeapSort.sort(f)(l,0,desc)
  /** *  l      top    * @param l           * @param top          * @param desc  /  (   true,  ) * @return */
def top(l: S,top:Int,desc:Boolean=true)=HeapSort.sort(f)(l,top,desc)
  /** *        ,      Seq * @param l             * @param desc  /  (   true,  ) * @return */
def sort_m[M<:mutable.Seq[A]](l: M,desc:Boolean=true)=HeapSort.sort_mutable(f)(l,0,desc)
  /** *      l      top    * @param l             * @param top          * @param desc  /  (   true,  ) * @return */
def top_m[M<:mutable.Seq[A]](l: M,top:Int,desc:Boolean=true)=HeapSort.sort_mutable(f)(l,top,desc)
  /** *      l        top    * @param l             * @param top          * @param desc  /  (   true,  ) * @return */
def top_m_par[M<:mutable.Seq[A]](l: M,top:Int,desc:Boolean=true)=HeapSort.top_mutable_par(f)(l,top,desc)
  /** *      l              Seq * @param seq         * @param top          * @param desc  /  (   true,  ) * @param from          * @param until          * @return */
def sort_range[M<:mutable.Seq[A]](seq: M,top:Int,desc:Boolean=true)(from:Int=0, until:Int=seq.length)=HeapSort.sort_mutableRange(f)(seq,top,desc)(from, until)
  /** *  seq                , src   dst * @param seq      * @param src        (    ,    ) * @param dst         (    ,    ) * @param desc  /  (   true,  ) * @return */
def merge2Seq(seq: mutable.Seq[A],src:(Int,Int),dst:(Int,Int), desc: Boolean=true)=HeapSort.merge2Seq(f)(seq, src, dst, desc)
  /** *  seq                , src   dst * @param seq      * @param src        (    ,    ) * @param dst         (    ,    ) * @param desc  /  (   true,  ) * @return */
def merge2Seq2(seq: mutable.Seq[A],src:(Int,Int),dst:(Int,Int), desc: Boolean=true)=HeapSort.merge2Seq2(f)(seq, src, dst, desc)
  /** *  seq                , src   dst<br> *                * @param seq      * @param src        (    ,    ) * @param dst         (    ,    ) * @param desc  /  (   true,  ) * @return */
def merge2SeqNM(seq: mutable.Seq[A],src:(Int,Int),dst:(Int,Int), desc: Boolean=true)=HeapSort.merge2SeqNM(f)(seq, src, dst, desc)
}
object HeapSort {
  def sort[A, B, S <: Iterable[A]](f: A => B)(iterator: S, top: Int = 0,desc:Boolean=true)(implicit ord: Ordering[B]) = {
    val bf = iterator.toBuffer    
    sort_mutable(f)(bf, top,desc)
  }
  def sort_mutable[A, B, S<:mutable.Seq[A]](f: A => B)(seq: S, top: Int = 0,desc:Boolean=true)(implicit ord: Ordering[B]) = {
    sort_mutableRange(f)(seq,top,desc)()
    (if (top < seq.length && top > 0) seq.takeRight(top) else seq).reverse
  }
  private def sort_mutableRange[A, B, S<:mutable.Seq[A]](f: A => B)(seq: S, top: Int = 0, desc:Boolean=true)(from:Int=0, until:Int=seq.length)(implicit ord: Ordering[B]) = {
    buildHeapRange(f)(seq,desc)(from,until); //    

    val sublen=until-from
    val toplen = if (top <= 0 || top >= sublen) sublen  else top
    var i=until - 1
    var continue=true
    while(continue){
      swap(seq, from, i)
      if (i > (until - toplen)) {
        heapify(f)(seq, from, i, desc, from)
        i -= 1
      }else continue=false
    }
    (i,until)
  }  
  private def buildHeapRange[A, B](f: A => B)(seq: mutable.Seq[A],desc:Boolean)(from:Int,until:Int)(implicit ord: Ordering[B]) = {
   var i=from+((until-from) >>> 1) - 1
   while(i>=from){
     heapify(f)(seq, i, until,desc,from)
     i-=1
   }
  }
  def cmp1_gt [A, B](f: A => B)(l: A, r: A)(implicit ord: Ordering[B]) = ord.gt(f(l), f(r))
  def cmp1_lt [A, B](f: A => B)(l: A, r: A)(implicit ord: Ordering[B]) = ord.lt(f(l), f(r))
  def cmp_gt [A, B](f: A => B,seq: mutable.Seq[A])(l: Int, r: Int)(implicit ord: Ordering[B]) = cmp1_gt(f)(seq(l),seq(r))
  def cmp_lt [A, B](f: A => B,seq: mutable.Seq[A])(l: Int, r: Int)(implicit ord: Ordering[B]) = cmp1_lt(f)(seq(l),seq(r))
  private def heapify[A, B](f: A => B)(seq: mutable.Seq[A], startpos: Int, max: Int, desc: Boolean,off:Int)(implicit ord: Ordering[B]): Unit = {
    def gt = (l: Int, r: Int) => cmp_gt(f, seq)(l, r)
    def lt = (l: Int, r: Int) => cmp_lt(f, seq)(l, r)
    val cmp = if (desc) gt  else lt 
    var largest = 0
    var idx = startpos
    var right=0 
    var left =0
    do {
      right = off+((idx-off + 1) << 1)
      left = right - 1
      largest = if (left < max && cmp(left, idx)) 
        left 
        else 
          idx
      if (right < max && cmp(right, largest)) 
        largest = right
      if (largest != idx) {
        swap(seq, largest, idx)
        idx = largest
      } else return
    } while (true)
  }
  private def swap[A](seq: mutable.Seq[A], i: Int, j: Int) = {
    val temp = seq(i)
    seq(i) = seq(j)
    seq(j) = temp
  }
  private def swap3[A](seq: mutable.Seq[A], i: Int, j: Int,k:Int) = {
    val temp = seq(i)
    seq(i) = seq(j)
    seq(j) = seq(k)
    seq(k) = temp
  }
// private def _duplicateSeq[A](src: collection.Seq[A], srcPos: Int, dest: mutable.Seq[A], destPos: Int, length: Int): mutable.Seq[A] = {
// for (i <- 0 until length) dest(destPos + i) = src(srcPos + i)
// dest
// }
  private def _duplicateSeq[A](src: collection.Seq[A], srcPos: Int, dest: mutable.Seq[A], destPos: Int, length: Int): mutable.Seq[A] = {
    var i=0
    while(i<length){
      dest(destPos + i) = src(srcPos + i)
      i+=1
    }
    dest
  }

  def merge2Seq[A, B](f: A => B)(seq: mutable.Seq[A], src: (Int, Int), dst: (Int, Int), desc: Boolean)(implicit ord: Ordering[B]): (Int, Int) = {
    if (!(if (desc) cmp_gt(f, seq)(dst._1, src._2 - 1) else cmp_lt(f, seq)(dst._1, src._2 - 1))) {
      if (if (desc) cmp_gt(f, seq)(src._1, dst._2 - 1) else cmp_lt(f, seq)(src._1, dst._2 - 1)) {
        val (srclen, dstlen) = ((src._2 - src._1), (dst._2 - dst._1))
        val cplen = math.min(srclen, dstlen)
        _duplicateSeq(seq, dst._1 + cplen, seq, dst._1, dstlen - cplen)
        _duplicateSeq(seq, src._2 - cplen, seq, dst._2 - cplen, cplen)
      } else {
        val q = mutable.Queue[A]()
        def gt = (r: Int) => cmp1_gt(f)(seq(r), q.head)
        def lt = (r: Int) => cmp1_lt(f)(seq(r), q.head)
        val cmpdst = if (desc) gt else lt
        var (topsrc, idx) = (src._2 - 1, dst._2 - 1)
        while (idx >= dst._1) {
          q.enqueue(seq(idx))
          if (cmpdst(topsrc)) {
            seq(idx) = seq(topsrc)
            topsrc -= 1
          } else
            seq(idx) = q.dequeue()
          idx -= 1
        }
        while (idx >= dst._1){
          seq(idx) = q.dequeue()
          idx -= 1
        }
      }
    }
    dst
  }
  def merge2Seq2[A, B](f: A => B)(seq: mutable.Seq[A], src: (Int, Int), dst: (Int, Int), desc: Boolean)(implicit ord: Ordering[B]): (Int, Int) = {
    if (!(if (desc) cmp_gt(f, seq)(dst._1, src._2 - 1) else cmp_lt(f, seq)(dst._1, src._2 - 1))) {
      if (if (desc) cmp_gt(f, seq)(src._1, dst._2 - 1) else cmp_lt(f, seq)(src._1, dst._2 - 1)) {
        val (srclen, dstlen) = ((src._2 - src._1), (dst._2 - dst._1))
        val cplen = math.min(srclen, dstlen)
        _duplicateSeq(seq, dst._1 + cplen, seq, dst._1, dstlen - cplen)
        _duplicateSeq(seq, src._2 - cplen, seq, dst._2 - cplen, cplen)
      } else {
        val q = seq.slice(dst._1, dst._2)
        def gt = (l: Int,r:Int) => cmp1_gt(f)(seq(l), q(r))
        def lt = (l: Int,r:Int) => cmp1_lt(f)(seq(l), q(r))
        val cmpdst = if (desc) gt else lt
        var (topdst,topsrc, idx) = (q.length-1,src._2 - 1, dst._2 - 1)
        while (idx >= dst._1&&topsrc>=src._1) {
          if (cmpdst(topsrc,topdst)) {
            seq(idx) = seq(topsrc)
            topsrc -= 1
          } else{
            seq(idx) = q(topdst)
            topdst -= 1
          }
          idx -= 1
        }
        if(idx>=dst._1)
          _duplicateSeq(q, topdst-(idx-dst._1), seq, dst._1, idx-dst._1+1)
      }
    }
    dst
  }
  def merge2SeqNM[A, B](f: A => B)(seq: mutable.Seq[A], src: (Int, Int), dst: (Int, Int), desc: Boolean)(implicit ord: Ordering[B]): (Int, Int) = {
    if (!(if (desc) cmp_gt(f, seq)(dst._1, src._2 - 1) else cmp_lt(f, seq)(dst._1, src._2 - 1))) {
      if (if (desc) cmp_gt(f, seq)(src._1, dst._2 - 1) else cmp_lt(f, seq)(src._1, dst._2 - 1)) {
        val (srclen, dstlen) = ((src._2 - src._1), (dst._2 - dst._1))
        val cplen = math.min(srclen, dstlen)
        _duplicateSeq(seq, dst._1 + cplen, seq, dst._1, dstlen - cplen)
        _duplicateSeq(seq, src._2 - cplen, seq, dst._2 - cplen, cplen)
      } else {
        var (idx,qbf,qbt,qh)=(dst._2-1,dst._2-1,dst._2-1,dst._2-1)
        var st=src._2-1
        var swapst=()=>{}
        var swapqh=()=>{}
        def gt = (l: Int) => cmp_gt(f, seq)(l, qh)
        def lt = (l: Int) => cmp_lt(f, seq)(l, qh)
        val cmpdst = if (desc) gt else lt
        def swaptop(top: Int) = {
          val temp = seq(idx)
          seq(idx) = seq(top)
          seq(top) = temp
        }
        def getql=()=>qbf+(qh-qbf+1)%(qbt-qbf+1)
        def nextqh=()=>qbt-(qbt-qh+1)%(qbt-qbf+1)
// def moveStep(from: Int, to: Int, step: Int) =for (i <- (if (step > 0) (from to to).reverse else (from to to))) seq(i + step) = seq(i)
        def moveStep(from: Int, to: Int, step: Int) = {
          var i = if (step > 0) to else from
          def upf() = i >= from
          def dnt() = i <= to
          val (s, c) = if (step > 0) (-1, upf _) else (1, dnt _)
          while (c()) {
            seq(i + step) = seq(i)
            i += s
          }
        }
        def swapLeft(from:Int,to:Int)={
          val tmp=seq(from-1)
          moveStep(from,to,-1)
          seq(to)=tmp
        }
        def swapRight(from:Int,to:Int)={
          val tmp=seq(to+1)
          moveStep(from,to,1)
          seq(from)=tmp
        }
        def swapStTail() = {
          swaptop(st)
          val ql = getql()
          if (ql > qbf)
            if (qh - qbf > qbt - ql) {
              swap(seq, st, qbt)
              swapRight(ql, qbt - 1)
              qbf = st
            } else {
              swapLeft(qbf, qh)
              qbf = st
              qh = nextqh()
            }
          else{
            qbf=st
          }
        }
        def swapStHead() = {
          swaptop(st)
          swapst = swapStTail 
          swapqh = swapQhEnable  
          qh = st
          qbf = st
          qbt = st
        }
        def swapQhDisable() = {
          qbf -= 1
          qbt -= 1
          qh -= 1
        }
        def swapQhEnable() = {
          swaptop(qh)
          qh = nextqh()
        }
        swapst = swapStHead 
        swapqh = swapQhDisable 
        while (idx >= dst._1 && st >= src._1) {
          if (cmpdst(st)) {
            swapst()
            st -= 1
          } else 
            swapqh()
          idx -= 1
        }
        if (idx >= dst._1) {
          val ql = getql()
          _duplicateSeq(seq, ql, seq, dst._1, qbt - ql + 1)
          _duplicateSeq(seq, qbf, seq, dst._1 + qbt - ql + 1, ql - qbf)
        }
      }
    }
    dst
  }
  private val processors=Runtime.getRuntime.availableProcessors()//  cpu   
  def top_mutable_par[A, B, M <: mutable.Seq[A]](f: A => B)(seq: M, top: Int, desc: Boolean = true)(implicit ord: Ordering[B]) = {
//  cpu             
    val step = (seq.length+(processors)-1) / (processors)
//                
    val rangs = for (i <- (0 until (seq.length + step - 1) / step).par) yield { sort_mutableRange(f)(seq, top)(i * step, math.min(seq.length, (i + 1) * step)) }
    def merge = (left: (Int, Int), right: (Int, Int)) => if ((right._2 - right._1) > (left._2 - left._1)) merge2SeqNM(f)(seq, left, right, desc) else merge2SeqNM(f)(seq, right, left, desc)
//   reduce             
    val r = rangs.reduce(merge(_, _))
//      (    )
    seq.slice(r._1, r._2).reverse
  }  
  def main(args: Array[String]) {
//    
    val m = new HeapSort[Int, Int, mutable.Buffer[Int]]((w: Int) => w)
println(Array(7,11,9,17,15,21,8,30,14,0,12,15,55,2,3,18,22,23,4).aggregate(List[Int]())(m.seqop, m.combop).toString())
    val rnd=new java.util.Random()
    val l=new Array[Int](40)
    for(i<-0 until 5){
      l(i)=rnd.nextInt(100)
      }
    for(i<-5 until l.length){
      l(i)=rnd.nextInt(100)
      }

    for (i <- 0 to 0) {
      println("==============time ", i,"=================")
      val s=l.toBuffer[Int]
      println(s)
      val t1 = System.currentTimeMillis
      val r1 = m.sort_range(s, 10, true)(0,5)
      val r2 = m.sort_range(s, 10, true)(5,40)
      val t2 = System.currentTimeMillis
      printf("sort time cost:%f seconds(%d mills) used
"
, (t2 - t1) / 1024D, t2 - t1) for(i<-(r1._1 until r1._2)){ print(s(i)+",") } println(r1) for(i<-(r2._1 until r2._2)){ print(s(i)+",") } println(r2) m.merge2Seq2(s, r1, r2) for(i<-(r2._1 until r2._2).reverse){ print(s(i)+",") } println(r2) } } }

좋은 웹페이지 즐겨찾기