파이썬 3.0에서 웹 요청 처리 - WSGI 래핑(계속)
이번에는 쿠키 래핑과 세션 지원을 추가하고, 스레드 범위(thread-local) 컨텍스트에서 Request, Response 등을 가져올 수 있게 했습니다. 아직 세션이 지속되지 않습니다.
# -*- coding: utf-8 -*-
import socketserver, re, cgi, io, urllib.parse
from wsgiref.simple_server import WSGIServer
import threading, time, urllib, guid
from http.cookies import SimpleCookie
# Thread-local storage shared by this module; `context` is an alias for the same object.
# wsgi_app stores the current request and response on it so handlers can read them.
ctx = context = threading.local()
class AppException(Exception):
    """Application-level error raised by this module (for example by Response.redirect)."""
class SessionPool(object):
    """In-memory registry of session objects keyed by session id."""

    # Name of the cookie that carries the session id.
    sessionIdKey = "psessionid"

    def __init__(self, session_store_time=30):
        """Create an empty pool.

        @param session_store_time: store time handed to every session created
            by this pool (Session.isTimeOut interprets it as minutes).
        """
        self.session_store_time = session_store_time
        self.sessions = {}

    def getSession(self, key):
        """Return the live session stored under key, or None.

        A session that reports isTimeOut() is removed from the pool and
        None is returned instead.
        """
        session = self.sessions.get(key)
        if session is None:
            return None
        if session.isTimeOut():
            self.removeSession(session.sessionId)
            return None
        return session

    def createSession(self):
        """Create a session with a fresh id, register it and return it."""
        sessionId = self.newSessionId()
        session = Session(sessionId, self.session_store_time)
        self.sessions[sessionId] = session
        return session

    def removeSession(self, key):
        """Drop the session stored under key; unknown keys are ignored."""
        self.sessions.pop(key, None)

    def newSessionId(self, ip=None):
        """Return a new session id produced by guid.generate(ip)."""
        return guid.generate(ip)

    def getSessionByCookie(self, cookie, response=None, create=True):
        """Find (or create) the session referenced by the request cookies.

        @param cookie: mapping of cookie name to morsel (e.g. SimpleCookie).
        @param response: object with putCookie(key, value); when given, the id
            of a newly created session is sent back through it.
        @param create: create a new session when none is found.
        @return: the session, or None when none is found and create is false.
        """
        morsel = cookie.get(SessionPool.sessionIdKey, None)
        if morsel is not None:
            # Response.putCookie percent-quotes the value it stores, so the id
            # must be unquoted before it is used as a key into self.sessions.
            sessionId = urllib.parse.unquote(morsel.value)
            session = self.getSession(sessionId)
            if session is not None:
                session.lastAccessTime = time.time()
                return session
        if create:
            session = self.createSession()
            # response defaults to None; only emit the cookie when we have one.
            if response is not None:
                response.putCookie(SessionPool.sessionIdKey, session.sessionId)
            return session
        return None

    def saveSessions(self):
        """Placeholder; currently does nothing."""
        pass
class Session(dict):
    """Dictionary of per-session values plus an id and access timestamps."""

    def __init__(self, sid, store_time):
        """Record the session id, the creation time and the inactivity limit.

        @param sid: session id.
        @param store_time: inactivity limit; isTimeOut() multiplies it by 60.
        """
        now = time.time()
        self.sessionId = sid
        self.createTime = now
        self.lastAccessTime = now
        self.maxInactiveInterval = store_time

    def isTimeOut(self):
        """Return True when the idle time exceeds maxInactiveInterval * 60 seconds."""
        idle_seconds = time.time() - self.lastAccessTime
        return idle_seconds > self.maxInactiveInterval * 60
class Request(object):
    """Wrapper around one WSGI request environment.

    env, winput, method, attributes, encoding, cookies, response and
    sessionPool are set in __init__. params and session are computed on
    first access through __getattr__ (params also sets fs).
    """

    def __init__(self, env, sessions):
        """Capture the WSGI environ and the session pool.

        @param env: WSGI environ dict; "wsgi.input" and "REQUEST_METHOD" are required.
        @param sessions: pool object providing getSessionByCookie(cookies, response).
        """
        self.env = env
        self.winput = env["wsgi.input"]
        self.method = env["REQUEST_METHOD"]  # e.g. GET or POST
        self.__attrs = {}
        self.attributes = {}
        self.encoding = "UTF-8"
        self.cookies = SimpleCookie(env.get("HTTP_COOKIE", ""))
        # The response for the current thread must already be stored on ctx.
        self.response = ctx.response
        self.sessionPool = sessions

    def __getattr__(self, attr):
        """Lazily compute params / session; raise AttributeError otherwise.

        Only called when normal attribute lookup fails, so each lazy value is
        computed once and then served from the instance dictionary.
        """
        # Read the private cache via __dict__ so a missing cache cannot
        # re-enter __getattr__ and recurse.
        cache = self.__dict__.get("_Request__attrs")
        if cache is not None and attr in cache:
            return cache[attr]
        if attr == "params":
            return self._loadParams()
        if attr == "session":
            self.session = self.sessionPool.getSessionByCookie(self.cookies, self.response)
            return self.session
        # AttributeError (not KeyError) keeps hasattr() and getattr(obj, name,
        # default) working for unknown names.
        raise AttributeError(
            "%r object has no attribute %r" % (type(self).__name__, attr))

    def _loadParams(self):
        """Parse the request parameters into self.params and return them."""
        fp = None
        if self.method == "POST":
            # CONTENT_LENGTH may be absent or an empty string.
            length = int(self.env.get("CONTENT_LENGTH") or 0)
            content = self.winput.read(length)
            # NOTE(review): the body is percent-decoded before it is parsed and
            # is handed over as text; newer cgi.FieldStorage versions appear to
            # expect a binary stream here - confirm against the target Python.
            fp = io.StringIO(urllib.parse.unquote(content.decode("ISO-8859-1"), encoding=self.encoding))
        # NOTE(review): the cgi module was removed in Python 3.13 (PEP 594).
        self.fs = cgi.FieldStorage(fp=fp, environ=self.env, keep_blank_values=1)
        self.params = {key: self.fs[key].value for key in self.fs.keys()}
        self.__attrs["params"] = self.params
        return self.params
class Response(object):
    """Collects headers and cookies and writes the body of one WSGI response."""

    def __init__(self, start_response, write=None):
        """Remember the WSGI start_response callable.

        @param start_response: WSGI start_response(status, headers) callable.
        @param write: optional already-obtained write callable; when given,
            write() never calls start_response itself.
        """
        self.encoding = "UTF-8"
        self.start_response = start_response
        self._write = write
        self.cookies = None
        self.headers = {}

    def _extraHeaders(self):
        """Return Set-Cookie and user-added headers as (name, value) tuples."""
        extra = []
        if self.cookies is not None:
            # One Set-Cookie header per cookie: a single joined value would
            # contain CRLF as soon as more than one cookie is set.
            extra.extend(("Set-Cookie", morsel.OutputString())
                         for morsel in self.cookies.values())
        extra.extend(self.headers.items())
        return extra

    def write(self, string):
        """Send a chunk of the body, starting the response on first use.

        @param string: text (encoded with self.encoding) or bytes to send.
        """
        if self._write is None:
            headers = [("Content-type", "text/html;charset=" + self.encoding)]
            headers.extend(self._extraHeaders())
            self._write = self.start_response("200 OK", headers)
        # The WSGI write callable takes bytes (PEP 3333).
        data = string if isinstance(string, bytes) else string.encode(self.encoding)
        self._write(data)

    def redirect(self, url):
        """Start a 302 response pointing at url.

        Cookies and headers queued on this response are sent along.
        Raises AppException when the response has already been started.
        """
        if self._write is not None:
            raise AppException("Cannot redirect: the response has already started.")
        self.start_response("302 Found", [("Location", url)] + self._extraHeaders())

    def putCookie(self, key, value, expires=1000000, path='/'):
        """Queue a cookie; the value is percent-quoted before it is stored.

        @param key: cookie name.
        @param value: cookie value (str).
        @param expires: value stored in the cookie's "expires" attribute.
        @param path: value stored in the cookie's "path" attribute.
        """
        if self.cookies is None:
            self.cookies = SimpleCookie()
        self.cookies[key] = urllib.parse.quote(value)
        self.cookies[key]["expires"] = expires
        self.cookies[key]['path'] = path

    def addHeaders(self, key, value):
        """Queue an extra response header (replaces an earlier value for key)."""
        self.headers[key] = value
#WSGIServer …
class ThreadingWSGIServer(socketserver.ThreadingMixIn, WSGIServer):
    """WSGIServer combined with socketserver.ThreadingMixIn."""
class WSGIApplication(object):
    """Maps URL patterns to handler classes and exposes them as a WSGI app."""

    def __init__(self, urls=None):
        """Store the routing table and create the session pool.

        @param urls: dict mapping URL pattern to handler class. Patterns are
            compared segment by segment; "*" matches any segment, and in the
            last segment "*" may also be embedded (e.g. "*.do").
        """
        self.urls = urls
        self.sessions = SessionPool(1)

    @staticmethod
    def _lastSegmentMatches(pattern, segment):
        """Return True when the final pattern segment accepts segment."""
        if pattern == "*" or pattern == segment:
            return True
        if "*" not in pattern:
            return False
        # Each "*" stands for a run of word characters. Literal parts are
        # escaped and the whole segment must match, so "*.do" accepts "x.do"
        # but neither "xdo" nor "x.doc".
        regex = r"\w*".join(re.escape(part) for part in pattern.split("*"))
        return re.fullmatch(regex, segment) is not None

    def getHandlerByUrl(self, url):
        """Return the handler registered for url, or None when nothing matches.

        Only patterns with the same number of "/"-separated segments as url
        are considered; doubled slashes in url are collapsed first.
        """
        segments = url.replace("//", "/").split("/")
        for pattern, handler in (self.urls or {}).items():
            patternSegments = pattern.split("/")
            if len(patternSegments) != len(segments):
                continue
            *headPairs, lastPair = zip(patternSegments, segments)
            # Leading segments: literal match, or a "*" / " " placeholder.
            if not all(p in ("*", " ") or p == s for p, s in headPairs):
                continue
            if self._lastSegmentMatches(*lastPair):
                return handler
        return None

    @staticmethod
    def _callWithOptionalArgs(func, *args):
        """Call func(*args) when its signature accepts them, else func().

        Inspecting the signature first avoids invoking func a second time
        when a TypeError is raised from inside its body.
        """
        import inspect
        try:
            signature = inspect.signature(func)
        except (TypeError, ValueError):
            # Signature not introspectable: try with arguments, then without.
            try:
                return func(*args)
            except TypeError:
                return func()
        try:
            signature.bind(*args)
        except TypeError:
            return func()
        return func(*args)

    def make_app(self):
        """Build and return the WSGI callable for this application."""
        def wsgi_app(env, start_response):
            print("start request....")
            handlerCls = self.getHandlerByUrl(env["PATH_INFO"])
            if handlerCls is None:
                # No pattern matches the requested URL.
                start_response("404 Not Found", [("Content-type", "text/html;charset=utf-8")])
                return [b"Error URL"]
            if not hasattr(handlerCls, "doGET") and not hasattr(handlerCls, "doPOST"):
                # The mapped object is not a usable handler.
                start_response("500 Internal Server Error", [("Content-type", "text/html;charset=utf-8")])
                return [b"Error Mapping"]
            response = Response(start_response)
            # Must be set before Request() is built: Request reads ctx.response.
            ctx.response = response
            request = Request(env, self.sessions)
            ctx.request = request
            # Handlers may take (request, response) or nothing, both in the
            # constructor and in the do<METHOD> methods.
            handler = self._callWithOptionalArgs(handlerCls, request, response)
            method = getattr(handler, "do" + request.method)
            returnValue = self._callWithOptionalArgs(method, request, response)
            if returnValue is None:
                returnValue = []
            print("end request....")
            return returnValue
        return wsgi_app

    def make_server(self, serverIp='', port=8080, test=False):
        """Serve the application with ThreadingWSGIServer.

        @param serverIp: address to bind to.
        @param port: TCP port to listen on.
        @param test: when true handle a single request, otherwise serve forever.
        @return: True once serving stops.
        """
        from wsgiref.simple_server import make_server
        httpd = make_server(serverIp, port, self.make_app(), server_class=ThreadingWSGIServer)
        if test:
            httpd.handle_request()
        else:
            httpd.serve_forever()
        return True
def main():
    """Route the demo URL patterns to TestHandler and serve them on port 9000."""
    routes = {"/a/*": TestHandler, "/a/b/*.do": TestHandler}
    WSGIApplication(urls=routes).make_server(test=False, port=9000)
class TestHandler(object):
    """Demo handler that reads the current request/response from ctx."""

    def __init__(self):
        pass

    def doGET(self):
        """Store the "x" parameter in the session (when present) and greet with it."""
        ctx.request.encoding = 'UTF-8'
        session = ctx.request.session
        if "x" in ctx.request.params:
            session["x"] = ctx.request.params["x"]
        # A fresh session has no "x" yet; fall back to an empty string
        # instead of failing with KeyError.
        ctx.response.write("Hello " + session.get("x", ""))

    def doPOST(self):
        """Redirect every POST to /a/x."""
        ctx.response.redirect("/a/x")
# Start the demo server only when this file is executed as a script.
if __name__ == "__main__":
    main()
#input()
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Java 다중 스레드를 순차적으로 실행하는 몇 가지 방법 요약 — Java 다중 스레드를 순차적으로 실행하는 몇 가지 방법 요약. 동료는 무심결에 이 문제를 제기하고 두 가지 방법을 직접 실천했다. 물론 더 좋은 방법이 있을 거야. 방법 1: 이런 방법은 비교적 흔히 볼 수 있는 해결 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다. 하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.