Spark SQL 최적화: 성능 3~4배 향상

15855 단어 spark
요 며칠 동안 코드가 매우 느리게 실행되었습니다. 큰 SQL 하나가 4~6시간이나 걸려서 정말 급한 상황이었습니다. 그 큰 SQL은 다음과 같습니다.
// Original (slow) version: a single statement that reads
// mediabuy_dsp.t_dsp_bid_detail_tbl four times for the given day and UNION ALLs
// four per-app aggregates, one per source column:
//   1. allapps         -> cnt plus one counter per `filter` value (timeout, status, ...)
//   2. appsitemfinal   -> request
//   3. appitemresponse -> response (elements whose httpStatus = '200')
//   4. appname         -> bid
// Each branch emits 0 for the counters it does not compute, and the outer query sums
// all branches per (app, day, hour, adx, os, osv, country, impType), dropping app = ''.
// The four %s placeholders are filled with yesterDay by .format(...); the text contains
// no `$` references even though it uses the `s` interpolator.
// NOTE(review): removing the `s` prefix would presumably change how the backslash
// escapes in the regex literals are processed -- verify before touching it.
// The resulting DataFrame is persisted with StorageLevel.MEMORY_AND_DISK_SER.
val bidDetailDf=ss.sql(
		s"""
select app,day,hour,adx,os,osv,country,impType, sum(cnt) as cnt,sum(request) as request, sum(response) as response, sum(bid) as bid, sum(timeout) as timeout
,sum(status) as status,sum(ccount) as ccount,sum(remarketing) as remarketing,sum(banner) as banner,sum(video) as video,sum(mediatype) as mediatype,sum(publisher) as publisher,sum(ifa) as ifa,sum(qps) as qps,sum(budget) as budget,
sum(cat) as cat,sum(error) as error,sum(bidfloor) as bidfloor,sum(seatbid) as seatbid,sum(admerror) as admerror
from
(

select app,day,hour,adx,os,osv,country,impType,count(1) as cnt,
0 as request,
0 as response,
0 as bid,
sum(case when filter='timeout' then 1 else 0 end) as timeout,
sum(case when filter='status' then 1 else 0 end) as status,
sum(case when filter='country' then 1 else 0 end) as ccount,
sum(case when filter='remarketing' then 1 else 0 end) as remarketing,
sum(case when filter='banner' then 1 else 0 end) as banner,
sum(case when filter='video' then 1 else 0 end) as video,
sum(case when filter='mediatype' then 1 else 0 end) as mediatype,
sum(case when filter='publisher' then 1 else 0 end) as publisher,
sum(case when filter='ifa' then 1 else 0 end) as ifa,
sum(case when filter='qps' then 1 else 0 end) as qps,
sum(case when filter='budget' then 1 else 0 end) as budget,
sum(case when filter='cat' then 1 else 0 end) as cat,
sum(case when filter='error' then 1 else 0 end) as error,
sum(case when filter='bidfloor' then 1 else 0 end) as bidfloor,
sum(case when filter='seatbid' then 1 else 0 end) as seatbid,
sum(case when filter='admerror' then 1 else 0 end) as admerror
from
(
select rr.app,day,hour,adx,os,osv,country,impType,rr.filter
from (
select day,hour,adx,os,osv,country,impType,split(regexp_replace(regexp_extract(allapps,'^\\\\[(.+)\\\\]',1),'\\\\}\\\\,\\\\{', '\\\\}\\\\|\\\\|\\\\{'),'\\\\|\\\\|') as str
from mediabuy_dsp.t_dsp_bid_detail_tbl
where day='%s') pp
lateral view explode(pp.str) ss as col
lateral view json_tuple(ss.col,'app','filter') rr as app, filter
) t where app<>'NULL' group by app,day,hour,adx,os,osv,country,impType

union all

select app,day,hour,adx,os,osv,country,impType, 0 as cnt, count(1) as request, 0 as response, 0 as bid, 0 as timeout,
0 as status,0 as ccount,0 as remarketing,0 as banner,0 as video,0 as mediatype,0 as publisher,0 as ifa,0 as qps,0 as budget,
0 as cat,0 as error,0 as bidfloor,0 as seatbid,0 as admerror
from
mediabuy_dsp.t_dsp_bid_detail_tbl
lateral view explode(split(regexp_replace(appsitemfinal,'\\\\[|\\\\]|"',''), ',')) aa as app
where day='%s' and app <>'NULL' and app is not null and app <> ''
group by app,day,hour,adx,os,osv,country,impType
union all

select appName as app,day,hour,adx,os,osv,country,impType,0 as cnt, 0 as request,  sum(case when httpStatus='200' then 1 else 0 end) as response, 0 as bid, 0 as timeout,
0 as status,0 as ccount,0 as remarketing,0 as banner,0 as video,0 as mediatype,0 as publisher,0 as ifa,0 as qps,0 as budget,
0 as cat,0 as error,0 as bidfloor,0 as seatbid,0 as admerror
from
(
select rr.appName,day,hour,adx,os,osv,country,impType, rr.httpStatus
from (
select day,hour,adx,os,osv,country,impType, split(regexp_replace(regexp_extract(appitemresponse,'\\\\[(.+)\\\\]',1),'\\\\}\\\\,\\\\{', '\\\\}\\\\|\\\\|\\\\{'),'\\\\|\\\\|') as str
from
mediabuy_dsp.t_dsp_bid_detail_tbl
where day='%s'
) pp lateral view explode(pp.str) ss as col
lateral view json_tuple(ss.col,'appName','httpStatus') rr as appName, httpStatus
) t   where appName<>'NULL' group by appName,day,hour,adx,os,osv,country,impType
union all

select appname as app,day,hour,adx,os,osv,country,impType, 0 as cnt ,0 as request, 0 as response, count(1) as bid, 0 as timeout,
0 as status,0 as ccount,0 as remarketing,0 as banner,0 as video,0 as mediatype,0 as publisher,0 as ifa,0 as qps,0 as budget,
0 as cat,0 as error,0 as bidfloor,0 as seatbid,0 as admerror
from
mediabuy_dsp.t_dsp_bid_detail_tbl
where day='%s' and trim(appname)<>''
group by appname,day,hour,adx,os,osv,country,impType
) t  where app<>'' group by app,day,hour,adx,os,osv,country,impType
  """.format(yesterDay,yesterDay,yesterDay,yesterDay)).persist(StorageLevel.MEMORY_AND_DISK_SER)

테스트해 보니 union으로 묶인 작은 SQL들은 하나씩 단독으로 실행하면 매우 빨랐고, 가장 느린 것도 40분 정도면 끝났다. 원래의 큰 SQL은 같은 대용량 테이블을 네 번 스캔한 다음 그 결과를 union all로 한데 합치는 구조인데, 이 SQL의 실행 속도가 매우 느렸다. 원래는 1.5시간이면 끝나던 작업이 어디에선가 문제가 생겨 4~6시간씩 걸리게 된 것이다. 이 작은 SQL들은 각각 실행 시간이 꽤 걸리기 때문에, 각 SQL의 결과를 네 개의 중간 테이블에 각각 insert한 다음 그 네 개의 중간 테이블을 union all 하고 group by 하도록 바꾸었다. 이렇게 하면 전체 실행 시간이 짧아지고, 작업이 실패하더라도 오류가 네 개의 중간 테이블을 만든 이후 단계에서 발생한 것이라면 프로그램이 중간 테이블에서 바로 데이터를 읽도록 바꿔서 많은 시간을 절약할 수 있다. 이제 데이터를 보충(재처리)하는 데 5분밖에 걸리지 않는다.
개선한 코드는 다음과 같습니다.
 // Step 1: recursively remove yesterday's partition directory of middle table 1
 // (t_dsp_bid_middle_detail_tbl_1) before the INSERT OVERWRITE that follows.
    val CurDayFinalDirPath1 = s"s3://baidu.taobao.tenent.com/hive_dataware/mediabuy_dsp/t_dsp_bid_middle_detail_tbl_1/updatedate=$yesterDay"
    // Resolve the bucket's FileSystem from the job's Hadoop configuration, then delete.
    val middleTbl1Fs = FileSystem.get(new URI("s3://baidu.taobao.tenent.com"), ss.sparkContext.hadoopConfiguration)
    middleTbl1Fs.delete(new Path(CurDayFinalDirPath1), true)
// Middle table 1 (cnt + per-filter counters): parses `allapps` for the given day and
// overwrites partition updatedate=<yesterDay> of mediabuy_dsp.t_dsp_bid_middle_detail_tbl_1.
//   - regexp_extract takes the text between the leading '[' and the last ']';
//   - regexp_replace rewrites the `},{` separators to `}||{` and split cuts on `||`,
//     giving one string per element;
//   - explode + json_tuple turn each element into an (app, filter) row;
//   - the inner aggregate counts rows per app (cnt) and per filter value, with
//     request/response/bid fixed at 0 so the column list matches the other middle tables.
// Rows with app = 'NULL' or app = '' are dropped. Both %s placeholders (partition value
// and day filter) are filled with yesterDay by .format(...).
ss.sql(
		s"""
INSERT OVERWRITE TABLE mediabuy_dsp.t_dsp_bid_middle_detail_tbl_1
PARTITION (updatedate = '%s')
select app,day,hour,adx,os,osv,country,impType, sum(cnt) as cnt,sum(request) as request, sum(response) as response, sum(bid) as bid, sum(timeout) as timeout
,sum(status) as status,sum(ccount) as ccount,sum(remarketing) as remarketing,sum(banner) as banner,sum(video) as video,sum(mediatype) as mediatype,sum(publisher) as publisher,sum(ifa) as ifa,sum(qps) as qps,sum(budget) as budget,
sum(cat) as cat,sum(error) as error,sum(bidfloor) as bidfloor,sum(seatbid) as seatbid,sum(admerror) as admerror
from
(

select app,day,hour,adx,os,osv,country,impType,count(1) as cnt,
0 as request,
0 as response,
0 as bid,
sum(case when filter='timeout' then 1 else 0 end) as timeout,
sum(case when filter='status' then 1 else 0 end) as status,
sum(case when filter='country' then 1 else 0 end) as ccount,
sum(case when filter='remarketing' then 1 else 0 end) as remarketing,
sum(case when filter='banner' then 1 else 0 end) as banner,
sum(case when filter='video' then 1 else 0 end) as video,
sum(case when filter='mediatype' then 1 else 0 end) as mediatype,
sum(case when filter='publisher' then 1 else 0 end) as publisher,
sum(case when filter='ifa' then 1 else 0 end) as ifa,
sum(case when filter='qps' then 1 else 0 end) as qps,
sum(case when filter='budget' then 1 else 0 end) as budget,
sum(case when filter='cat' then 1 else 0 end) as cat,
sum(case when filter='error' then 1 else 0 end) as error,
sum(case when filter='bidfloor' then 1 else 0 end) as bidfloor,
sum(case when filter='seatbid' then 1 else 0 end) as seatbid,
sum(case when filter='admerror' then 1 else 0 end) as admerror
from
(
select rr.app,day,hour,adx,os,osv,country,impType,rr.filter
from (
select day,hour,adx,os,osv,country,impType,split(regexp_replace(regexp_extract(allapps,'^\\\\[(.+)\\\\]',1),'\\\\}\\\\,\\\\{', '\\\\}\\\\|\\\\|\\\\{'),'\\\\|\\\\|') as str
from mediabuy_dsp.t_dsp_bid_detail_tbl
where day='%s') pp
lateral view explode(pp.str) ss as col
lateral view json_tuple(ss.col,'app','filter') rr as app, filter
) t where app<>'NULL' and app<>'' group by app,day,hour,adx,os,osv,country,impType

) t  where app<>'' group by app,day,hour,adx,os,osv,country,impType
  """.format(yesterDay,yesterDay))

    // Step 2: recursively remove yesterday's partition directory of middle table 2
    // (t_dsp_bid_middle_detail_tbl_2) before the INSERT OVERWRITE that follows.
    val CurDayFinalDirPath2 = s"s3://baidu.taobao.tenent.com/hive_dataware/mediabuy_dsp/t_dsp_bid_middle_detail_tbl_2/updatedate=$yesterDay"
    // Resolve the bucket's FileSystem from the job's Hadoop configuration, then delete.
    val middleTbl2Fs = FileSystem.get(new URI("s3://baidu.taobao.tenent.com"), ss.sparkContext.hadoopConfiguration)
    middleTbl2Fs.delete(new Path(CurDayFinalDirPath2), true)
// Middle table 2 (request counter): overwrites partition updatedate=<yesterDay> of
// mediabuy_dsp.t_dsp_bid_middle_detail_tbl_2.
// `appsitemfinal` has its '[', ']' and '"' characters stripped, is split on ',' and
// exploded into one row per app; rows where app is null, '' or 'NULL' are dropped and
// the remaining rows are counted as `request`. Every other counter is emitted as 0 so
// the column list matches the other middle tables.
// Both %s placeholders (partition value and day filter) are filled with yesterDay.
ss.sql(s"""
INSERT OVERWRITE TABLE mediabuy_dsp.t_dsp_bid_middle_detail_tbl_2
PARTITION (updatedate = '%s')
select app,day,hour,adx,os,osv,country,impType, sum(cnt) as cnt,sum(request) as request, sum(response) as response, sum(bid) as bid, sum(timeout) as timeout
,sum(status) as status,sum(ccount) as ccount,sum(remarketing) as remarketing,sum(banner) as banner,sum(video) as video,sum(mediatype) as mediatype,sum(publisher) as publisher,sum(ifa) as ifa,sum(qps) as qps,sum(budget) as budget,
sum(cat) as cat,sum(error) as error,sum(bidfloor) as bidfloor,sum(seatbid) as seatbid,sum(admerror) as admerror
from
(



select app,day,hour,adx,os,osv,country,impType, 0 as cnt, count(1) as request, 0 as response, 0 as bid, 0 as timeout,
0 as status,0 as ccount,0 as remarketing,0 as banner,0 as video,0 as mediatype,0 as publisher,0 as ifa,0 as qps,0 as budget,
0 as cat,0 as error,0 as bidfloor,0 as seatbid,0 as admerror
from
mediabuy_dsp.t_dsp_bid_detail_tbl
lateral view explode(split(regexp_replace(appsitemfinal,'\\\\[|\\\\]|"',''), ',')) aa as app
where day='%s' and app <>'NULL' and app is not null and app <> ''
group by app,day,hour,adx,os,osv,country,impType
) t  where app<>'' group by app,day,hour,adx,os,osv,country,impType
  """.format(yesterDay,yesterDay))

// Step 3: recursively remove yesterday's partition directory of middle table 3
// (t_dsp_bid_middle_detail_tbl_3) before the INSERT OVERWRITE that follows.
val CurDayFinalDirPath3 = s"s3://baidu.taobao.tenent.com/hive_dataware/mediabuy_dsp/t_dsp_bid_middle_detail_tbl_3/updatedate=$yesterDay"
// Resolve the bucket's FileSystem from the job's Hadoop configuration, then delete.
val middleTbl3Fs = FileSystem.get(new URI("s3://baidu.taobao.tenent.com"), ss.sparkContext.hadoopConfiguration)
middleTbl3Fs.delete(new Path(CurDayFinalDirPath3), true)

// Middle table 3 (response counter): overwrites partition updatedate=<yesterDay> of
// mediabuy_dsp.t_dsp_bid_middle_detail_tbl_3.
// `appitemresponse` is cut into one string per element (text between '[' and the last
// ']', `},{` rewritten to `}||{`, split on `||`), exploded, and read with json_tuple
// as (appName, httpStatus). `response` counts the elements with httpStatus = '200';
// every other counter is emitted as 0 so the column list matches the other middle tables.
// Rows with appName = 'NULL' are dropped by the inner query and app = '' by the outer one.
// Both %s placeholders (partition value and day filter) are filled with yesterDay.
ss.sql(s"""
INSERT OVERWRITE TABLE mediabuy_dsp.t_dsp_bid_middle_detail_tbl_3
PARTITION (updatedate = '%s')
select app,day,hour,adx,os,osv,country,impType, sum(cnt) as cnt,sum(request) as request, sum(response) as response, sum(bid) as bid, sum(timeout) as timeout
,sum(status) as status,sum(ccount) as ccount,sum(remarketing) as remarketing,sum(banner) as banner,sum(video) as video,sum(mediatype) as mediatype,sum(publisher) as publisher,sum(ifa) as ifa,sum(qps) as qps,sum(budget) as budget,
sum(cat) as cat,sum(error) as error,sum(bidfloor) as bidfloor,sum(seatbid) as seatbid,sum(admerror) as admerror
from
(
select appName as app,day,hour,adx,os,osv,country,impType,0 as cnt, 0 as request,  sum(case when httpStatus='200' then 1 else 0 end) as response, 0 as bid, 0 as timeout,
0 as status,0 as ccount,0 as remarketing,0 as banner,0 as video,0 as mediatype,0 as publisher,0 as ifa,0 as qps,0 as budget,
0 as cat,0 as error,0 as bidfloor,0 as seatbid,0 as admerror
from
(
select rr.appName,day,hour,adx,os,osv,country,impType, rr.httpStatus
from (
select day,hour,adx,os,osv,country,impType, split(regexp_replace(regexp_extract(appitemresponse,'\\\\[(.+)\\\\]',1),'\\\\}\\\\,\\\\{', '\\\\}\\\\|\\\\|\\\\{'),'\\\\|\\\\|') as str
from
mediabuy_dsp.t_dsp_bid_detail_tbl
where day='%s'
) pp lateral view explode(pp.str) ss as col
lateral view json_tuple(ss.col,'appName','httpStatus') rr as appName, httpStatus
) t   where appName<>'NULL' group by appName,day,hour,adx,os,osv,country,impType
) t  where app<>'' group by app,day,hour,adx,os,osv,country,impType
  """.format(yesterDay,yesterDay))

    // Step 4: recursively remove yesterday's partition directory of middle table 4
    // (t_dsp_bid_middle_detail_tbl_4) before the INSERT OVERWRITE that follows.
    val CurDayFinalDirPath4 = s"s3://baidu.taobao.tenent.com/hive_dataware/mediabuy_dsp/t_dsp_bid_middle_detail_tbl_4/updatedate=$yesterDay"
    // Resolve the bucket's FileSystem from the job's Hadoop configuration, then delete.
    val middleTbl4Fs = FileSystem.get(new URI("s3://baidu.taobao.tenent.com"), ss.sparkContext.hadoopConfiguration)
    middleTbl4Fs.delete(new Path(CurDayFinalDirPath4), true)
// Middle table 4 (bid counter): overwrites partition updatedate=<yesterDay> of
// mediabuy_dsp.t_dsp_bid_middle_detail_tbl_4.
// Counts the rows of t_dsp_bid_detail_tbl per appname (rows whose trimmed appname is
// empty are skipped) as `bid`; every other counter is emitted as 0 so the column list
// matches the other middle tables.
// Both %s placeholders (partition value and day filter) are filled with yesterDay.
ss.sql( s"""
INSERT OVERWRITE TABLE mediabuy_dsp.t_dsp_bid_middle_detail_tbl_4
PARTITION (updatedate = '%s')
select app,day,hour,adx,os,osv,country,impType, sum(cnt) as cnt,sum(request) as request, sum(response) as response, sum(bid) as bid, sum(timeout) as timeout
,sum(status) as status,sum(ccount) as ccount,sum(remarketing) as remarketing,sum(banner) as banner,sum(video) as video,sum(mediatype) as mediatype,sum(publisher) as publisher,sum(ifa) as ifa,sum(qps) as qps,sum(budget) as budget,
sum(cat) as cat,sum(error) as error,sum(bidfloor) as bidfloor,sum(seatbid) as seatbid,sum(admerror) as admerror
from
(

select appname as app,day,hour,adx,os,osv,country,impType, 0 as cnt ,0 as request, 0 as response, count(1) as bid, 0 as timeout,
0 as status,0 as ccount,0 as remarketing,0 as banner,0 as video,0 as mediatype,0 as publisher,0 as ifa,0 as qps,0 as budget,
0 as cat,0 as error,0 as bidfloor,0 as seatbid,0 as admerror
from
mediabuy_dsp.t_dsp_bid_detail_tbl
where day='%s' and trim(appname)<>''
group by appname,day,hour,adx,os,osv,country,impType
) t  where app<>'' group by app,day,hour,adx,os,osv,country,impType
  """.format(yesterDay,yesterDay))

// Final roll-up: UNION ALL the four middle tables for updatedate = yesterDay and sum
// every counter per (app, day, hour, adx, os, osv, country, impType).
// Each middle table only fills its own counters (the rest are 0), so the sums simply
// merge the four partial results into one row per key.
// The derived table carries the alias `t`, like every other derived table in this job;
// Hive-style parsers require a name for a subquery in FROM.
val bidDetailDf= ss.sql(
  s"""
     |select app,day,hour,adx,os,osv,country,impType, sum(cnt) as cnt,sum(request) as request, sum(response) as response, sum(bid) as bid, sum(timeout) as timeout
     |,sum(status) as status,sum(ccount) as ccount,sum(remarketing) as remarketing,sum(banner) as banner,sum(video) as video,sum(mediatype) as mediatype,sum(publisher) as publisher,sum(ifa) as ifa,sum(qps) as qps,sum(budget) as budget,
     |sum(cat) as cat,sum(error) as error,sum(bidfloor) as bidfloor,sum(seatbid) as seatbid,sum(admerror) as admerror
     |from
     |(
     |select app,day,hour,adx,os,osv,country,impType,cnt,request,response,bid,timeout,status,ccount,remarketing,banner,video,mediatype,publisher,ifa,qps,budget,cat,error,bidfloor,seatbid,admerror from mediabuy_dsp.t_dsp_bid_middle_detail_tbl_1 where updatedate='$yesterDay'
     |union all
     |select app,day,hour,adx,os,osv,country,impType,cnt,request,response,bid,timeout,status,ccount,remarketing,banner,video,mediatype,publisher,ifa,qps,budget,cat,error,bidfloor,seatbid,admerror from mediabuy_dsp.t_dsp_bid_middle_detail_tbl_2 where updatedate='$yesterDay'
     |union all
     |select app,day,hour,adx,os,osv,country,impType,cnt,request,response,bid,timeout,status,ccount,remarketing,banner,video,mediatype,publisher,ifa,qps,budget,cat,error,bidfloor,seatbid,admerror from mediabuy_dsp.t_dsp_bid_middle_detail_tbl_3 where updatedate='$yesterDay'
     |union all
     |select app,day,hour,adx,os,osv,country,impType,cnt,request,response,bid,timeout,status,ccount,remarketing,banner,video,mediatype,publisher,ifa,qps,budget,cat,error,bidfloor,seatbid,admerror from mediabuy_dsp.t_dsp_bid_middle_detail_tbl_4 where updatedate='$yesterDay'
     |) t group by app,day,hour,adx,os,osv,country,impType
   """.stripMargin)


좋은 웹페이지 즐겨찾기