Django rest framework simple JWT의 소스 분석

11727 단어 Django#auth
이 문서는 Django REST framework Simple JWT가 Token을 발급하는 소스 코드 흐름을 다룹니다. 프로젝트에 rest_framework_simplejwt 패키지를 도입한 뒤에는 url.py에 다음과 같이 작성하기만 하면 됩니다.
from rest_framework_simplejwt.views import TokenObtainPairView, TokenRefreshView
urlpatterns = [
	# Source article: https://blog.csdn.net/cpxsxn/article/details/104613129
	# step1: requests to obtain/ are handled by TokenObtainPairView.
	url('obtain/', TokenObtainPairView.as_view(), name='token_obtain_pair'),
]

따라서 이 글에서는 이 URL 뒤에서 동작하는 소스 코드 흐름을 중점적으로 설명합니다. 구체적인 호출 순서는 아래 코드에 표시한 step 번호를 참고하세요.
결론: RefreshToken 인스턴스의 payload(dict)에는 token_type/exp/jti/user_id가 들어 있습니다. 인증 클래스 rest_framework_simplejwt.authentication.JWTAuthentication의 authenticate 메서드 구현과 함께 보면 JWT 인증의 본질을 큰 틀에서 이해할 수 있습니다. 서버는 사용자에게 token 문자열을 발급하고, 그 안에는 user_id와 만료 시간이 들어 있습니다. 이후 사용자가 이 Token을 가지고 요청하면 Token을 디코딩하기만 하면 됩니다(디코딩에 성공했다는 것은 유효한 Token이라는 뜻입니다). 그러면 JSON 형식의 문자열을 얻을 수 있습니다(「JWT는 도대체 어떤 문자열이고 그 안에 어떤 정보가 들어 있는가」 참조). 여기서 꺼낸 user_id를 auth 사용자 테이블과 대조하면 현재 요청을 누가 보냈는지 알 수 있고, 만료 시간(exp)을 꺼내 현재 시각과 비교하면 만료 여부를 알 수 있습니다. 정말 잘 만든 설계입니다! 이렇게 하는 의의는 민감하지 않은 정보인 user_id가 user/password 같은 민감한 정보를 대신하여 요청을 보낸 사용자가 누구인지 나타낸다는 데 있습니다. 물론 그 전에 백엔드에서 해당 user_id의 사용자를 이미 인증했다는 것이 전제입니다. 사실 어떤 인증 방식이든 해결해야 할 문제는 요청을 보낸 사람이 누구인지, 그리고 보려는 데이터에 접근할 권한이 있는지입니다.
확장: Token의 payload를 사용자 정의할 수 있을까요? (이 아이디어는 xdbamp 백엔드 개편 프로젝트에서 나왔습니다.) 예를 들어 사용자의 부서 정보를 payload에 추가하고 싶은 경우입니다. 답은 당연히 "가능하다"입니다. 자세한 내용은 「JWT에 포함되는 정보 사용자 정의하기」를 참고하세요. 완벽합니다!
D:\WorkSpace\Archiver\archiver_gitcode\venv\Lib\site-packages\rest_framework_simplejwt\views.py
class TokenViewBase(generics.GenericAPIView):
	"""
	Base view for the token endpoints.  Validates the POSTed data with
	`serializer_class` and responds with the serializer's validated data.

	`serializer_class` is None here; subclasses set it.
	"""
	# Empty tuples: no permission or authentication classes are configured
	# on these views.
	permission_classes = ()
	authentication_classes = ()

	serializer_class = None

	www_authenticate_realm = 'api'

	def get_authenticate_header(self, request):
		"""
		Builds the authenticate header value from the first entry of
		`AUTH_HEADER_TYPES` and `www_authenticate_realm`.
		"""
		return '{0} realm="{1}"'.format(
			AUTH_HEADER_TYPES[0],
			self.www_authenticate_realm,
		)

	def post(self, request, *args, **kwargs):
		"""
		Validates `request.data` with the configured serializer and returns
		its validated data with HTTP 200.  A `TokenError` raised during
		validation is re-raised as `InvalidToken`.
		"""
		# step2: build the serializer from the request data.
		serializer = self.get_serializer(data=request.data)

		try:
			serializer.is_valid(raise_exception=True)
		except TokenError as e:
			raise InvalidToken(e.args[0])

		return Response(serializer.validated_data, status=status.HTTP_200_OK)
		
class TokenObtainPairView(TokenViewBase):
	"""
	View that exchanges a set of user credentials for an access and refresh
	JSON web token pair, proving that those credentials were authenticated.

	All request handling is inherited from `TokenViewBase`; this class only
	selects the serializer.
	"""
	serializer_class = serializers.TokenObtainPairSerializer
	

			
		
D:\WorkSpace\Archiver\archiver_gitcode\venv\Lib\site-packages\rest_framework_simplejwt\serializers.py
from .tokens import RefreshToken, SlidingToken, UntypedToken
class TokenObtainPairSerializer(TokenObtainSerializer):
	"""
	Serializer whose validated data contains a 'refresh' token and the
	'access' token derived from it.
	"""

	@classmethod
	def get_token(cls, user):
		"""
		Returns a new `RefreshToken` for the given user.
		"""
		# step4: delegate to RefreshToken.for_user.
		return RefreshToken.for_user(user)

	def validate(self, attrs):
		"""
		Runs the parent validation, then adds the string forms of the refresh
		token and its access token to the returned data under the keys
		'refresh' and 'access'.
		"""
		data = super(TokenObtainPairSerializer, self).validate(attrs)

		# step3: create the refresh token for self.user.
		refresh = self.get_token(self.user)

		data['refresh'] = text_type(refresh)
		# step7: derive the access token from the refresh token.
		data['access'] = text_type(refresh.access_token)

		return data

			
			
			
			
D:\WorkSpace\Archiver\archiver_gitcode\venv\Lib\site-packages\rest_framework_simplejwt\tokens.py			
class RefreshToken(BlacklistMixin, Token):
	"""
	Token of type 'refresh' from which an access token can be derived via
	the `access_token` property.
	"""
	token_type = 'refresh'
	lifetime = api_settings.REFRESH_TOKEN_LIFETIME
	# Claims that are never copied from the refresh token to an access token.
	no_copy_claims = (api_settings.TOKEN_TYPE_CLAIM, 'exp', 'jti')

	@property
	def access_token(self):
		"""
		Returns an access token created from this refresh token.  Copies all
		claims present in this refresh token to the new access token except
		those claims listed in the `no_copy_claims` attribute.
		"""
		# step8: create a new AccessToken.
		access = AccessToken()

		# Use instantiation time of refresh token as relative timestamp for
		# access token "exp" claim.  This ensures that both a refresh and
		# access token expire relative to the same time if they are created as
		# a pair.
		access.set_exp(from_time=self.current_time)

		no_copy = self.no_copy_claims
		# step9: as shown in step6_x, the RefreshToken payload holds the
		# token type, exp, jti and user id claims.  The token type, exp and
		# jti claims are listed in no_copy_claims, so of those four only the
		# user id claim is copied to the access token.
		for claim, value in self.payload.items():
			if claim in no_copy:
				continue
			access[claim] = value

		return access

		
class AccessToken(Token):
	"""
	Token of type 'access' whose lifetime is taken from
	`api_settings.ACCESS_TOKEN_LIFETIME`.
	"""
	lifetime = api_settings.ACCESS_TOKEN_LIFETIME
	token_type = 'access'

		
class BlacklistMixin(object):
	"""
	If the `rest_framework_simplejwt.token_blacklist` app was configured to be
	used, tokens created from `BlacklistMixin` subclasses will insert
	themselves into an outstanding token list and also check for their
	membership in a token blacklist.
	"""
	# The methods below are only defined when the blacklist app is installed;
	# otherwise the mixin body is empty and adds nothing to its subclasses.
	if 'rest_framework_simplejwt.token_blacklist' in settings.INSTALLED_APPS:
		def verify(self, *args, **kwargs):
			"""
			Checks the blacklist first, then runs the next `verify` in the
			method resolution order.
			"""
			self.check_blacklist()

			super(BlacklistMixin, self).verify(*args, **kwargs)

		def check_blacklist(self):
			"""
			Checks if this token is present in the token blacklist.  Raises
			`TokenError` if so.
			"""
			jti = self.payload['jti']

			if BlacklistedToken.objects.filter(token__jti=jti).exists():
				raise TokenError(_('Token is blacklisted'))

		def blacklist(self):
			"""
			Ensures this token is included in the outstanding token list and
			adds it to the blacklist.
			"""
			jti = self.payload['jti']
			exp = self.payload['exp']

			# Ensure outstanding token exists with given jti
			# NOTE: `_` is a local throwaway here (the "created" flag); it
			# shadows the translation function only inside this method.
			token, _ = OutstandingToken.objects.get_or_create(
				jti=jti,
				defaults={
					'token': str(self),
					'expires_at': datetime_from_epoch(exp),
				},
			)

			return BlacklistedToken.objects.get_or_create(token=token)

		@classmethod
		def for_user(cls, user):  # step5_1: runs before Token.for_user for RefreshToken
			"""
			Adds this token to the outstanding token list.
			"""
			token = super(BlacklistMixin, cls).for_user(user)

			jti = token['jti']
			exp = token['exp']

			OutstandingToken.objects.create(
				user=user,
				jti=jti,
				token=str(token),
				created_at=token.current_time,
				expires_at=datetime_from_epoch(exp),
			)

			return token
	
	
@python_2_unicode_compatible
class Token(object):
	"""
	A class which validates and wraps an existing JWT or can be used to build a
	new JWT.
	"""
	# Subclasses must set both; __init__ raises TokenError while either is None.
	token_type = None
	lifetime = None

	def __init__(self, token=None, verify=True):
		"""
		!!!! IMPORTANT !!!! MUST raise a TokenError with a user-facing error
		message if the given token is invalid, expired, or otherwise not safe
		to use.
		"""
		if self.token_type is None or self.lifetime is None:
			raise TokenError(_('Cannot create token with no type or lifetime'))

		self.token = token
		self.current_time = aware_utcnow()

		# Set up token
		if token is not None:
			# An encoded token was provided
			from .state import token_backend

			# Decode token
			try:
				self.payload = token_backend.decode(token, verify=verify)
			except TokenBackendError:
				raise TokenError(_('Token is invalid or expired'))

			if verify:
				self.verify()
		else:
			# New token.  Skip all the verification steps.
			# step6_2: the payload starts out with only the token type claim.
			self.payload = {api_settings.TOKEN_TYPE_CLAIM: self.token_type}

			# Set "exp" claim with default value
			# step6_3: adds the "exp" claim to the payload.
			self.set_exp(from_time=self.current_time, lifetime=self.lifetime)

			# Set "jti" claim
			# step6_4: adds the "jti" (JWT token id) claim to the payload.
			self.set_jti()

	def __repr__(self):
		return repr(self.payload)

	# The item-access methods below proxy directly to the payload dict, so a
	# token can be read and written like a mapping of claims.
	def __getitem__(self, key):
		return self.payload[key]

	def __setitem__(self, key, value):
		self.payload[key] = value

	def __delitem__(self, key):
		del self.payload[key]

	def __contains__(self, key):
		return key in self.payload

	def get(self, key, default=None):
		"""
		Returns the payload value for `key`, or `default` if it is absent.
		"""
		return self.payload.get(key, default)

	def __str__(self):
		"""
		Signs and returns a token as a base64 encoded string.
		"""
		from .state import token_backend

		return token_backend.encode(self.payload)

	def verify(self):
		"""
		Performs additional validation steps which were not performed when this
		token was decoded.  This method is part of the "public" API to indicate
		the intention that it may be overridden in subclasses.
		"""
		# According to RFC 7519, the "exp" claim is OPTIONAL
		# (https://tools.ietf.org/html/rfc7519#section-4.1.4).  As a more
		# correct behavior for authorization tokens, we require an "exp"
		# claim.  We don't want any zombie tokens walking around.
		self.check_exp()

		# Ensure token id is present
		if 'jti' not in self.payload:
			raise TokenError(_('Token has no id'))

		self.verify_token_type()

	def verify_token_type(self):
		"""
		Ensures that the token type claim is present and has the correct value.
		"""
		try:
			token_type = self.payload[api_settings.TOKEN_TYPE_CLAIM]
		except KeyError:
			raise TokenError(_('Token has no type'))

		if self.token_type != token_type:
			raise TokenError(_('Token has wrong type'))

	def set_jti(self):
		"""
		Populates the "jti" claim of a token with a string where there is a
		negligible probability that the same string will be chosen at a
		later time.

		See here:
		https://tools.ietf.org/html/rfc7519#section-4.1.7
		"""
		self.payload['jti'] = uuid4().hex

	def set_exp(self, claim='exp', from_time=None, lifetime=None):
		"""
		Updates the expiration time of a token.

		`from_time` defaults to `self.current_time` and `lifetime` defaults to
		`self.lifetime`; the claim is stored as an epoch value.
		"""
		if from_time is None:
			from_time = self.current_time

		if lifetime is None:
			lifetime = self.lifetime

		self.payload[claim] = datetime_to_epoch(from_time + lifetime)

	def check_exp(self, claim='exp', current_time=None):
		"""
		Checks whether a timestamp value in the given claim has passed (since
		the given datetime value in `current_time`).  Raises a TokenError with
		a user-facing error message if so.
		"""
		if current_time is None:
			current_time = self.current_time

		try:
			claim_value = self.payload[claim]
		except KeyError:
			raise TokenError(format_lazy(_("Token has no '{}' claim"), claim))

		claim_time = datetime_from_epoch(claim_value)
		# A claim time equal to the current time already counts as expired.
		if claim_time <= current_time:
			raise TokenError(format_lazy(_("Token '{}' claim has expired"), claim))

	@classmethod
	def for_user(cls, user):  # step5_2: reached via super() from BlacklistMixin.for_user
		"""
		Returns an authorization token for the given user that will be provided
		after authenticating the user's credentials.
		"""
		user_id = getattr(user, api_settings.USER_ID_FIELD)
		# Non-integer ids are stored in their text form.
		if not isinstance(user_id, int):
			user_id = text_type(user_id)

		# step6_1: instantiate the token class (RefreshToken in the traced flow).
		token = cls()
		# step6_5: adds the user id claim to the payload.  RefreshToken
		# inherits Token.__setitem__, so item assignment writes to the payload.
		token[api_settings.USER_ID_CLAIM] = user_id

		# Returns the token instance (a RefreshToken in the traced flow).
		return token

좋은 웹페이지 즐겨찾기