Django rest framework simple JWT의 소스 분석
from rest_framework_simplejwt.views import TokenObtainPairView, TokenRefreshView
urlpatterns =[ \[](https://blog.csdn.net/cpxsxn/article/details/104613129)
url('obtain/', TokenObtainPairView.as_view(), name='token_obtain_pair'), ---------------------------step1
]
그래서 본고는 이 URL 뒤에 있는 원본 프로세스를 중점적으로 설명하고 아래의 호출 절차를 구체적으로 참고한다.
결론: RefreshToken 실례의payload(dict)에는 token 이 함유되어 있다type/exp/jti/user_id. 인증 클래스 결합restframework_simplejwt.authentication.JWTAuthentication의 authenticate 방법의 실현은 JWT의 인증 본질을 거시적으로 이해할 수 있다. 당신에게 token 문자열을 주고 그 안에user 가 숨어 있다.id와 만료 시간, 그리고 사용자가 이 Token 요청을 가지고 있을 때 Token을 복호화하기만 하면 (복호화 성공 설명이 합법적인 Token) json 형식에 맞는 문자열을 얻을 수 있습니다(JWT가 도대체 어떤 문자열이고 그 안에 어떤 정보가 포함되어 있는지 참조)userid가auth사용자표를 비교하면 현재 요청이 누가 보냈는지 알 수 있습니다.expiretime를 꺼내서 현재 시간과 결합하면 기한이 지났는지 알 수 있습니다. 디자인이 정말 좋아요!!!이렇게 하는 의미는 비민감정보userid는user/password 등 민감한 정보를 대체하여 요청을 보낸 사용자가 누구인지 나타낸다.물론 백엔드에서user아이디 인증했어요.사실 어떤 인증 방식이든 해결해야 할 문제는 요청을 한 사람이 누구인지, 보고 싶은 데이터를 받을 권한이 있는지입니다.
확장: Token의payload를 사용자 정의할 수 있습니까? (이 영감은 xdbamp 백엔드 개조 프로젝트에서 나온 것), 예를 들어 사용자의 부서 정보를payload에 추가하고 싶다면 답은 당연히 긍정적입니다. 사용자 정의 JWT에 포함된 정보를 참고하세요. 완벽합니다!
D:\WorkSpace\Archiver\archiver_gitcode\venv\Lib\site-packages\rest_framework_simplejwt\views.py
class TokenViewBase(generics.GenericAPIView):
permission_classes = ()
authentication_classes = ()
serializer_class = None
www_authenticate_realm = 'api'
def get_authenticate_header(self, request):
return '{0} realm="{1}"'.format(
AUTH_HEADER_TYPES[0],
self.www_authenticate_realm,
)
def post(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data) ------------------------------step2
try:
serializer.is_valid(raise_exception=True)
except TokenError as e:
raise InvalidToken(e.args[0])
return Response(serializer.validated_data, status=status.HTTP_200_OK)
class TokenObtainPairView(TokenViewBase):
"""
Takes a set of user credentials and returns an access and refresh JSON web
token pair to prove the authentication of those credentials.
"""
serializer_class = serializers.TokenObtainPairSerializer
D:\WorkSpace\Archiver\archiver_gitcode\venv\Lib\site-packages\rest_framework_simplejwt\serializers.py
from .tokens import RefreshToken, SlidingToken, UntypedToken
class TokenObtainPairSerializer(TokenObtainSerializer):
@classmethod
def get_token(cls, user):
return RefreshToken.for_user(user) -------------------------------------step4
def validate(self, attrs):
data = super(TokenObtainPairSerializer, self).validate(attrs)
refresh = self.get_token(self.user) ------------------------------------step3
data['refresh'] = text_type(refresh)
data['access'] = text_type(refresh.access_token) ----------------------------------step7
return data
D:\WorkSpace\Archiver\archiver_gitcode\venv\Lib\site-packages\rest_framework_simplejwt\tokens.py
class RefreshToken(BlacklistMixin, Token):
token_type = 'refresh'
lifetime = api_settings.REFRESH_TOKEN_LIFETIME
no_copy_claims = (api_settings.TOKEN_TYPE_CLAIM, 'exp', 'jti')
@property
def access_token(self):
"""
Returns an access token created from this refresh token. Copies all
claims present in this refresh token to the new access token except
those claims listed in the `no_copy_claims` attribute.
"""
access = AccessToken() -----------------------------------------------------step8: AccessToken
# Use instantiation time of refresh token as relative timestamp for
# access token "exp" claim. This ensures that both a refresh and
# access token expire relative to the same time if they are created as
# a pair.
access.set_exp(from_time=self.current_time)
no_copy = self.no_copy_claims
for claim, value in self.payload.items(): -----------------------------------------step9: step6_x RefreshToken payload token_type/exp/jti/user_id
if claim in no_copy: RefreshToken token_type/exp/jti user_id
continue
access[claim] = value
return access
class AccessToken(Token):
token_type = 'access'
lifetime = api_settings.ACCESS_TOKEN_LIFETIME
class BlacklistMixin(object):
"""
If the `rest_framework_simplejwt.token_blacklist` app was configured to be
used, tokens created from `BlacklistMixin` subclasses will insert
themselves into an outstanding token list and also check for their
membership in a token blacklist.
"""
if 'rest_framework_simplejwt.token_blacklist' in settings.INSTALLED_APPS:
def verify(self, *args, **kwargs):
self.check_blacklist()
super(BlacklistMixin, self).verify(*args, **kwargs)
def check_blacklist(self):
"""
Checks if this token is present in the token blacklist. Raises
`TokenError` if so.
"""
jti = self.payload['jti']
if BlacklistedToken.objects.filter(token__jti=jti).exists():
raise TokenError(_('Token is blacklisted'))
def blacklist(self):
"""
Ensures this token is included in the outstanding token list and
adds it to the blacklist.
"""
jti = self.payload['jti']
exp = self.payload['exp']
# Ensure outstanding token exists with given jti
token, _ = OutstandingToken.objects.get_or_create(
jti=jti,
defaults={
'token': str(self),
'expires_at': datetime_from_epoch(exp),
},
)
return BlacklistedToken.objects.get_or_create(token=token)
@classmethod
def for_user(cls, user): -----------------------------------------------------step5_1
"""
Adds this token to the outstanding token list.
"""
token = super(BlacklistMixin, cls).for_user(user)
jti = token['jti']
exp = token['exp']
OutstandingToken.objects.create(
user=user,
jti=jti,
token=str(token),
created_at=token.current_time,
expires_at=datetime_from_epoch(exp),
)
return token
@python_2_unicode_compatible
class Token(object):
"""
A class which validates and wraps an existing JWT or can be used to build a
new JWT.
"""
token_type = None
lifetime = None
def __init__(self, token=None, verify=True):
"""
!!!! IMPORTANT !!!! MUST raise a TokenError with a user-facing error
message if the given token is invalid, expired, or otherwise not safe
to use.
"""
if self.token_type is None or self.lifetime is None:
raise TokenError(_('Cannot create token with no type or lifetime'))
self.token = token
self.current_time = aware_utcnow()
# Set up token
if token is not None:
# An encoded token was provided
from .state import token_backend
# Decode token
try:
self.payload = token_backend.decode(token, verify=verify)
except TokenBackendError:
raise TokenError(_('Token is invalid or expired'))
if verify:
self.verify()
else:
# New token. Skip all the verification steps.
self.payload = {api_settings.TOKEN_TYPE_CLAIM: self.token_type} ------------step6_2 token_type RefreshToken payload
# Set "exp" claim with default value
self.set_exp(from_time=self.current_time, lifetime=self.lifetime) ------------step6_3 exp RefreshToken payload
# Set "jti" claim
self.set_jti() ------------step6_4 jti(jwt-token-id) RefreshToken payload
def __repr__(self):
return repr(self.payload)
def __getitem__(self, key):
return self.payload[key]
def __setitem__(self, key, value):
self.payload[key] = value
def __delitem__(self, key):
del self.payload[key]
def __contains__(self, key):
return key in self.payload
def get(self, key, default=None):
return self.payload.get(key, default)
def __str__(self):
"""
Signs and returns a token as a base64 encoded string.
"""
from .state import token_backend
return token_backend.encode(self.payload)
def verify(self):
"""
Performs additional validation steps which were not performed when this
token was decoded. This method is part of the "public" API to indicate
the intention that it may be overridden in subclasses.
"""
# According to RFC 7519, the "exp" claim is OPTIONAL
# (https://tools.ietf.org/html/rfc7519#section-4.1.4). As a more
# correct behavior for authorization tokens, we require an "exp"
# claim. We don't want any zombie tokens walking around.
self.check_exp()
# Ensure token id is present
if 'jti' not in self.payload:
raise TokenError(_('Token has no id'))
self.verify_token_type()
def verify_token_type(self):
"""
Ensures that the token type claim is present and has the correct value.
"""
try:
token_type = self.payload[api_settings.TOKEN_TYPE_CLAIM]
except KeyError:
raise TokenError(_('Token has no type'))
if self.token_type != token_type:
raise TokenError(_('Token has wrong type'))
def set_jti(self):
"""
Populates the "jti" claim of a token with a string where there is a
negligible probability that the same string will be chosen at a
later time.
See here:
https://tools.ietf.org/html/rfc7519#section-4.1.7
"""
self.payload['jti'] = uuid4().hex
def set_exp(self, claim='exp', from_time=None, lifetime=None):
"""
Updates the expiration time of a token.
"""
if from_time is None:
from_time = self.current_time
if lifetime is None:
lifetime = self.lifetime
self.payload[claim] = datetime_to_epoch(from_time + lifetime)
def check_exp(self, claim='exp', current_time=None):
"""
Checks whether a timestamp value in the given claim has passed (since
the given datetime value in `current_time`). Raises a TokenError with
a user-facing error message if so.
"""
if current_time is None:
current_time = self.current_time
try:
claim_value = self.payload[claim]
except KeyError:
raise TokenError(format_lazy(_("Token has no '{}' claim"), claim))
claim_time = datetime_from_epoch(claim_value)
if claim_time <= current_time:
raise TokenError(format_lazy(_("Token '{}' claim has expired"), claim))
@classmethod
def for_user(cls, user): --------------------------------------------------------------step5_2
"""
Returns an authorization token for the given user that will be provided
after authenticating the user's credentials.
"""
user_id = getattr(user, api_settings.USER_ID_FIELD)
if not isinstance(user_id, int):
user_id = text_type(user_id)
token = cls() ---------------------------------------------------------------step6_1 RefreshToken
token[api_settings.USER_ID_CLAIM] = user_id -------------------------------------step6_5 user_id RefreshToken payload
RefreshToken Token __setitem__ ,
return token RefreshToken
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Django 라우팅 계층 URLconf 작용 및 원리 해석URL 구성(URLconf)은 Django가 지원하는 웹 사이트의 디렉토리와 같습니다.그것의 본질은 URL과 이 URL을 호출할 보기 함수 사이의 맵표입니다. 위의 예제에서는 URL의 값을 캡처하고 위치 매개 변수로...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.