asyncio + pycurl + BytesIO 비동기 일괄 호출 url 요청

3833 단어 asynciopycurlBytesIO
import asyncioimport pycurlfrom io import BytesIOimport json
def fetch_api (url, method, header = None, data = None): "url: api 를 가 져 오 는 urlmethod: 요청 방법 header: 요청 헤더 data: 요청 매개 변수"
if method == "get":
    _Curl = pycurl.Curl()
    buf = BytesIO()
    _Curl.setopt(pycurl.WRITEFUNCTION,buf.write)
    _Curl.setopt(pycurl.URL, url)
    _Curl.setopt(pycurl.HTTPHEADER, header)
    _Curl.perform()
    result = buf.getvalue().decode("utf-8")
    http_code = _Curl.getinfo(_Curl.HTTP_CODE)
    dns_time = _Curl.getinfo(_Curl.NAMELOOKUP_TIME) * 1000 # DNS    
    connect_time = _Curl.getinfo(_Curl.CONNECT_TIME) * 1000 #     
    pretransafe_time = _Curl.getinfo(_Curl.PRETRANSFER_TIME) * 1000 #              
    starttransafe_time =_Curl.getinfo(_Curl.STARTTRANSFER_TIME) * 1000 #            
    redirect_time = _Curl.getinfo(_Curl.REDIRECT_TIME) * 1000 #      
    total_time = _Curl.getinfo(_Curl.TOTAL_TIME) * 1000 #      
    download_size = _Curl.getinfo(_Curl.SIZE_DOWNLOAD) #        
    download_speed = _Curl.getinfo(_Curl.SPEED_DOWNLOAD) #     
    buf.close()
    print(result)
    print(http_code,dns_time,connect_time,pretransafe_time,starttransafe_time,redirect_time,total_time,download_size,download_speed)
    _Curl.close()

else:
    _Curl = pycurl.Curl()
    _Curl.setopt(pycurl.HTTPHEADER, header)

    buf = BytesIO()
    _Curl.setopt(pycurl.WRITEFUNCTION, buf.write)
    _Curl.setopt(pycurl.URL, url)
    _Curl.setopt(_Curl.POSTFIELDS, data)

    _Curl.perform()
    result = buf.getvalue().decode("utf-8")
    http_code = _Curl.getinfo(_Curl.HTTP_CODE)
    dns_time = _Curl.getinfo(_Curl.NAMELOOKUP_TIME) * 1000  # DNS    
    connect_time = _Curl.getinfo(_Curl.CONNECT_TIME) * 1000  #     
    pretransafe_time = _Curl.getinfo(_Curl.PRETRANSFER_TIME) * 1000  #              
    starttransafe_time = _Curl.getinfo(_Curl.STARTTRANSFER_TIME) * 1000  #            
    redirect_time = _Curl.getinfo(_Curl.REDIRECT_TIME) * 1000  #      
    total_time = _Curl.getinfo(_Curl.TOTAL_TIME) * 1000 #         
    download_size = _Curl.getinfo(_Curl.SIZE_DOWNLOAD)  #        
    download_speed = _Curl.getinfo(_Curl.SPEED_DOWNLOAD)  #     
    buf.close()
    print(result)
    print(http_code, dns_time, connect_time, pretransafe_time, starttransafe_time, redirect_time, total_time,
          download_size, download_speed)
    _Curl.close()

"" get 요청 테스트 헤더 = ['Content - Type: application / json; charset = utf - 8']
fetch_api('http://xxxx', 'get', header)
post 테스트 헤더 요청 = [
'Content-Type:application/json; charset=utf-8'

]
data = {"userid":"%s","nickname":"zxpp","sex":"1","birthday":"2015-01-01"}json_data = json.dumps(data)fetch_api('http://xxxx', 'post', header=header,data=json_data)
"""
@asyncio.coroutinedef fetch_async(func, url, method, header=None, data=None):loop = asyncio.get_event_loop()future = loop.run_in_executor (None, func, url, method, header, data) \ # response = yield from future \ # func 함수 가 데 이 터 를 되 돌려 주지 않 았 기 때문에 나중에 값 yield 를 되 돌려 주지 않 아 도 됩 니 다.
header = [
'Content-Type:application/json; charset=utf-8'

]
data = {"userid":"%s","nickname":"zxpp","sex":"1","birthday":"2015-01-01"}json_data = json.dumps(data)
tasks = [fetch_async(fetch_api, url='http://xxxx', method='get',header=header),fetch_async(fetch_api, url='http://xxxx', method='post', header=header,data=json_data)]
loop = asyncio.get_event_loop()results = loop.run_until_complete(asyncio.gather(*tasks))loop.close()

좋은 웹페이지 즐겨찾기