SparkSql 프로젝트 실전
저희 의 이번 Spark - sql 작업 에서 모든 데 이 터 는 Hive 에서 나 왔 습 니 다.
먼저 Hive 에 표를 만 들 고 데 이 터 를 가 져 옵 니 다.
모두 3 장의 표: 사용자 행동 표 1 장, 도시 표 1 장, 제품 표 1 장
CREATE TABLE `user_visit_action`(
`date` string,
`user_id` bigint,
`session_id` string,
`page_id` bigint,
`action_time` string,
`search_keyword` string,
`click_category_id` bigint,
`click_product_id` bigint,
`order_category_ids` string,
`order_product_ids` string,
`pay_category_ids` string,
`pay_product_ids` string,
`city_id` bigint)
row format delimited fields terminated by '\t';
load data local inpath '/opt/module/datas/user_visit_action.txt' into table sparkpractice.user_visit_action;
CREATE TABLE `product_info`(
`product_id` bigint,
`product_name` string,
`extend_info` string)
row format delimited fields terminated by '\t';
load data local inpath '/opt/module/datas/product_info.txt' into table sparkpractice.product_info;
CREATE TABLE `city_info`(
`city_id` bigint,
`city_name` string,
`area` string)
row format delimited fields terminated by '\t';
load data local inpath '/opt/module/datas/city_info.txt' into table sparkpractice.city_info;
제2 장 수요 1: 각 지역 인기 상품 Top 3
2.1 수요 안내
이곳 의 인기 상품 은 클릭 수의 차원 에서 볼 수 있다.
각 지역 의 3 대 인기 상품 을 계산 하고 각 상품 이 주요 도시 에서 의 분포 비율 을 비고 하여 두 도시 가 다른 것 으로 표시 하 는 것 을 초과 한다.
예 를 들 면:
지역.
상품 명
클릭 횟수
도시 비고
화북
상품 A
100000
북경 21.2%, 천진 13.2%, 기타 65.6%
화북
상품 P
80200
북경 63.0%, 태원 10%, 기타 27.0%
화북
상품 M
40000
북경 63.0%, 태원 10%, 기타 27.0%
동북
상품 J
92000
대련 28%, 랴오닝 17.0%, 기타 55.0%
sql 을 사용 하여 완성 합 니 다. 복잡 한 수요 에 부 딪 히 면 udf 또는 udaf 를 사용 할 수 있 습 니 다.
udaf 함수 정의
class AreaClickUDAF extends UserDefinedAggregateFunction {
// : String
override def inputSchema: StructType = {
StructType(StructField("city_name", StringType) :: Nil)
// StructType(Array(StructField("city_name", StringType)))
}
// : ->1000, ->5000 Map, 1000/?
override def bufferSchema: StructType = {
// MapType(StringType, LongType) map key value
StructType(StructField("city_count", MapType(StringType, LongType)) :: StructField("total_count", LongType) :: Nil)
}
// " 21.2%, 13.2%, 65.6%" String
override def dataType: DataType = StringType
// .
override def deterministic: Boolean = true
//
override def initialize(buffer: MutableAggregationBuffer): Unit = {
// map
buffer(0) = Map[String, Long]()
//
buffer(1) = 0L
}
// Map[ , ]
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
// , key map , +1, , 0+1
val cityName = input.getString(0)
// val map: collection.Map[String, Long] = buffer.getMap[String, Long](0)
val map: Map[String, Long] = buffer.getAs[Map[String, Long]](0)
buffer(0) = map + (cityName -> (map.getOrElse(cityName, 0L) + 1L))
// , +1
buffer(1) = buffer.getLong(1) + 1L
}
//
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val map1 = buffer1.getAs[Map[String, Long]](0)
val map2 = buffer2.getAs[Map[String, Long]](0)
// map1 map2 , buffer1
buffer1(0) = map1.foldLeft(map2) {
case (map, (k, v)) =>
map + (k -> (map.getOrElse(k, 0L) + v))
}
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
// . " 21.2%, 13.2%, 65.6%"
override def evaluate(buffer: Row): Any = {
val cityCountMap = buffer.getAs[Map[String, Long]](0)
val totalCount = buffer.getLong(1)
var citysRatio: List[CityRemark] = cityCountMap.toList.sortBy(-_._2).take(2).map {
case (cityName, count) => {
CityRemark(cityName, count.toDouble / totalCount)
}
}
// 2
if (cityCountMap.size > 2) {
citysRatio = citysRatio :+ CityRemark(" ", citysRatio.foldLeft(1D)(_ - _.cityRatio))
}
citysRatio.mkString(", ")
}
}
case class CityRemark(cityName: String, cityRatio: Double) {
val formatter = new DecimalFormat("0.00%")
override def toString: String = s"$cityName:${formatter.format(cityRatio)}"
}
object AreaClickApp {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local[2]")
.appName("AreaClickApp")
.enableHiveSupport()
.getOrCreate()
spark.sql("use sparkpractice")
// 0
spark.udf.register("city_remark", new AreaClickUDAF)
// 1. ,
spark.sql(
"""
|select
| c.*,
| v.click_product_id,
| p.product_name
|from user_visit_action v join city_info c join product_info p on v.city_id=c.city_id and v.click_product_id=p.product_id
|where click_product_id>-1
""".stripMargin).createOrReplaceTempView("t1")
// 2. ,
spark.sql(
"""
|select
| t1.area,
| t1.product_name,
| count(*) click_count,
| city_remark(t1.city_name)
|from t1
|group by t1.area, t1.product_name
""".stripMargin).createOrReplaceTempView("t2")
// 3.
spark.sql(
"""
|select
| *,
| rank() over(partition by t2.area order by t2.click_count desc) rank
|from t2
""".stripMargin).createOrReplaceTempView("t3")
// 4. top3
spark.sql(
"""
|select
| *
|from t3
|where rank<=3
""".stripMargin).show
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark 세 가지 방식 으로 데 이 터 를 조회 합 니 다.1. 표 한 장 씩 데이터: studentscores.txt 필드: 학급 번호, 학급 이름, 입학 날짜, 소속 학과 중국어 이름 세 가지 방식 을 사용 하 다 제 1 종: 열 이름 을 지정 하여 Schema 추가 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.