spark 기반 전자상거래 사용자 행동 분석 프로젝트
73416 단어 빅 데이터
- 신규 가입자 와 총 가입자 분석 신규 가입자 데이터 와 총 가입자 데이터 platform, date, browser - newinstall_user 계산 규칙: launch 이벤트 에서 uid 의 유일한 수 를 계산 합 니 다. -total_user 계산 규칙: 같은 차원, 전날 총 사용자 + 당일 추가 사용자.
- 활성 사용자 분석 activeuser 계산 규칙: 당일 모든 데이터 에서 uid 의 중복 수 를 제거 합 니 다.
= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
- , 。
- cookie uuid , ( )。
- , , 。
- (active_member) :
- ( ) pageview memberid 。
- pageview , : , , java_server , id。
- 신규 회원 및 총 회원 분석
- , u_mid , , launch ,uuid , , id ( : , )。
- (new_member) : ( ) member id, member id ( ), member id 。
- member id , , hbase , id rowkey, 。 mysql , id , 。 。
- (total_member) : + 。 , + 。 total_users 。
=========================================================
- 회화 분석
- , 。 u_sd , 。
- u_sd , , 。( : )
- Hourly 분석
- Hourly , , 、 。
- hourly hourly active user 、hourly sessions hourly sessions length , 、 。
=========================================================
- 브 라 우 저 PV 분석
- 、 , pv ,pv , , pv , ; , 。 , 。
- pv url , , url pv 。 pageview pv , 。
- 지역 정보 분석
- ( platform date), , : 、 、 。 、 。
- , 。
- all 。
- : 、 、 、uuid、serverTime、platform ( ip ), pc , pageview 。
- , uuid( ) ; u_sd( id) ; pv , pv , pv 。 uuid u_sd 。
- 외부 체인 정보 분석
- , , 、 。
- ( platform date), , :all、 。 、 。 , , , 。
- :referrer url、uuid、u_sd、serverTime、platform , pc , pageview 。
=========================================================
사건 분석 - 사건 분석 우 리 는 주로 사건 의 촉발 횟수 를 분석 할 뿐 사건 의 촉발 횟수 를 보면 우 리 는 사건 전환 율 이나 사용자 가 이런 사건 의 흥미 와 불쾌 한 점 을 얻 을 수 있다.이벤트 이벤트 에서 category 와 action 그룹 을 나 눈 후의 기록 개 수 를 계산 합 니 다. 재 작업 은 포함 되 지 않 습 니 다. -단계: 1. hive 사용자 정의 함수 정의 2. hive 에서 hbase 에 대응 하 는 외부 표 만 들 기 3. hive 발걸음 작성 4. sqoop 발걸음 작성 5. 테스트
=========================================================
- 주문 분석
- , 、 , 。
- hql +sqoop
- :
1. hive hbase
2. & hive&sqoop
a. udf&
b. hive+sqoop
3. & & hive&sqoop
a. mysql
b. udf&
c. hive+sqoop
4. & & hive&sqoop
5. shell
2. 코드 구현
1. ETL 작업 을 하고 데 이 터 를 HBase 표 에 삽입 합 니 다.
import java.util
import java.util.zip.CRC32
import com.huadian.bigdata.project.common.EventLogConstants
import com.huadian.bigdata.project.common.EventLogConstants.EventEnum
import com.huadian.bigdata.project.util.{LogParser, TimeUtil}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.io.compress.Compression
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* spark HDFS , ETL , HBase
* ETL HBase
* ,
* HBase
* , ,rowkey
*/
object EtlToHBaseSparkV2 {
/**
* rowkey
* :
* -a.
* -b. ID + ID+ ->CRC32 , Long
*/
def createRowKey(time: Long, u_ud: String, u_md: String, event_alias: String):String = {
// CRC : , Long
val crc32 = new CRC32()
crc32.reset()
if(StringUtils.isNotBlank(u_ud)){
crc32.update(Bytes.toBytes(u_ud))
}
if(StringUtils.isNotBlank(u_md)){
crc32.update(Bytes.toBytes(u_md))
}
if(StringUtils.isNotBlank(event_alias)){
crc32.update(Bytes.toBytes(event_alias))
}
// StringBuilder
val sb = new StringBuilder()
sb.append(time).append("_").append(crc32.getValue)
sb.toString()
}
/**
* HBase ,
* @param processDate
* : 2019-08-06
* @param conf
*
*@return
* event_logs20190806
*/
def createHBaseTable(processDate: String, conf: Configuration):String = {
val time = TimeUtil.parseLong2String(TimeUtil.parseString2Long(processDate),"yyyyMMdd")
val tableName = EventLogConstants.HBASE_NAME_EVENT_LOGS + time
val conf = HBaseConfiguration.create
//System.out.println(conf);
var conn:Connection = null
var admin:HBaseAdmin = null
try {
//2.
conn = ConnectionFactory.createConnection(conf)
admin = conn.getAdmin().asInstanceOf[HBaseAdmin]
if (admin.tableExists(tableName)) { //
admin.disableTable(tableName)
//
admin.deleteTable(tableName)
}
//HTableDescriptor: +
//a.
val desc = new HTableDescriptor(TableName.valueOf(tableName))
//b.
val family = new HColumnDescriptor(EventLogConstants.BYTES_EVENT_LOGS_FAMILY_NAME)
//
family.setCompressionType(Compression.Algorithm.SNAPPY)
//
family.setBlockCacheEnabled(false)
desc.addFamily(family)
admin.createTable(desc,Array(
Bytes.toBytes("145057118"),Bytes.toBytes("145057138"),
Bytes.toBytes("145057158"),Bytes.toBytes("145057188")
))
tableName
}catch {
case e:Exception =>e.printStackTrace()
tableName
}finally {
if(admin != null) admin.close()
}
}
def main(args: Array[String]): Unit = {
//Spark app : Master
val sparkConf = new SparkConf()
.setAppName("EtlToHBaseSpark application")
.setMaster("local[2]")
// kryo
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))
val sc = new SparkContext(sparkConf)
sc.setLogLevel("WARN")
//a.
val eventLogRDD: RDD[String] = sc.textFile("file:///I:/4-source/HD201903/spark-learning/datas/")
println(s"count=${eventLogRDD.count()}")
println(eventLogRDD.first())
//b.
val parseEventLogRDD: RDD[(String, util.Map[String, String])] = eventLogRDD
//
.map(line =>{
// , Java Map
val logInfo: util.Map[String, String] = new LogParser().handleLogParser(line)
//
val eventAlias = logInfo.get(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME)
(eventAlias,logInfo)
})
println(parseEventLogRDD.first())
//c. (ImmutableBytesWritable,Put)
//
val eventTypeList = List(EventEnum.LAUNCH, EventEnum.PAGEVIEW, EventEnum.CHARGEREQUEST,
EventEnum.CHARGESUCCESS, EventEnum.CHARGEREFUND, EventEnum.EVENT)
// List
val eventTypeBroadcast = sc.broadcast(eventTypeList)
val eventPutRDD = parseEventLogRDD
// eventType
.filter(tuple =>eventTypeBroadcast.value.contains(EventEnum.valueOfAlias(tuple._1)))
.map{
case (eventAlias,logInfo)=>{
//RowKey: _ ID
val rowKey = createRowKey(
//
TimeUtil.parseNginxServerTime2Long(
logInfo.get(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME)
),
//
logInfo.get(EventLogConstants.LOG_COLUMN_NAME_UUID),
// ID
logInfo.get(EventLogConstants.LOG_COLUMN_NAME_MEMBER_ID),
//
eventAlias
)
// Put
val put = new Put(Bytes.toBytes(rowKey))
// Java Scala
import scala.collection.JavaConverters._
val logInfoScala: Map[String, String] = logInfo.asScala.toMap
for((key,value) <- logInfoScala){
put.addColumn(
EventLogConstants.BYTES_EVENT_LOGS_FAMILY_NAME,
Bytes.toBytes(key),
Bytes.toBytes(value)
)
}
(new ImmutableBytesWritable(put.getRow),put)
}
}
println(s"eventPutRDD Count = ${eventPutRDD.count()}")
println(eventPutRDD.first())
//d. RDD HBase
val conf: Configuration = HBaseConfiguration.create
/**
* ETL , , HBase
*
* event_logs+ date
* event_logs20190806
*/
// : ,
val tableName = createHBaseTable(args(0),conf)
// OutputFormat
conf.set("mapreduce.job.outputformat.class","org.apache.hadoop.hbase.mapreduce.TableOutputFormat")
// HBase
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
//
conf.set("mapreduce.output.fileoutputformat.outputdir","/datas/spark/hbase/etl-output" + System.currentTimeMillis())
//
eventPutRDD.saveAsNewAPIHadoopDataset(conf)
Thread.sleep(1000000000)
//
sc.stop()
}
}
2. 신규 사용자 통계:
import java.util.Calendar
import com.huadian.bigdata.project.common.EventLogConstants
import com.huadian.bigdata.project.common.EventLogConstants.EventEnum
import com.huadian.bigdata.project.util.TimeUtil
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.filter.{CompareFilter, SingleColumnValueFilter}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
/**
* Spark Application
*/
object NewUserCountSparkV3 {
//scala , spark application Driver
def main(args: Array[String]): Unit = {
//Spark app : Master
val sparkConf = new SparkConf()
.setAppName("NewUserCountSparkV2 appliction")
.setMaster("local[2]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//
.registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))
// sparkContext : , RDD ; jobs
val sc = SparkContext.getOrCreate(sparkConf)
sc.setLogLevel("WARN")
/**
* HBase
*/
//(1)
val conf: Configuration = HBaseConfiguration.create
//(2)
val time = TimeUtil.parseLong2String(TimeUtil.parseString2Long(args(0)),"yyyyMMdd")
val tableName = EventLogConstants.HBASE_NAME_EVENT_LOGS + time
conf.set(TableInputFormat.INPUT_TABLE, tableName)
/**
* HBASE ,
*/
val scan = new Scan()
//
val FAMILY_NAME: Array[Byte] = EventLogConstants.BYTES_EVENT_LOGS_FAMILY_NAME
scan.addFamily(FAMILY_NAME)
scan.addColumn(FAMILY_NAME, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME))
scan.addColumn(FAMILY_NAME, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_UUID))
scan.addColumn(FAMILY_NAME, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_VERSION))
scan.addColumn(FAMILY_NAME, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_PLATFORM))
scan.addColumn(FAMILY_NAME, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME))
scan.addColumn(FAMILY_NAME, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME))
scan.addColumn(FAMILY_NAME, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION))
// e_l,launch
scan.setFilter(new SingleColumnValueFilter(
FAMILY_NAME,
Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME),
CompareFilter.CompareOp.EQUAL,
Bytes.toBytes(EventEnum.LAUNCH.alias)
))
conf.set(
TableInputFormat.SCAN,
Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray)
)
//(3)
val eventLogRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
conf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result]
)
println(s"eventLogRDD count = ${eventLogRDD.count()}") //3115
println(eventLogRDD.first())
// HBase
val newUserRDD: RDD[(String, String, String, String, String, String)] = eventLogRDD.map{case(key,result)=>{
// rowKey
val rowKey = Bytes.toString(key.get())
//
val uuid = Bytes.toString(result.getValue(FAMILY_NAME,
Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_UUID)))
val s_time = Bytes.toString(result.getValue(FAMILY_NAME,
Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME)))
val platformName = Bytes.toString(result.getValue(FAMILY_NAME,
Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_PLATFORM)))
val platformVersion = Bytes.toString(result.getValue(FAMILY_NAME,
Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_VERSION)))
val browserName = Bytes.toString(result.getValue(FAMILY_NAME,
Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME)))
val browserVersion = Bytes.toString(result.getValue(FAMILY_NAME,
Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION)))
//
(uuid,s_time,platformName,platformVersion,browserName,browserVersion)
}}.filter(tuple => tuple._1!=null && tuple._1.trim.length>0)
println(s"filter count ${newUserRDD.count()}") //2835
/**
*
* + +
* iOS:ios8 chrome:55
*/
val dayPlatformBrowserNewUserRDD: RDD[(String, Int, String, String)] = newUserRDD.map{
case (uuid,s_time,platformName,platformVersion,browserName,browserVersion) =>{
//
val calender = Calendar.getInstance()
calender.setTimeInMillis(TimeUtil.parseNginxServerTime2Long(s_time))
val day = calender.get(Calendar.DAY_OF_MONTH)
//
var platformDimension:String = ""
if(StringUtils.isBlank(platformName)){
platformDimension = "unkown:unkown"
}else if(StringUtils.isBlank(platformVersion)){
platformDimension = platformName+":unkown"
}else{
platformDimension = platformName + ":" + platformVersion
}
//
var browserDimension:String = ""
if(StringUtils.isBlank(browserName)){
browserDimension = "unkown:unkown"
}else if(StringUtils.isBlank(browserVersion)){
browserDimension = browserName+":unkown"
}else{
browserDimension = browserName + ":" + browserVersion
}
(uuid,day,platformDimension,browserDimension)
}
}
//dayPlatformBrowserNewUserRDD
dayPlatformBrowserNewUserRDD.persist(StorageLevel.MEMORY_AND_DISK_SER_2)
//
val dayPlatformNewUserCountRDD = dayPlatformBrowserNewUserRDD
.map(tuple =>((tuple._2,tuple._3),1))
.reduceByKey(_ + _)
dayPlatformNewUserCountRDD.foreachPartition(_.foreach(println))
// +
val dayPlatformBrowserNewUserCountRDD: RDD[((Int, String, String), Int)] = dayPlatformBrowserNewUserRDD
.map(tuple =>((tuple._2,tuple._3,tuple._4),1))
.reduceByKey(_ + _)
dayPlatformBrowserNewUserCountRDD.foreachPartition(_.foreach(println))
// MYSQL
dayPlatformBrowserNewUserCountRDD
.coalesce(1)
.foreachPartition(iter=>{
//
//2.
iter.foreach{case ((day,platform,browser),count)=>{
}}
//3.
})
//
dayPlatformBrowserNewUserRDD.unpersist()
//spark app 4040
Thread.sleep(1000000000)
//
sc.stop()
}
}
ETL 로그 데 이 터 는 HBASE 표 에 있 고 프로그램 코드 최적화 점 입 니 다.
(1) 시 계 를 만 들 때
(2) spark 프로그램의 최적화
. filter (tuple = > eventTypeList. contains (EventEnum. valueOfAlias (tuple. 1)) eventTypeList 는 Driver 안에 있 고 filter 는 Executor 에서 task 가 실 행 됩 니 다. RDD 에 각각 3 개의 파 티 션 이 있 으 면 eventTypeList 는 실제 개발 에 3 부 를 저장 해 야 합 니 다. 하루 에 처리 하 는 데 이 터 는 수 십 개의 GB 이 고 파 티 션 이 많 을 수 있 습 니 다.하나의 데이터 베 이 스 는 하나의 파 티 션 에 대응 하고 하나의 파 티 션 은 하나의 Task 에 대응 합 니 다. 1000 개의 파 티 션 이 있 으 면 이벤트 TypeList1M 이 소모 되면 1GB 를 소모 하여 하나의 executor 를 고려 하여 한 부 를 저장 할 수 있 습 니 다.10 개의 executor 가 10m 를 저장 하면 좋 겠 습 니 다. Spark supports two types of shared variables spark 는 2 가지 방식 변 수 를 제공 합 니 다. 공유 broadcast variables: 방송 변수 whichcan be used to cache a value in memory on all nodes,라디오 변 수 를 사용 하여 집합 분 류 를 방송 합 니 다. 모든 executor 에 데 이 터 를 보 냅 니 다. accumulators: 누적 기 which are variables that are only "added" to, such as counters and sums.
(3) HFileOuputFile 사용
HBASE 표 에 데 이 터 를 저장 할 때:
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
spark 의 2: 원리 소개Google Map/Reduce 를 바탕 으로 이 루어 진 Hadoop 은 개발 자 에 게 map, reduce 원 어 를 제공 하여 병렬 일괄 처리 프로그램 을 매우 간단 하고 아름 답 게 만 들 었 습 니 다.S...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.