spark 기반 전자상거래 사용자 행동 분석 프로젝트

73416 단어 빅 데이터
1. 프로젝트 수요 (각종 기준):
- 신규 가입자 와 총 가입자 분석 신규 가입자 데이터 와 총 가입자 데이터 platform, date, browser - newinstall_user 계산 규칙: launch 이벤트 에서 uid 의 유일한 수 를 계산 합 니 다. -total_user 계산 규칙: 같은 차원, 전날 총 사용자 + 당일 추가 사용자.
- 활성 사용자 분석 activeuser 계산 규칙: 당일 모든 데이터 에서 uid 의 중복 수 를 제거 합 니 다.
= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
-                 ,                          。
-             cookie    uuid                ,                     (  )。
-     ,                   ,                 。
-     (active_member)    :
    -     (        ) pageview      memberid     。
    -        pageview  ,          :               ,                   ,              java_server             ,    id。

- 신규 회원 및 총 회원 분석
-               ,      u_mid   ,        ,     launch   ,uuid     ,        ,                     id     (          :           ,              )。
-    (new_member)    :    (       )       member id,  member id         (               ),      member id           。
-         member id       ,            ,           hbase ,   id  rowkey,        。           mysql ,   id    ,        。         。
-    (total_member)    :       +                。            ,                 +                        。      total_users   。

=========================================================
- 회화 분석
-                    ,                             。        u_sd     ,             。
-            u_sd   ,               ,             。(  :               )

- Hourly 분석
 - Hourly             ,      ,       、                  。
 - hourly    hourly active user  、hourly sessions    hourly sessions length  ,             、                 。

=========================================================
- 브 라 우 저 PV 분석
-                、         ,  pv   ,pv             ,               ,      pv   ,                 ;       ,                 。    ,                         。
- pv           url   ,      ,            url       pv 。           pageview      pv ,          。

- 지역 정보 분석
-       (     platform   date),             , :    、    、    。                    、           。
-            ,          。
-         all            。
-       :  、  、  、uuid、serverTime、platform          (     ip  ),       pc               ,    pageview               。
-          ,          uuid(       )     ;         u_sd(  id)     ;               pv      ,  pv                ,    pv   。         uuid u_sd          。

- 외부 체인 정보 분석
-          ,        ,           、                 。
-       (     platform   date),             , :all、    。                    、           。           ,                ,          ,               。
-       :referrer url、uuid、u_sd、serverTime、platform          ,       pc               ,    pageview               。

=========================================================
사건 분석 - 사건 분석 우 리 는 주로 사건 의 촉발 횟수 를 분석 할 뿐 사건 의 촉발 횟수 를 보면 우 리 는 사건 전환 율 이나 사용자 가 이런 사건 의 흥미 와 불쾌 한 점 을 얻 을 수 있다.이벤트 이벤트 에서 category 와 action 그룹 을 나 눈 후의 기록 개 수 를 계산 합 니 다. 재 작업 은 포함 되 지 않 습 니 다. -단계: 1. hive 사용자 정의 함수 정의 2. hive 에서 hbase 에 대응 하 는 외부 표 만 들 기 3. hive 발걸음 작성 4. sqoop 발걸음 작성 5. 테스트
=========================================================
- 주문 분석
-                    ,          、                   ,                         。
-            hql  +sqoop              
-   :
    1. hive   hbase      
    2.     &     hive&sqoop  
    a.      udf&       
    b. hive+sqoop  
    3.         &  &    hive&sqoop  
    a.       mysql
    b.      udf&       
    c. hive+sqoop  
    4.       &  &    hive&sqoop  
    5. shell        

2. 코드 구현
1. ETL 작업 을 하고 데 이 터 를 HBase 표 에 삽입 합 니 다.
import java.util
import java.util.zip.CRC32

import com.huadian.bigdata.project.common.EventLogConstants
import com.huadian.bigdata.project.common.EventLogConstants.EventEnum
import com.huadian.bigdata.project.util.{LogParser, TimeUtil}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.io.compress.Compression
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  *   spark    HDFS      ,  ETL  ,        HBase  
  *      ETL   HBase  
  *               ,              
  * HBase    
  *        ,             ,rowkey  
  */
object EtlToHBaseSparkV2 {

  /**
    *   rowkey
    *   :   
    *   -a.      
    *   -b.  ID +   ID+      ->CRC32  ,    Long  
    */
  def createRowKey(time: Long, u_ud: String, u_md: String, event_alias: String):String = {

    //  CRC    :        ,      Long    
    val crc32 = new CRC32()
    crc32.reset()
    if(StringUtils.isNotBlank(u_ud)){
      crc32.update(Bytes.toBytes(u_ud))
    }
    if(StringUtils.isNotBlank(u_md)){
      crc32.update(Bytes.toBytes(u_md))
    }
    if(StringUtils.isNotBlank(event_alias)){
      crc32.update(Bytes.toBytes(event_alias))
    }
    //  StringBuilder  
    val sb  = new StringBuilder()
    sb.append(time).append("_").append(crc32.getValue)

    sb.toString()
  }

  /**
    *   HBase ,      
    * @param processDate
    *              :  2019-08-06
    * @param conf
    *
    *@return
    *           event_logs20190806
    */
  def createHBaseTable(processDate: String, conf: Configuration):String = {

    val time = TimeUtil.parseLong2String(TimeUtil.parseString2Long(processDate),"yyyyMMdd")
    val tableName = EventLogConstants.HBASE_NAME_EVENT_LOGS + time
    val conf = HBaseConfiguration.create
    //System.out.println(conf);
    var conn:Connection = null
    var admin:HBaseAdmin = null

    try {
      //2.    
      conn = ConnectionFactory.createConnection(conf)
      admin =  conn.getAdmin().asInstanceOf[HBaseAdmin]

      if (admin.tableExists(tableName)) { //   
        admin.disableTable(tableName)
        //   
        admin.deleteTable(tableName)
      }
      //HTableDescriptor:   +   
      //a.     
      val desc = new HTableDescriptor(TableName.valueOf(tableName))



      //b.      
      val family = new HColumnDescriptor(EventLogConstants.BYTES_EVENT_LOGS_FAMILY_NAME)
      //      
      family.setCompressionType(Compression.Algorithm.SNAPPY)
      //         
      family.setBlockCacheEnabled(false)

      desc.addFamily(family)

      admin.createTable(desc,Array(
        Bytes.toBytes("145057118"),Bytes.toBytes("145057138"),
        Bytes.toBytes("145057158"),Bytes.toBytes("145057188")
      ))
      tableName
    }catch {
        case e:Exception =>e.printStackTrace()
        tableName
    }finally {
      if(admin != null) admin.close()
    }
  }

  def main(args: Array[String]): Unit = {
    //Spark app   :      Master     
    val sparkConf = new SparkConf()
      .setAppName("EtlToHBaseSpark application")
      .setMaster("local[2]")
      //  kryo   
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))

    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    //a.      
    val eventLogRDD: RDD[String] = sc.textFile("file:///I:/4-source/HD201903/spark-learning/datas/")

    println(s"count=${eventLogRDD.count()}")
    println(eventLogRDD.first())

    //b.      
    val parseEventLogRDD: RDD[(String, util.Map[String, String])] = eventLogRDD
      //      
      .map(line =>{
       //        ,   Java  Map  
        val logInfo: util.Map[String, String] = new LogParser().handleLogParser(line)
        //      
        val eventAlias = logInfo.get(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME)
        (eventAlias,logInfo)
    })
    println(parseEventLogRDD.first())

    //c.       (ImmutableBytesWritable,Put)
    //       
    val eventTypeList = List(EventEnum.LAUNCH, EventEnum.PAGEVIEW, EventEnum.CHARGEREQUEST,
      EventEnum.CHARGESUCCESS, EventEnum.CHARGEREFUND, EventEnum.EVENT)

    //   List      
    val eventTypeBroadcast = sc.broadcast(eventTypeList)

    val eventPutRDD = parseEventLogRDD
        //          eventType
        .filter(tuple =>eventTypeBroadcast.value.contains(EventEnum.valueOfAlias(tuple._1)))
        .map{
          case  (eventAlias,logInfo)=>{
            //RowKey:   _  ID
            val rowKey = createRowKey(
              //    
              TimeUtil.parseNginxServerTime2Long(
                logInfo.get(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME)
              ),
              //       
              logInfo.get(EventLogConstants.LOG_COLUMN_NAME_UUID),
              //  ID
              logInfo.get(EventLogConstants.LOG_COLUMN_NAME_MEMBER_ID),
              //      
              eventAlias
            )

            //  Put  
            val put = new Put(Bytes.toBytes(rowKey))
            // Java   Scala  
            import scala.collection.JavaConverters._
            val logInfoScala: Map[String, String] = logInfo.asScala.toMap
            for((key,value) <- logInfoScala){
              put.addColumn(
                EventLogConstants.BYTES_EVENT_LOGS_FAMILY_NAME,
                Bytes.toBytes(key),
                Bytes.toBytes(value)
              )
            }
            (new ImmutableBytesWritable(put.getRow),put)
          }
        }
    println(s"eventPutRDD Count = ${eventPutRDD.count()}")
    println(eventPutRDD.first())


    //d. RDD     HBase  
    val conf: Configuration = HBaseConfiguration.create

    /**
      *   ETL       ,         ,         HBase     
      *   
      *   event_logs+ date
      *   event_logs20190806
      */
      //   :    ,     
    val tableName = createHBaseTable(args(0),conf)

    //     OutputFormat
    conf.set("mapreduce.job.outputformat.class","org.apache.hadoop.hbase.mapreduce.TableOutputFormat")
    //      HBase    
    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    //       
    conf.set("mapreduce.output.fileoutputformat.outputdir","/datas/spark/hbase/etl-output" + System.currentTimeMillis())

    //  
    eventPutRDD.saveAsNewAPIHadoopDataset(conf)

    Thread.sleep(1000000000)

    //    
    sc.stop()
  }
}

2. 신규 사용자 통계:
import java.util.Calendar

import com.huadian.bigdata.project.common.EventLogConstants
import com.huadian.bigdata.project.common.EventLogConstants.EventEnum
import com.huadian.bigdata.project.util.TimeUtil
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.filter.{CompareFilter, SingleColumnValueFilter}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark Application    
  */
object NewUserCountSparkV3 {
  //scala     ,  spark application  Driver
  def main(args: Array[String]): Unit = {
    //Spark app   :      Master     
    val sparkConf = new SparkConf()
      .setAppName("NewUserCountSparkV2 appliction")
      .setMaster("local[2]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      //           
      .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))

    //  sparkContext  :             ,   RDD   ;  jobs  
    val sc = SparkContext.getOrCreate(sparkConf)
    sc.setLogLevel("WARN")

    /**
      *  HBase     
      */
    //(1)      
    val conf: Configuration = HBaseConfiguration.create
    //(2)     
    val time = TimeUtil.parseLong2String(TimeUtil.parseString2Long(args(0)),"yyyyMMdd")
    val tableName = EventLogConstants.HBASE_NAME_EVENT_LOGS + time
    conf.set(TableInputFormat.INPUT_TABLE, tableName)

    /**
      *  HBASE      ,      
      */
    val scan = new Scan()
     //         
    val FAMILY_NAME: Array[Byte] = EventLogConstants.BYTES_EVENT_LOGS_FAMILY_NAME
    scan.addFamily(FAMILY_NAME)
    scan.addColumn(FAMILY_NAME, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME))
    scan.addColumn(FAMILY_NAME, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_UUID))
    scan.addColumn(FAMILY_NAME, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_VERSION))
    scan.addColumn(FAMILY_NAME, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_PLATFORM))
    scan.addColumn(FAMILY_NAME, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME))
    scan.addColumn(FAMILY_NAME, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME))
    scan.addColumn(FAMILY_NAME, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION))

    //        e_l,launch  
    scan.setFilter(new SingleColumnValueFilter(
      FAMILY_NAME,
      Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME),
      CompareFilter.CompareOp.EQUAL,
      Bytes.toBytes(EventEnum.LAUNCH.alias)
    ))
    
    conf.set(
      TableInputFormat.SCAN,
      Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray)
    )
    //(3)    
    val eventLogRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )
    println(s"eventLogRDD count = ${eventLogRDD.count()}") //3115
    println(eventLogRDD.first())

    //  HBase           
    val newUserRDD: RDD[(String, String, String, String, String, String)] = eventLogRDD.map{case(key,result)=>{
      //  rowKey
      val rowKey = Bytes.toString(key.get())
      //         
      val uuid = Bytes.toString(result.getValue(FAMILY_NAME,
        Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_UUID)))
      val s_time = Bytes.toString(result.getValue(FAMILY_NAME,
        Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME)))
      val platformName = Bytes.toString(result.getValue(FAMILY_NAME,
        Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_PLATFORM)))
      val platformVersion = Bytes.toString(result.getValue(FAMILY_NAME,
        Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_VERSION)))
      val browserName = Bytes.toString(result.getValue(FAMILY_NAME,
        Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME)))
      val browserVersion = Bytes.toString(result.getValue(FAMILY_NAME,
        Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION)))

      //          
      (uuid,s_time,platformName,platformVersion,browserName,browserVersion)
    }}.filter(tuple => tuple._1!=null && tuple._1.trim.length>0)
    println(s"filter count ${newUserRDD.count()}") //2835
    /**
      *                      
      *     +    +     
      *           iOS:ios8   chrome:55
      */
    val dayPlatformBrowserNewUserRDD: RDD[(String, Int, String, String)] = newUserRDD.map{
      case (uuid,s_time,platformName,platformVersion,browserName,browserVersion) =>{
          //         
       val calender = Calendar.getInstance()
        calender.setTimeInMillis(TimeUtil.parseNginxServerTime2Long(s_time))
        val day = calender.get(Calendar.DAY_OF_MONTH)
        //      
        var platformDimension:String = ""
        if(StringUtils.isBlank(platformName)){
          platformDimension = "unkown:unkown"
        }else if(StringUtils.isBlank(platformVersion)){
          platformDimension = platformName+":unkown"
        }else{
          platformDimension = platformName + ":" + platformVersion
        }
        //       
        var browserDimension:String = ""
        if(StringUtils.isBlank(browserName)){
          browserDimension = "unkown:unkown"
        }else if(StringUtils.isBlank(browserVersion)){
          browserDimension = browserName+":unkown"
        }else{
          browserDimension = browserName + ":" + browserVersion
        }
        (uuid,day,platformDimension,browserDimension)
      }
    }
    //dayPlatformBrowserNewUserRDD       
    dayPlatformBrowserNewUserRDD.persist(StorageLevel.MEMORY_AND_DISK_SER_2)

    //        
    val dayPlatformNewUserCountRDD =  dayPlatformBrowserNewUserRDD
      .map(tuple =>((tuple._2,tuple._3),1))
      .reduceByKey(_ + _)
    dayPlatformNewUserCountRDD.foreachPartition(_.foreach(println))

    //         +      
    val dayPlatformBrowserNewUserCountRDD: RDD[((Int, String, String), Int)] =  dayPlatformBrowserNewUserRDD
      .map(tuple =>((tuple._2,tuple._3,tuple._4),1))
      .reduceByKey(_ + _)
    dayPlatformBrowserNewUserCountRDD.foreachPartition(_.foreach(println))
    //        MYSQL 
    dayPlatformBrowserNewUserCountRDD
      .coalesce(1)
      .foreachPartition(iter=>{
        //  
        //2.    
        iter.foreach{case ((day,platform,browser),count)=>{
        }}
        //3.  
      })
    //       
    dayPlatformBrowserNewUserRDD.unpersist()
    //spark app    4040     
    Thread.sleep(1000000000)
    //    
    sc.stop()
  }
}

ETL 로그 데 이 터 는 HBASE 표 에 있 고 프로그램 코드 최적화 점 입 니 다.
(1) 시 계 를 만 들 때
  • 설정 표 의 데이터 압축
  • 미리 분 리 된 구역 만 들 기
  • 읽 기 표 의 데 이 터 를 캐 시 하지 않 고 cache block
  • 을 설정 합 니 다.
    (2) spark 프로그램의 최적화
    . filter (tuple = > eventTypeList. contains (EventEnum. valueOfAlias (tuple. 1)) eventTypeList 는 Driver 안에 있 고 filter 는 Executor 에서 task 가 실 행 됩 니 다. RDD 에 각각 3 개의 파 티 션 이 있 으 면 eventTypeList 는 실제 개발 에 3 부 를 저장 해 야 합 니 다. 하루 에 처리 하 는 데 이 터 는 수 십 개의 GB 이 고 파 티 션 이 많 을 수 있 습 니 다.하나의 데이터 베 이 스 는 하나의 파 티 션 에 대응 하고 하나의 파 티 션 은 하나의 Task 에 대응 합 니 다. 1000 개의 파 티 션 이 있 으 면 이벤트 TypeList1M 이 소모 되면 1GB 를 소모 하여 하나의 executor 를 고려 하여 한 부 를 저장 할 수 있 습 니 다.10 개의 executor 가 10m 를 저장 하면 좋 겠 습 니 다. Spark supports two types of shared variables spark 는 2 가지 방식 변 수 를 제공 합 니 다. 공유 broadcast variables: 방송 변수 whichcan be used to cache a value in memory on all nodes,라디오 변 수 를 사용 하여 집합 분 류 를 방송 합 니 다. 모든 executor 에 데 이 터 를 보 냅 니 다. accumulators: 누적 기 which are variables that are only "added" to, such as counters and sums.
    (3) HFileOuputFile 사용
    HBASE 표 에 데 이 터 를 저장 할 때:
  • put 방식 putData - > WAL - > MemStore - > StoreFile (HFile)
  • Hfile 방식 날짜 – > Hfile - > 로드 테이블
  • 좋은 웹페이지 즐겨찾기