欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

用Flask實(shí)現(xiàn)token登錄校驗(yàn)的解決方案

 更新時間:2024年03月25日 11:50:11   作者:zhouqi1208  
網(wǎng)站、小程序、APP 是否已經(jīng)登錄所代表的狀態(tài),代表一個概念是登錄態(tài), 我們常用的登錄態(tài)驗(yàn)證方式有cookie,session,token,token提供了另外一種不需要緩存賬戶和密碼的登錄狀態(tài)驗(yàn)證方式,本文給大家介紹了用Flask實(shí)現(xiàn)token登錄校驗(yàn)的解決方案,需要的朋友可以參考下

常用的登錄方式

網(wǎng)站、小程序、APP 是否已經(jīng)登錄所代表的狀態(tài),代表一個概念是登錄態(tài)。

我們常用的登錄態(tài)驗(yàn)證方式主要是如下3種 cookie,session,token

cookie和session 采用的是緩存機(jī)制,是需要在瀏覽器端或服務(wù)器端,儲存用戶登錄狀態(tài)。

兩者的特點(diǎn),cookie 采用的瀏覽器緩存 相對于 session 采用的服務(wù)器緩存機(jī)制安全較差,但后者很怕用戶過多給服務(wù)器代理過大的壓力。

token 提供了另外一種不需要緩存賬戶和密碼的登錄狀態(tài)驗(yàn)證方式。首次登錄后,服務(wù)器會給客戶端發(fā)送一個Token,這個Token相當(dāng)于門禁卡,每次訪問服務(wù)器的時候攜帶這個Token,服務(wù)器端就可以驗(yàn)證是誰登錄了。

但為了防止Token被盜用,我們會把Token的有效期設(shè)置的很短。Token過期后,向服務(wù)器再申請新的Token.這樣就可以更加安全的使用Token方式進(jìn)行登錄了。

用Flask實(shí)現(xiàn)TOKEN登錄的解決方案

環(huán)境準(zhǔn)備

python flask模板

flask-login 登錄驗(yàn)證模板

Redis 緩存模板

登錄驗(yàn)證機(jī)制

通過解密TOKEN獲得有效的登錄用戶相關(guān)信息,并把登錄信息保存在服務(wù)器端的Redis內(nèi)。

這樣,確保即使別人盜取了你的Token.如果和服務(wù)器Redis中保存的Token不一致。同樣系統(tǒng)會判斷登錄不成功。

第一步 用戶登錄

  auth.py

 通過 https://自己的服務(wù)器地址/login 登錄成功,并返回 access_token和refresh_token到前端。

from flask_restful import Api,Resource
from db.models import *
 
 
admin_login_parser = reqparse.RequestParser()
admin_login_parser.add_argument("username", type=str, location='values', help='輸入管理員用戶名')
admin_login_parser.add_argument("password", type=str, location='values', help='輸入管理員密碼')
 
 
class Getlogin(Resource):
"""系統(tǒng)登錄"""
 
    def post(self):
        args = admin_login_parser.parse_args()
 
        admin = User.query.filter(User.username == args.username).first()
 
        front_access_token, front_refresh_token = admin.get_id(life_time=7200, last_login_time=last_login_time, need_refresh_token=True)
 
         rd.set_redis_data('front_access_token'+'_'+str(admin.id), front_access_token)
            rd.set_redis_data('front_refresh_token'+'_'+str(admin.id),front_refresh_token)
 
        return jsonify(get_result(0, '登錄成功', {'access_token': front_access_token, 'refresh_token': front_refresh_token, 'userInfo': userInfo}))

第二步 前端保存 token 每一個新的請求中(這里用的是Uniapp前端),把a(bǔ)ccess_token和refresh_token 保存在前端。

/**
 * 調(diào)用微信登錄
 */
function login() {
   Utils.request(api.AuthLogin,{
                username:'你的登錄名',
                password:'你的密碼'
            },'POST').then(res=>{
               uni.setStorageSync('isLogin',true)
               uni.setStorageSync('userInfo', res.data.userInfo);
               uni.setStorageSync('Access-Token', res.data.access_token);
               uni.setStorageSync('refresh_token',res.data.refresh_token);
 
                setTimeout(()=>{
                    resolve({
                        userInfo:res.data.userInfo,
                        access_token:res.data.access_token,
                    })
                },3000)
            })
}

第三步 給前端的請求頭添加access_token

uni.addInterceptor('request',{
    // 在發(fā)送請求之前做一些處理
    invoke(requestConfig){
        // 添加請求頭、身份驗(yàn)證等
        requestConfig.header['Access-Token'] = uni.getStorageSync('Access-Token')
        // requestConfig.header['Refresh-Token'] = uni.getStorageSync('refresh_token')
 
        // 添加token到請求頭
 
 
        return requestConfig
    },
    success(response){
        switch (response.data.errno) {
            case 401:
                uni.setStorageSync("Access-Token",'')
                uni.setStorageSync("refresh_token",'')
                uni.setStorageSync('isLogin',false)
                uni.navigateTo({
                    url:'/pages/login/login/index'
                })
                break
            case 403:
                Utils.reloadMessage()
                break
        }
    },
    fail(error){
        console.error('請求失敗',error)
    }
})

第四步 后端登錄效驗(yàn) 

 這里采用了flask_login 登錄模塊,flask_login 這里我們從新定義 登錄裝飾器 login_required_token,用于效驗(yàn)利用token進(jìn)行的登錄。

def login_required_token(func):
    @wraps(func)
    def decorated_view(*args, **kwargs):
        # 用戶登錄狀態(tài)
        if not current_user.is_authenticated:
            if g.refresh:
                g.refresh = False
                return jsonify(base.get_result(403, 'token過期重建', {'token': 'refresh'}))
 
            return current_app.login_manager.unauthorized()
        if callable(getattr(current_app, "ensure_sync", None)):
 
            return current_app.ensure_sync(func)(*args, **kwargs)
 
        return func(*args, **kwargs)
 
    return decorated_view

第五步 token過期,前端向后端發(fā)送 front_refresh_token

token 過期后,g.refresh 會設(shè)置為 True

g.refresh 為True是想后前端反饋狀態(tài)碼403

前端接受到403 的反饋后,執(zhí)行Utils.reloadMessage函數(shù)獲取新的token
public static reloadMessage = ()=>{
      let header = {
          'Content-Type': 'application/json; charset=UTF-8',
          "Access-Control-Allow-Origin": "*",
          "Refresh-Token":uni.getStorageSync('refresh_token')
      }
      Utils.request(api.getNewToken,{},"POST",header).then(res=>{
          uni.setStorageSync('Access-Token', res.data.access_token)
      })
  }

第六步 前端向后端請求,獲取新的Access-Token

https://自己的服務(wù)器地址/getNewToken

def check_refresh_token(func):
    """refresh_token"""
    @wraps(func)
    def inner(*args, **kwargs):
 
        refresh_data = get_token_key(get_key(request.headers.get('Refresh-Token')))
 
 
 
        if refresh_data is None:
            rd.del_redis_data('front_refresh_token' + '_' + str(refresh_data.get('user_id')))
            return jsonify(base.get_result(401,'系統(tǒng)退出2'))
 
        try:
            refresh_token = rd.get_redis_data('front_refresh_token' + '_' + str(refresh_data.get('user_id')))
            refresh_token = get_token_key(refresh_token)
 
        except Exception as e:
            return jsonify(base.get_result(401,'系統(tǒng)退出3'))
 
        else:
            # 生成新的token,保存在g中
            user = User.query.get(refresh_token.get('user_id'))
            last_login_time=int(time.time())
            front_access_token, front_refresh_token = user.get_id(life_time=10, last_login_time=last_login_time,
                                                                   need_refresh_token=True)
 
            rd.set_redis_data('front_access_token'+'_'+str(user.id),front_access_token)
            g.new_token = front_access_token
 
            return func(*args, **kwargs)
 
    return inner
 
 
class GetNewToken(Resource):
    """申請新的Token"""
 
    @check_refresh_token
    def post(self):
        return jsonify(get_result(0,'申請成功',{'access_token':g.new_token}))
 
 
api.add_resource(GetNewToken, '/getNewToken')

decorators.py 完整代碼

from functools import wraps
from flask import session,redirect,url_for,request,jsonify,Markup,flash,abort,current_app,g
from db.models import User,Guest
from flask_login import LoginManager,login_user,logout_user,login_required,current_user,UserMixin
 
 
from apps.model import Token
import lib.base as base
import json
import apps.redis as rd
import time
 
#訪問控制
# 實(shí)例化LoginManager類
login_manager = LoginManager()
 
# 提示信息
login_manager.login_message = "請先登錄"
# 提示樣式
login_manager.login_message_category = 'danger'
 
login_manager.anonymous_user = Guest
 
class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
 
def load_token(tok):
    """通過loads()方法來解析瀏覽器發(fā)送過來的token,從而進(jìn)行初步的驗(yàn)證"""
    try:
        api_key = Token.validate_token(tok)
        user = User.query.get(api_key.get('user_id'))
    except Exception as e:
        return None
 
    if user:
        user.token = tok
 
    else:
        print('用戶不存在,令牌不正確!')
        return None
    return user
 
 
def get_token_key(key):
    """解密token到數(shù)據(jù)"""
    try:
        key = key.replace('Basic ', '', 1)
        key = key.encode('utf-8')
        key = Token.validate_token(key)
 
    except Exception as e:
    # except TypeError:
        return None
    return key
 
 
def get_key(value):
    if value == '':
        value = None
    return value
 
 
@login_manager.unauthorized_handler
def unauthorized():
    """系統(tǒng)超時退出"""
    return jsonify(base.get_result(401,'系統(tǒng)退出2'))
 
 
@login_manager.request_loader
def load_user_from_request(request):
    """效驗(yàn)請求中的token"""
    if request.method == "POST":
        g.refresh = False
        """# 請求正文中攜帶token時,執(zhí)行如下代碼
          捕捉請求參數(shù)中的 'api_key' 變量,這個api_key和數(shù)據(jù)庫中存儲的api_key比較,確認(rèn)登錄身份。 
        """
        api_access_key = request.args.get('api_key')
        if api_access_key:
            user = User.query.filter_by(api_key=api_access_key).first()
            if user:
                return user
 
        """請求頭中攜帶token,執(zhí)行如下代碼
           捕捉請求參數(shù)中的 'X-Token' 變量,這個Access-Token通過服務(wù)端解碼后,獲得用戶ID和Access-Token有效期,確認(rèn)登錄身份。
        """
        # 取請求頭中的access-token
        api_access_key = get_key(request.headers.get('Access-Token'))
 
 
        # 對比前端的token和服務(wù)器中保存的token是否一致,判斷用戶是否已經(jīng)登錄了
        if api_access_key:
 
            login_data = get_token_key(api_access_key)
 
            # 判斷token是否過期,沒有過期返回用戶對象。
            if login_data and api_access_key == get_key(
                    rd.get_redis_data('front_access_token' + '_' + str(login_data.get('user_id')))):
                g.user = User.query.get(login_data.get('user_id'))
 
                return g.user
 
            # 判斷如果token已經(jīng)過期了,但是front_refresh_token沒有過期,執(zhí)行如下代碼?
            else:
                g.refresh = True
            return None
 
        return None
 
 
def confirm_required(func):
    def decorated_function(*args, **kwargs):
        if current_user.state.name == 'inactive':
            logout_user()
            message = Markup(
                '賬戶未激活')
            flash(message, 'warning')
            return redirect(url_for('wx.login'))
        return func(*args, **kwargs)
    return decorated_function
 
 
def permission_required(permission_name):
    def decorator(func):
        @wraps(func)
        def decorated_function(*args, **kwargs):
            if not current_user.can(permission_name):
                abort(403)
            return func(*args, **kwargs)
        return decorated_function
    return decorator
 
 
def admin_required(func):
    return permission_required('ADMINISTER')(func)
 
def login_required_token(func):
    @wraps(func)
    def decorated_view(*args, **kwargs):
        # 用戶登錄狀態(tài)
        if not current_user.is_authenticated:
            if g.refresh:
                g.refresh = False
                return jsonify(base.get_result(403, 'token過期重建', {'token': 'refresh'}))
 
            return current_app.login_manager.unauthorized()
 
        if callable(getattr(current_app, "ensure_sync", None)):
 
            return current_app.ensure_sync(func)(*args, **kwargs)
 
 
        return func(*args, **kwargs)
 
    return decorated_view
 
 
def check_refresh_token(func):
    """refresh_token"""
    @wraps(func)
    def inner(*args, **kwargs):
 
        refresh_data = get_token_key(get_key(request.headers.get('Refresh-Token')))
 
 
 
        if refresh_data is None:
            rd.del_redis_data('front_refresh_token' + '_' + str(refresh_data.get('user_id')))
            return jsonify(base.get_result(401,'系統(tǒng)退出2'))
 
        try:
            refresh_token = rd.get_redis_data('front_refresh_token' + '_' + str(refresh_data.get('user_id')))
            refresh_token = get_token_key(refresh_token)
 
        except Exception as e:
            return jsonify(base.get_result(401,'系統(tǒng)退出3'))
 
        else:
            # 生成新的token,保存在g中
            user = User.query.get(refresh_token.get('user_id'))
            last_login_time=int(time.time())
            front_access_token, front_refresh_token = user.get_id(life_time=10, last_login_time=last_login_time,
                                                                   need_refresh_token=True)
 
            rd.set_redis_data('front_access_token'+'_'+str(user.id),front_access_token)
            g.new_token = front_access_token
 
            return func(*args, **kwargs)
 
    return inner

以上就是用Flask實(shí)現(xiàn)token登錄校驗(yàn)的解決方案的詳細(xì)內(nèi)容,更多關(guān)于Flask實(shí)現(xiàn)token登錄校驗(yàn)的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評論