The metrics are computed in two ways: with SparkSQL and with SparkCore.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>job2</groupId>
    <artifactId>JobNew</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.31</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.4</version>
        </dependency>
        <dependency>
            <groupId>ch.hsr</groupId>
            <artifactId>geohash</artifactId>
            <version>1.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.scalikejdbc</groupId>
            <artifactId>scalikejdbc-core_2.10</artifactId>
            <version>2.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.scalikejdbc</groupId>
            <artifactId>scalikejdbc-config_2.10</artifactId>
            <version>2.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.36</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.3</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-make:transitive</arg>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.13</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
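With the dependencies and plugins above, the project can be packaged with Maven (assuming Maven and a JDK are installed); the scala-maven-plugin compiles the Scala sources under src/main/scala:

mvn clean package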
SparkSQL approach
package com.devicetype
import java.util.Properties
import com.typesafe.config.ConfigFactory
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}
object Devicetype {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]").set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
val sQLContext = new SQLContext(sc)
val file = sQLContext.read.parquet("D:\\Job1\\parquet5\\part-r-00000-dcccc879-86a0-47d5-91e1-83636cd561d0.gz.parquet")
sQLContext.setConf("spark.sql.parquet.compression.codec","snappy")
file.registerTempTable("devicetype")
// per-device-type metrics: requests, bids, wins, shows, clicks, DSP cost and payment
val df = sQLContext.sql(
"""
|select case when devicetype = 1 then " " when devicetype = 2 then " " else " " end devicetype,
|sum(case when requestmode = 1 and processnode >=1 then 1 else 0 end) ysrequest,
|sum(case when requestmode = 1 and processnode >=2 then 1 else 0 end) yxrequest,
|sum(case when requestmode = 1 and processnode =3 then 1 else 0 end) adrequest,
|sum(case when iseffective = 1 and isbilling =1 and isbid = 1 then 1 else 0 end) cybid,
|sum(case when iseffective = 1 and isbilling =1 and iswin = 1 and adorderid != 0 then 1 else 0 end) cybidsuccees,
|sum(case when requestmode = 2 and iseffective =1 then 1 else 0 end) shows,
|sum(case when requestmode = 3 and iseffective =1 then 1 else 0 end) clicks,
|sum(case when iseffective = 1 and isbilling =1 and iswin = 1 then winprice/1000 else 0 end) dspcost,
|sum(case when iseffective = 1 and isbilling =1 and iswin = 1 then adpayment/1000 else 0 end) dsppay
|from devicetype group by devicetype
""".stripMargin)
val load = ConfigFactory.load()
val properties = new Properties()
properties.setProperty("user",load.getString("jdbc.user"))
properties.setProperty("password",load.getString("jdbc.password"))
// write the aggregated result to the MySQL table devicetype1
df.write.mode(SaveMode.Append).jdbc(load.getString("jdbc.url"),"devicetype1",properties)
}
}
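ConfigFactory.load() reads the jdbc.user, jdbc.password and jdbc.url keys from a Typesafe Config file on the classpath, typically src/main/resources/application.conf. A minimal sketch with placeholder values (replace them with your own MySQL settings):

jdbc.url="jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8"
jdbc.user="root"
jdbc.password="123456"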
/*************************************************/
SparkCore approach
package com.driver
import com.utils.RptUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
object DriverData_SparkCore {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]").set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
val sQLContext = new SQLContext(sc)
// use snappy compression for parquet
sQLContext.setConf("spark.sql.parquet.compression.codec","snappy")
val file: DataFrame = sQLContext.read.parquet("D:\\Job1\\parquet5\\part-r-00000-dcccc879-86a0-47d5-91e1-83636cd561d0.gz.parquet")
file.map(row=>{
// request mode and processing node
val requestmode = row.getAs[Int]("requestmode")
val processnode = row.getAs[Int]("processnode")
// effectiveness, billing, bid and win flags, plus the order id, win price and payment
val iseffective = row.getAs[Int]("iseffective")
val isbilling = row.getAs[Int]("isbilling")
val isbid = row.getAs[Int]("isbid")
val iswin = row.getAs[Int]("iswin")
val adorderid = row.getAs[Int]("adorderid")
val winprice = row.getAs[Double]("winprice")
val ad = row.getAs[Double]("adpayment")
// original, valid and ad request counts
val reqlist = RptUtils.req(requestmode,processnode)
// bid and win counts plus DSP cost and DSP payment
val adlist = RptUtils.addap(iseffective,isbilling,isbid,iswin,adorderid,winprice,ad)
// show and click counts
val adCountlist = RptUtils.Counts(requestmode,iseffective)
// key by ISP name; the value is the concatenated metric list
(row.getAs[String]("ispname"),
reqlist ++ adlist ++ adCountlist
)
}).reduceByKey((list1,list2)=>{
// e.g. list1 = (0,1,1,0), list2 = (1,1,1,1); zip gives ((0,1),(1,1),(1,1),(0,1)), then each pair is summed
list1.zip(list2).map(t=>t._1+t._2)
}) // element-wise sum of the metric lists for each ISP
.map(t=>t._1+" , "+t._2.mkString(","))
// save the result as text files (a local path here; an HDFS path also works)
.saveAsTextFile("D:\\Job1\\SparkCore")
}
}
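The merge function passed to reduceByKey above sums the two metric lists element by element. A small, Spark-independent Scala illustration of that zip-and-sum:

val list1 = List(1.0, 0.0, 0.0, 1.0)
val list2 = List(1.0, 1.0, 1.0, 0.0)
// zip pairs the elements positionally; each pair is then summed
val merged = list1.zip(list2).map(t => t._1 + t._2)  // List(2.0, 1.0, 1.0, 1.0)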
package com.utils
object RptUtils {
def req(reqMode:Int,proMode:Int):List[Double]={
if(reqMode == 1 && proMode ==1){
// the three elements are: original requests, valid requests, ad requests
List[Double](1,0,0)
}else if(reqMode == 1 && proMode ==2){
List[Double](1,1,0)
}else if(reqMode == 1 && proMode ==3){
List[Double](1,1,1)
}else{
List[Double](0,0,0)
}
}
def addap(iseffective:Int,isbilling:Int,
isbid:Int,iswin:Int,adorderid:Int,winprice:Double,ad:Double):List[Double]={
if(iseffective==1 && isbilling==1 && isbid ==1){
if(iseffective==1 && isbilling==1 && iswin ==1 && adorderid !=0){
List[Double](1,1,winprice/1000.0,ad/1000.0)
}else{
List[Double](1,0,0,0)
}
}else{
List[Double](0,0,0,0)
}
}
def Counts(requestmode:Int,iseffective:Int): List[Double] ={
if(requestmode ==2 && iseffective ==1){
List[Double](1,0)
}else if(requestmode ==3 && iseffective ==1){
List[Double](0,1)
}else{
List[Double](0,0)
}
}
}
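A quick way to sanity-check the three helpers is to call them with hand-picked inputs and compare against the expected metric lists. This check object is hypothetical and not part of the original project:

import com.utils.RptUtils

object RptUtilsCheck {
  def main(args: Array[String]): Unit = {
    // an original request that reached the ad-serving node: original, valid and ad request all count
    println(RptUtils.req(1, 3))                          // List(1.0, 1.0, 1.0)
    // effective, billed bid that did not win: only the bid is counted
    println(RptUtils.addap(1, 1, 1, 0, 0, 200.0, 100.0)) // List(1.0, 0.0, 0.0, 0.0)
    // effective impression: one show, no click
    println(RptUtils.Counts(2, 1))                       // List(1.0, 0.0)
  }
}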
Reposted from: https://www.cnblogs.com/VIP8/p/10520002.html