Flask-WTF에서 csrf-token의 생성 및 검증 원본 분석

6316 단어
Flask-WTF 소스 분석에서 CSRFTOKEN의 생성과 검증에 문제가 있으니 이 절차를 다시 분석해 보겠습니다.
  • csrf_token의 생성


  • 1. csrf 대상을 생성합니다. 이 대상은 주로 csrf 를 생성하고 검사하는 데 사용됩니다.token의 2.UnboundField(CSRFtokenField) 대상을 생성하여 사용자 정의 Field 목록에 삽입합니다.BaseForm에서 프로세스 방법을 호출하고 Field에서 프로세스 방법을 호출하여current 생성token의 값 4.wiget의call 방법은 input 탭을 렌더링할 때 input의value 속성을 currenttoken
    ...
    #wtforms.form.BaseForm
     extra_fields = []
            # , csrf_token
            if meta.csrf:
                # csrf , csrf_token 
                self._csrf = meta.build_csrf(self)
                # CSRFTokenField , extra_fields 
                extra_fields.extend(self._csrf.setup_form(self))
    ...
    
      #wtforms.meta.DefaultMeta
     def build_csrf(self, form):
             #csrf_class DefaultMeta None, _FlaskFormCSRF , if , None
            if self.csrf_class is not None:
                # _FlaskFormCSRF 
                return self.csrf_class()
            # , SessionCSRF 
            from wtforms.csrf.session import SessionCSRF
            return SessionCSRF()
    
    #wtforms.csrf.core.CSRF
    field_class = CSRFTokenField
     def setup_form(self, form):
            meta = form.meta
          # field_name=csrf_token
            field_name = meta.csrf_field_name
            # CSRFTokenField , UnboundField 
            unbound_field = self.field_class(
                label='CSRF Token',
                csrf_impl=self
            )
            # [('csrf_token,UnboundField')]
            # UnboundField bind CSRFTokenField 
            return [(field_name, unbound_field)]
    
    #wtforms.csrf.core.CSRFTokenField
    def process(self, *args):
            super(CSRFTokenField, self).process(*args)
            #self.csrf_Impl=_FlaskFormCSRF
            # _FlaskFormCSRF generate_csrf_token() csrf_token
            self.current_token = self.csrf_impl.generate_csrf_token(self)
    
    #flask_wtf.csrf.generate_csrf
    def generate_csrf(secret_key=None, token_key=None):
       secret_key = _get_config(
            secret_key, 'WTF_CSRF_SECRET_KEY', current_app.secret_key,
            message='A secret key is required to use CSRF.'
        )
        field_name = _get_config(
            token_key, 'WTF_CSRF_FIELD_NAME', 'csrf_token',
            message='A field name is required to use CSRF.'
        )
       
        if field_name not in g:
            if field_name not in session:
                 # session 
                session[field_name] = hashlib.sha1(os.urandom(64)).hexdigest()
             # URLSafeTimedSerializer csrf_token g 
            s = URLSafeTimedSerializer(secret_key, salt='wtf-csrf-token')
            setattr(g, field_name, s.dumps(session[field_name]))
    
        return g.get(field_name)
    

    이로써 csrftoken의 값 생성 완료!
  • csrf_token의 검증


  • 1. 폼의validate 호출on_submit 방법으로 검사를 시작합니다.폼에 사용자 정의 검사가 있는지 판단합니다 3.없으면 Form의 모든 Field를 옮겨다니는validate () 방법으로 검사합니다
    # 
     if form.validate_on_submit():
    
    #wtforms.form.Form
    def validate(self):
            extra = {}
            for name in self._fields:
                # Field 
                inline = getattr(self.__class__, 'validate_%s' % name, None)
                if inline is not None:
                    extra[name] = [inline]
            # BaseForm validate 
            return super(Form, self).validate(extra)
    
    #wtforms.form.BaseForm
    def validate(self, extra_validators=None):
            self._errors = None
            success = True
            for name, field in iteritems(self._fields):
                if extra_validators is not None and name in extra_validators:
                    extra = extra_validators[name]
                else:
                    extra = tuple()
                # CSRFTokenField validate , Field validate 
                if not field.validate(self, extra):
                    success = False
            return success
    
    #wtforms.fields.core.Field
    def validate(self, form, extra_validators=tuple()):
            self.errors = list(self.process_errors)
            stop_validation = False
            # Call pre_validate
            try:
                # CSRFTokenField pre_validate 
                self.pre_validate(form)
            except StopValidation as e:
                if e.args and e.args[0]:
                    self.errors.append(e.args[0])
                stop_validation = True
            except ValueError as e:
                self.errors.append(e.args[0])
    
            # Run validators
            if not stop_validation:
                chain = itertools.chain(self.validators, extra_validators)
                stop_validation = self._run_validation_chain(form, chain)
    
            # Call post_validate
            try:
                # CSRFTokenField pre_validate 
                self.post_validate(form, stop_validation)
            except ValueError as e:
                self.errors.append(e.args[0])
            return len(self.errors) == 0
    
    #wtforms.csrf.core.CSRFTokenField
     def pre_validate(self, form):
            # self.csrf_impl=flask_wtf.csrf._FlaskFormCSRF
            self.csrf_impl.validate_csrf_token(form, self)
    
    #flask_wtf.csrf.validate_csrf
    def validate_csrf(data, secret_key=None, time_limit=None, token_key=None):
        secret_key = _get_config(
            secret_key, 'WTF_CSRF_SECRET_KEY', current_app.secret_key,
            message='A secret key is required to use CSRF.'
        )
        field_name = _get_config(
            token_key, 'WTF_CSRF_FIELD_NAME', 'csrf_token',
            message='A field name is required to use CSRF.'
        )
        #csrf_token 
        time_limit = _get_config(
            time_limit, 'WTF_CSRF_TIME_LIMIT', 3600, required=False
        )
       #csrf_token 
        if not data:
            raise ValidationError('The CSRF token is missing.')
        #csrf_token session 
        if field_name not in session:
            raise ValidationError('The CSRF session token is missing.')
        
        s = URLSafeTimedSerializer(secret_key, salt='wtf-csrf-token')
        # csrf_token 
        try:
            token = s.loads(data, max_age=time_limit)
        except SignatureExpired:
            raise ValidationError('The CSRF token has expired.')
        except BadData:
            raise ValidationError('The CSRF token is invalid.')
    
        if not safe_str_cmp(session[field_name], token):
            raise ValidationError('The CSRF tokens do not match.')
    

    글자 부호는 쉽지 않으니 마음에 들면 조심하세요!

    좋은 웹페이지 즐겨찾기