Spark SQL 최적화: 성능 3~4배 향상
15855 단어 spark
// Single-query version of the bid-detail aggregation.
// It reads mediabuy_dsp.t_dsp_bid_detail_tbl four times for day = yesterDay and builds one
// UNION ALL branch per metric family. It then re-aggregates by
// (app, day, hour, adx, os, osv, country, impType):
//   1. per-app counts of each `filter` value, parsed out of the `allapps` array of JSON objects
//   2. per-app request counts, from the bracketed `appsitemfinal` list
//   3. per-app HTTP 200 response counts, parsed out of the `appitemresponse` array
//   4. per-app bid counts, keyed by the non-blank `appname` column
// Rows whose app is '' (or NULL) are dropped by the outer WHERE.
// `yesterDay` is interpolated directly instead of via positional `%s` + `.format`, so there is
// no placeholder/argument count to keep in sync.
// Keep the `s` prefix: the interpolator also processes escape sequences, which is what turns
// each `\\\\` below into `\\` in the SQL text handed to Spark.
val bidDetailDf=ss.sql(
s"""
select app,day,hour,adx,os,osv,country,impType, sum(cnt) as cnt,sum(request) as request, sum(response) as response, sum(bid) as bid, sum(timeout) as timeout
,sum(status) as status,sum(ccount) as ccount,sum(remarketing) as remarketing,sum(banner) as banner,sum(video) as video,sum(mediatype) as mediatype,sum(publisher) as publisher,sum(ifa) as ifa,sum(qps) as qps,sum(budget) as budget,
sum(cat) as cat,sum(error) as error,sum(bidfloor) as bidfloor,sum(seatbid) as seatbid,sum(admerror) as admerror
from
(
select app,day,hour,adx,os,osv,country,impType,count(1) as cnt,
0 as request,
0 as response,
0 as bid,
sum(case when filter='timeout' then 1 else 0 end) as timeout,
sum(case when filter='status' then 1 else 0 end) as status,
sum(case when filter='country' then 1 else 0 end) as ccount,
sum(case when filter='remarketing' then 1 else 0 end) as remarketing,
sum(case when filter='banner' then 1 else 0 end) as banner,
sum(case when filter='video' then 1 else 0 end) as video,
sum(case when filter='mediatype' then 1 else 0 end) as mediatype,
sum(case when filter='publisher' then 1 else 0 end) as publisher,
sum(case when filter='ifa' then 1 else 0 end) as ifa,
sum(case when filter='qps' then 1 else 0 end) as qps,
sum(case when filter='budget' then 1 else 0 end) as budget,
sum(case when filter='cat' then 1 else 0 end) as cat,
sum(case when filter='error' then 1 else 0 end) as error,
sum(case when filter='bidfloor' then 1 else 0 end) as bidfloor,
sum(case when filter='seatbid' then 1 else 0 end) as seatbid,
sum(case when filter='admerror' then 1 else 0 end) as admerror
from
(
select rr.app,day,hour,adx,os,osv,country,impType,rr.filter
from (
select day,hour,adx,os,osv,country,impType,split(regexp_replace(regexp_extract(allapps,'^\\\\[(.+)\\\\]',1),'\\\\}\\\\,\\\\{', '\\\\}\\\\|\\\\|\\\\{'),'\\\\|\\\\|') as str
from mediabuy_dsp.t_dsp_bid_detail_tbl
where day='$yesterDay') pp
lateral view explode(pp.str) ss as col
lateral view json_tuple(ss.col,'app','filter') rr as app, filter
) t where app<>'NULL' group by app,day,hour,adx,os,osv,country,impType
union all
select app,day,hour,adx,os,osv,country,impType, 0 as cnt, count(1) as request, 0 as response, 0 as bid, 0 as timeout,
0 as status,0 as ccount,0 as remarketing,0 as banner,0 as video,0 as mediatype,0 as publisher,0 as ifa,0 as qps,0 as budget,
0 as cat,0 as error,0 as bidfloor,0 as seatbid,0 as admerror
from
mediabuy_dsp.t_dsp_bid_detail_tbl
lateral view explode(split(regexp_replace(appsitemfinal,'\\\\[|\\\\]|"',''), ',')) aa as app
where day='$yesterDay' and app <>'NULL' and app is not null and app <> ''
group by app,day,hour,adx,os,osv,country,impType
union all
select appName as app,day,hour,adx,os,osv,country,impType,0 as cnt, 0 as request, sum(case when httpStatus='200' then 1 else 0 end) as response, 0 as bid, 0 as timeout,
0 as status,0 as ccount,0 as remarketing,0 as banner,0 as video,0 as mediatype,0 as publisher,0 as ifa,0 as qps,0 as budget,
0 as cat,0 as error,0 as bidfloor,0 as seatbid,0 as admerror
from
(
select rr.appName,day,hour,adx,os,osv,country,impType, rr.httpStatus
from (
select day,hour,adx,os,osv,country,impType, split(regexp_replace(regexp_extract(appitemresponse,'\\\\[(.+)\\\\]',1),'\\\\}\\\\,\\\\{', '\\\\}\\\\|\\\\|\\\\{'),'\\\\|\\\\|') as str
from
mediabuy_dsp.t_dsp_bid_detail_tbl
where day='$yesterDay'
) pp lateral view explode(pp.str) ss as col
lateral view json_tuple(ss.col,'appName','httpStatus') rr as appName, httpStatus
) t where appName<>'NULL' group by appName,day,hour,adx,os,osv,country,impType
union all
select appname as app,day,hour,adx,os,osv,country,impType, 0 as cnt ,0 as request, 0 as response, count(1) as bid, 0 as timeout,
0 as status,0 as ccount,0 as remarketing,0 as banner,0 as video,0 as mediatype,0 as publisher,0 as ifa,0 as qps,0 as budget,
0 as cat,0 as error,0 as bidfloor,0 as seatbid,0 as admerror
from
mediabuy_dsp.t_dsp_bid_detail_tbl
where day='$yesterDay' and trim(appname)<>''
group by appname,day,hour,adx,os,osv,country,impType
) t where app<>'' group by app,day,hour,adx,os,osv,country,impType
""").persist(StorageLevel.MEMORY_AND_DISK_SER)
테스트해 보니 UNION으로 묶인 각 하위 쿼리는 개별적으로는 빠르게 실행되었고, 가장 느린 것도 약 40분 정도였다. 원래는 하나의 큰 SQL이 같은 테이블을 네 번 스캔한 뒤 그 결과를 UNION으로 합쳐야 했기 때문에 실행 속도가 매우 느렸다. 원래 1.5시간이면 끝나던 작업이 최근에는 어디에 문제가 생겼는지 4~6시간씩 걸렸다. UNION으로 묶인 작은 SQL들을 하나씩 따로 실행해 보니 가장 느린 것도 40분이면 끝났다. 이 SQL들은 실행 시간이 꽤 걸리므로, 각 SQL의 결과를 네 개의 중간 테이블에 INSERT한 다음 네 중간 테이블을 UNION하고 GROUP BY하도록 바꾸었다. 이렇게 하면 전체 실행 시간이 짧아진다. 또한 처리가 실패하더라도 오류가 네 개의 중간 테이블을 만든 이후에 발생한 것이라면, 프로그램이 중간 테이블에서 바로 데이터를 읽도록 바꿔 많은 시간을 절약할 수 있다. 데이터를 보충(재처리)하는 데 이제 5분밖에 걸리지 않는다.
개조된 코드는 다음과 같습니다.
// Step 1/4: per-app counts of each `filter` value (parsed out of the `allapps` array) for
// day = yesterDay. They are written to partition updatedate=yesterDay of
// mediabuy_dsp.t_dsp_bid_middle_detail_tbl_1.
val CurDayFinalDirPath1 = s"s3://baidu.taobao.tenent.com/hive_dataware/mediabuy_dsp/t_dsp_bid_middle_detail_tbl_1/updatedate=$yesterDay"
// Recursively delete the partition directory before the INSERT OVERWRITE below.
// The FileSystem is resolved from the path itself, so the bucket URI is not hard-coded twice.
new Path(CurDayFinalDirPath1)
  .getFileSystem(ss.sparkContext.hadoopConfiguration)
  .delete(new Path(CurDayFinalDirPath1), true)
// `yesterDay` is interpolated directly (no `%s` + `.format`). Keep the `s` prefix: it also
// processes the `\\\\` escapes in the regex literals.
ss.sql(
s"""
INSERT OVERWRITE TABLE mediabuy_dsp.t_dsp_bid_middle_detail_tbl_1
PARTITION (updatedate = '$yesterDay')
select app,day,hour,adx,os,osv,country,impType, sum(cnt) as cnt,sum(request) as request, sum(response) as response, sum(bid) as bid, sum(timeout) as timeout
,sum(status) as status,sum(ccount) as ccount,sum(remarketing) as remarketing,sum(banner) as banner,sum(video) as video,sum(mediatype) as mediatype,sum(publisher) as publisher,sum(ifa) as ifa,sum(qps) as qps,sum(budget) as budget,
sum(cat) as cat,sum(error) as error,sum(bidfloor) as bidfloor,sum(seatbid) as seatbid,sum(admerror) as admerror
from
(
select app,day,hour,adx,os,osv,country,impType,count(1) as cnt,
0 as request,
0 as response,
0 as bid,
sum(case when filter='timeout' then 1 else 0 end) as timeout,
sum(case when filter='status' then 1 else 0 end) as status,
sum(case when filter='country' then 1 else 0 end) as ccount,
sum(case when filter='remarketing' then 1 else 0 end) as remarketing,
sum(case when filter='banner' then 1 else 0 end) as banner,
sum(case when filter='video' then 1 else 0 end) as video,
sum(case when filter='mediatype' then 1 else 0 end) as mediatype,
sum(case when filter='publisher' then 1 else 0 end) as publisher,
sum(case when filter='ifa' then 1 else 0 end) as ifa,
sum(case when filter='qps' then 1 else 0 end) as qps,
sum(case when filter='budget' then 1 else 0 end) as budget,
sum(case when filter='cat' then 1 else 0 end) as cat,
sum(case when filter='error' then 1 else 0 end) as error,
sum(case when filter='bidfloor' then 1 else 0 end) as bidfloor,
sum(case when filter='seatbid' then 1 else 0 end) as seatbid,
sum(case when filter='admerror' then 1 else 0 end) as admerror
from
(
select rr.app,day,hour,adx,os,osv,country,impType,rr.filter
from (
select day,hour,adx,os,osv,country,impType,split(regexp_replace(regexp_extract(allapps,'^\\\\[(.+)\\\\]',1),'\\\\}\\\\,\\\\{', '\\\\}\\\\|\\\\|\\\\{'),'\\\\|\\\\|') as str
from mediabuy_dsp.t_dsp_bid_detail_tbl
where day='$yesterDay') pp
lateral view explode(pp.str) ss as col
lateral view json_tuple(ss.col,'app','filter') rr as app, filter
) t where app<>'NULL' and app<>'' group by app,day,hour,adx,os,osv,country,impType
) t where app<>'' group by app,day,hour,adx,os,osv,country,impType
""")
// Step 2/4: per-app request counts (from the bracketed `appsitemfinal` list) for
// day = yesterDay. They are written to partition updatedate=yesterDay of
// mediabuy_dsp.t_dsp_bid_middle_detail_tbl_2.
val CurDayFinalDirPath2 = s"s3://baidu.taobao.tenent.com/hive_dataware/mediabuy_dsp/t_dsp_bid_middle_detail_tbl_2/updatedate=$yesterDay"
// Recursively delete the partition directory before the INSERT OVERWRITE below.
// The FileSystem is resolved from the path itself, so the bucket URI is not hard-coded twice.
new Path(CurDayFinalDirPath2)
  .getFileSystem(ss.sparkContext.hadoopConfiguration)
  .delete(new Path(CurDayFinalDirPath2), true)
// `yesterDay` is interpolated directly (no `%s` + `.format`). Keep the `s` prefix: it also
// processes the `\\\\` escapes in the regex literal.
ss.sql(s"""
INSERT OVERWRITE TABLE mediabuy_dsp.t_dsp_bid_middle_detail_tbl_2
PARTITION (updatedate = '$yesterDay')
select app,day,hour,adx,os,osv,country,impType, sum(cnt) as cnt,sum(request) as request, sum(response) as response, sum(bid) as bid, sum(timeout) as timeout
,sum(status) as status,sum(ccount) as ccount,sum(remarketing) as remarketing,sum(banner) as banner,sum(video) as video,sum(mediatype) as mediatype,sum(publisher) as publisher,sum(ifa) as ifa,sum(qps) as qps,sum(budget) as budget,
sum(cat) as cat,sum(error) as error,sum(bidfloor) as bidfloor,sum(seatbid) as seatbid,sum(admerror) as admerror
from
(
select app,day,hour,adx,os,osv,country,impType, 0 as cnt, count(1) as request, 0 as response, 0 as bid, 0 as timeout,
0 as status,0 as ccount,0 as remarketing,0 as banner,0 as video,0 as mediatype,0 as publisher,0 as ifa,0 as qps,0 as budget,
0 as cat,0 as error,0 as bidfloor,0 as seatbid,0 as admerror
from
mediabuy_dsp.t_dsp_bid_detail_tbl
lateral view explode(split(regexp_replace(appsitemfinal,'\\\\[|\\\\]|"',''), ',')) aa as app
where day='$yesterDay' and app <>'NULL' and app is not null and app <> ''
group by app,day,hour,adx,os,osv,country,impType
) t where app<>'' group by app,day,hour,adx,os,osv,country,impType
""")
// Step 3/4: per-app HTTP 200 response counts (parsed out of the `appitemresponse` array) for
// day = yesterDay. They are written to partition updatedate=yesterDay of
// mediabuy_dsp.t_dsp_bid_middle_detail_tbl_3.
val CurDayFinalDirPath3 = s"s3://baidu.taobao.tenent.com/hive_dataware/mediabuy_dsp/t_dsp_bid_middle_detail_tbl_3/updatedate=$yesterDay"
// Recursively delete the partition directory before the INSERT OVERWRITE below.
// The FileSystem is resolved from the path itself, so the bucket URI is not hard-coded twice.
new Path(CurDayFinalDirPath3)
  .getFileSystem(ss.sparkContext.hadoopConfiguration)
  .delete(new Path(CurDayFinalDirPath3), true)
// `yesterDay` is interpolated directly (no `%s` + `.format`). Keep the `s` prefix: it also
// processes the `\\\\` escapes in the regex literals.
ss.sql(s"""
INSERT OVERWRITE TABLE mediabuy_dsp.t_dsp_bid_middle_detail_tbl_3
PARTITION (updatedate = '$yesterDay')
select app,day,hour,adx,os,osv,country,impType, sum(cnt) as cnt,sum(request) as request, sum(response) as response, sum(bid) as bid, sum(timeout) as timeout
,sum(status) as status,sum(ccount) as ccount,sum(remarketing) as remarketing,sum(banner) as banner,sum(video) as video,sum(mediatype) as mediatype,sum(publisher) as publisher,sum(ifa) as ifa,sum(qps) as qps,sum(budget) as budget,
sum(cat) as cat,sum(error) as error,sum(bidfloor) as bidfloor,sum(seatbid) as seatbid,sum(admerror) as admerror
from
(
select appName as app,day,hour,adx,os,osv,country,impType,0 as cnt, 0 as request, sum(case when httpStatus='200' then 1 else 0 end) as response, 0 as bid, 0 as timeout,
0 as status,0 as ccount,0 as remarketing,0 as banner,0 as video,0 as mediatype,0 as publisher,0 as ifa,0 as qps,0 as budget,
0 as cat,0 as error,0 as bidfloor,0 as seatbid,0 as admerror
from
(
select rr.appName,day,hour,adx,os,osv,country,impType, rr.httpStatus
from (
select day,hour,adx,os,osv,country,impType, split(regexp_replace(regexp_extract(appitemresponse,'\\\\[(.+)\\\\]',1),'\\\\}\\\\,\\\\{', '\\\\}\\\\|\\\\|\\\\{'),'\\\\|\\\\|') as str
from
mediabuy_dsp.t_dsp_bid_detail_tbl
where day='$yesterDay'
) pp lateral view explode(pp.str) ss as col
lateral view json_tuple(ss.col,'appName','httpStatus') rr as appName, httpStatus
) t where appName<>'NULL' group by appName,day,hour,adx,os,osv,country,impType
) t where app<>'' group by app,day,hour,adx,os,osv,country,impType
""")
// Step 4/4: per-app bid counts (keyed by the non-blank `appname` column) for day = yesterDay.
// They are written to partition updatedate=yesterDay of
// mediabuy_dsp.t_dsp_bid_middle_detail_tbl_4.
val CurDayFinalDirPath4 = s"s3://baidu.taobao.tenent.com/hive_dataware/mediabuy_dsp/t_dsp_bid_middle_detail_tbl_4/updatedate=$yesterDay"
// Recursively delete the partition directory before the INSERT OVERWRITE below.
// The FileSystem is resolved from the path itself, so the bucket URI is not hard-coded twice.
new Path(CurDayFinalDirPath4)
  .getFileSystem(ss.sparkContext.hadoopConfiguration)
  .delete(new Path(CurDayFinalDirPath4), true)
// `yesterDay` is interpolated directly (no `%s` + `.format`), so there is no
// placeholder/argument count to keep in sync.
ss.sql( s"""
INSERT OVERWRITE TABLE mediabuy_dsp.t_dsp_bid_middle_detail_tbl_4
PARTITION (updatedate = '$yesterDay')
select app,day,hour,adx,os,osv,country,impType, sum(cnt) as cnt,sum(request) as request, sum(response) as response, sum(bid) as bid, sum(timeout) as timeout
,sum(status) as status,sum(ccount) as ccount,sum(remarketing) as remarketing,sum(banner) as banner,sum(video) as video,sum(mediatype) as mediatype,sum(publisher) as publisher,sum(ifa) as ifa,sum(qps) as qps,sum(budget) as budget,
sum(cat) as cat,sum(error) as error,sum(bidfloor) as bidfloor,sum(seatbid) as seatbid,sum(admerror) as admerror
from
(
select appname as app,day,hour,adx,os,osv,country,impType, 0 as cnt ,0 as request, 0 as response, count(1) as bid, 0 as timeout,
0 as status,0 as ccount,0 as remarketing,0 as banner,0 as video,0 as mediatype,0 as publisher,0 as ifa,0 as qps,0 as budget,
0 as cat,0 as error,0 as bidfloor,0 as seatbid,0 as admerror
from
mediabuy_dsp.t_dsp_bid_detail_tbl
where day='$yesterDay' and trim(appname)<>''
group by appname,day,hour,adx,os,osv,country,impType
) t where app<>'' group by app,day,hour,adx,os,osv,country,impType
""")
// Final step: merge the four intermediate partitions (updatedate = yesterDay) of
// mediabuy_dsp.t_dsp_bid_middle_detail_tbl_1..4.
// Each branch selects the same 28 columns in the same order, so UNION ALL lines them up
// positionally. The union is then re-aggregated by (app, day, hour, adx, os, osv, country, impType).
// The derived table is aliased (`t`) because Hive and some older Spark versions reject an
// unaliased subquery in FROM.
// NOTE(review): unlike the single-query version, this DataFrame is not persisted. Add
// .persist(...) if it is reused by several actions downstream.
val bidDetailDf= ss.sql(
s"""
|select app,day,hour,adx,os,osv,country,impType, sum(cnt) as cnt,sum(request) as request, sum(response) as response, sum(bid) as bid, sum(timeout) as timeout
|,sum(status) as status,sum(ccount) as ccount,sum(remarketing) as remarketing,sum(banner) as banner,sum(video) as video,sum(mediatype) as mediatype,sum(publisher) as publisher,sum(ifa) as ifa,sum(qps) as qps,sum(budget) as budget,
|sum(cat) as cat,sum(error) as error,sum(bidfloor) as bidfloor,sum(seatbid) as seatbid,sum(admerror) as admerror
|from
|(
|select app,day,hour,adx,os,osv,country,impType,cnt,request,response,bid,timeout,status,ccount,remarketing,banner,video,mediatype,publisher,ifa,qps,budget,cat,error,bidfloor,seatbid,admerror from mediabuy_dsp.t_dsp_bid_middle_detail_tbl_1 where updatedate='$yesterDay'
|union all
|select app,day,hour,adx,os,osv,country,impType,cnt,request,response,bid,timeout,status,ccount,remarketing,banner,video,mediatype,publisher,ifa,qps,budget,cat,error,bidfloor,seatbid,admerror from mediabuy_dsp.t_dsp_bid_middle_detail_tbl_2 where updatedate='$yesterDay'
|union all
|select app,day,hour,adx,os,osv,country,impType,cnt,request,response,bid,timeout,status,ccount,remarketing,banner,video,mediatype,publisher,ifa,qps,budget,cat,error,bidfloor,seatbid,admerror from mediabuy_dsp.t_dsp_bid_middle_detail_tbl_3 where updatedate='$yesterDay'
|union all
|select app,day,hour,adx,os,osv,country,impType,cnt,request,response,bid,timeout,status,ccount,remarketing,banner,video,mediatype,publisher,ifa,qps,budget,cat,error,bidfloor,seatbid,admerror from mediabuy_dsp.t_dsp_bid_middle_detail_tbl_4 where updatedate='$yesterDay'
|) t group by app,day,hour,adx,os,osv,country,impType
""".stripMargin)
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark 팁: 컴퓨팅 집약적인 작업을 위해 병합 후 셔플 파티션 비활성화 — 작은 입력에서 UDAF(사용자 정의 집계 함수) 내에서 컴퓨팅 집약적인 작업을 수행할 때 spark.sql.adaptive.coalescePartitions.enabled를 false로 설정합니다. Apache Sp...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다. 하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.