SparkSql 프로젝트 실전

6919 단어 SparkSQL빅 데이터
제1장 준비 데이터
저희 의 이번 Spark - sql 작업 에서 모든 데 이 터 는 Hive 에서 나 왔 습 니 다.
먼저 Hive 에 표를 만 들 고 데 이 터 를 가 져 옵 니 다.
모두 3 장의 표: 사용자 행동 표 1 장, 도시 표 1 장, 제품 표 1 장
CREATE TABLE `user_visit_action`(
  `date` string,
  `user_id` bigint,
  `session_id` string,
  `page_id` bigint,
  `action_time` string,
  `search_keyword` string,
  `click_category_id` bigint,
  `click_product_id` bigint,
  `order_category_ids` string,
  `order_product_ids` string,
  `pay_category_ids` string,
  `pay_product_ids` string,
  `city_id` bigint)
row format delimited fields terminated by '\t';
load data local inpath '/opt/module/datas/user_visit_action.txt' into table sparkpractice.user_visit_action;

CREATE TABLE `product_info`(
  `product_id` bigint,
  `product_name` string,
  `extend_info` string)
row format delimited fields terminated by '\t';
load data local inpath '/opt/module/datas/product_info.txt' into table sparkpractice.product_info;

CREATE TABLE `city_info`(
  `city_id` bigint,
  `city_name` string,
  `area` string)
row format delimited fields terminated by '\t';
load data local inpath '/opt/module/datas/city_info.txt' into table sparkpractice.city_info;

제2 장 수요 1: 각 지역 인기 상품 Top 3
2.1 수요 안내
이곳 의 인기 상품 은 클릭 수의 차원 에서 볼 수 있다.
각 지역 의 3 대 인기 상품 을 계산 하고 각 상품 이 주요 도시 에서 의 분포 비율 을 비고 하여 두 도시 가 다른 것 으로 표시 하 는 것 을 초과 한다.
예 를 들 면:
지역.
상품 명
클릭 횟수
도시 비고
화북
상품 A
100000
북경 21.2%, 천진 13.2%, 기타 65.6%
화북
상품 P
80200
북경 63.0%, 태원 10%, 기타 27.0%
화북
상품 M
40000
북경 63.0%, 태원 10%, 기타 27.0%
동북
상품 J
92000
대련 28%, 랴오닝 17.0%, 기타 55.0%
sql 을 사용 하여 완성 합 니 다. 복잡 한 수요 에 부 딪 히 면 udf 또는 udaf 를 사용 할 수 있 습 니 다.
  • 모든 클릭 기록 을 조회 하고 cityinfo 표 와 연결 하여 각 도시 가 있 는 지역 을 얻 습 니 다. Productinfo 표 연결 제품 이름 얻 기
  • 지역 과 상품 id 로 나 누 어 각 상품 이 각 지역 에서 의 총 클릭 횟수
  • 를 통계 한다.
  • 지역 별 클릭 수 내림차 순 정렬
  • 3 등 만 가 져 오고 결 과 를 데이터베이스 에 저장 합 니 다
  • 도시 비고 사용자 정의 UDAF 함수
  • 구체 적 업무 실현
    udaf 함수 정의
    class AreaClickUDAF extends UserDefinedAggregateFunction {
        //        :      String
        override def inputSchema: StructType = {
            StructType(StructField("city_name", StringType) :: Nil)
            //        StructType(Array(StructField("city_name", StringType)))
        }
        
        //         :   ->1000,   ->5000  Map,         1000/?
        override def bufferSchema: StructType = {
            // MapType(StringType, LongType)       map key    value   
            StructType(StructField("city_count", MapType(StringType, LongType)) :: StructField("total_count", LongType) :: Nil)
        }
        
        //          "  21.2%,  13.2%,  65.6%"  String
        override def dataType: DataType = StringType
        
        //                .
        override def deterministic: Boolean = true
        
        //         
        override def initialize(buffer: MutableAggregationBuffer): Unit = {
            //   map  
            buffer(0) = Map[String, Long]()
            //         
            buffer(1) = 0L
        }
        
        //       Map[   ,    ]
        override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
            //        ,        key   map     ,            +1,      ,    0+1
            val cityName = input.getString(0)
            //        val map: collection.Map[String, Long] = buffer.getMap[String, Long](0)
            val map: Map[String, Long] = buffer.getAs[Map[String, Long]](0)
            buffer(0) = map + (cityName -> (map.getOrElse(cityName, 0L) + 1L))
            //       ,        +1
            buffer(1) = buffer.getLong(1) + 1L
        }
        
        //       
        override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
            val map1 = buffer1.getAs[Map[String, Long]](0)
            val map2 = buffer2.getAs[Map[String, Long]](0)
            
            //  map1     map2    ,      buffer1
            buffer1(0) = map1.foldLeft(map2) {
                case (map, (k, v)) =>
                    map + (k -> (map.getOrElse(k, 0L) + v))
            }
            
            buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
        }
        
        //      . "  21.2%,  13.2%,  65.6%"
        override def evaluate(buffer: Row): Any = {
            val cityCountMap = buffer.getAs[Map[String, Long]](0)
            val totalCount = buffer.getLong(1)
            
            var citysRatio: List[CityRemark] = cityCountMap.toList.sortBy(-_._2).take(2).map {
                case (cityName, count) => {
                    CityRemark(cityName, count.toDouble / totalCount)
                }
            }
            //          2     
            if (cityCountMap.size > 2) {
                citysRatio = citysRatio :+ CityRemark("  ", citysRatio.foldLeft(1D)(_ - _.cityRatio))
            }
            citysRatio.mkString(", ")
        }
    }
    
    
    case class CityRemark(cityName: String, cityRatio: Double) {
        val formatter = new DecimalFormat("0.00%")
        
        override def toString: String = s"$cityName:${formatter.format(cityRatio)}"
    }
    
    object AreaClickApp {
        def main(args: Array[String]): Unit = {
            val spark: SparkSession = SparkSession
                .builder()
                .master("local[2]")
                .appName("AreaClickApp")
                .enableHiveSupport()
                .getOrCreate()
            spark.sql("use sparkpractice")
            // 0          
            spark.udf.register("city_remark", new AreaClickUDAF)
            // 1.           ,            
            spark.sql(
                """
                  |select
                  |    c.*,
                  |    v.click_product_id,
                  |    p.product_name
                  |from user_visit_action v join city_info c join product_info p on v.city_id=c.city_id and v.click_product_id=p.product_id
                  |where click_product_id>-1
                """.stripMargin).createOrReplaceTempView("t1")
            
            // 2.       ,         
            spark.sql(
                """
                  |select
                  |    t1.area,
                  |    t1.product_name,
                  |    count(*) click_count,
                  |    city_remark(t1.city_name)
                  |from t1
                  |group by t1.area, t1.product_name
                """.stripMargin).createOrReplaceTempView("t2")
            
            // 3.                   
            spark.sql(
                """
                  |select
                  |    *,
                  |    rank() over(partition by t2.area order by t2.click_count desc) rank
                  |from t2
                """.stripMargin).createOrReplaceTempView("t3")
            
            // 4.      top3
            
            spark.sql(
                """
                  |select
                  |    *
                  |from t3
                  |where rank<=3
                """.stripMargin).show
            
            
        }
    }
    

    좋은 웹페이지 즐겨찾기