用Flask實現token登錄校驗的解決方案
常用的登錄方式
網站、小程序、APP 是否已經登錄所代表的狀態(tài),代表一個概念是登錄態(tài)。
我們常用的登錄態(tài)驗證方式主要是如下3種 cookie,session,token
cookie和session 采用的是緩存機制,是需要在瀏覽器端或服務器端,儲存用戶登錄狀態(tài)。
兩者的特點,cookie 采用的瀏覽器緩存 相對于 session 采用的服務器緩存機制安全較差,但后者很怕用戶過多給服務器代理過大的壓力。
token 提供了另外一種不需要緩存賬戶和密碼的登錄狀態(tài)驗證方式。首次登錄后,服務器會給客戶端發(fā)送一個Token,這個Token相當于門禁卡,每次訪問服務器的時候攜帶這個Token,服務器端就可以驗證是誰登錄了。
但為了防止Token被盜用,我們會把Token的有效期設置的很短。Token過期后,向服務器再申請新的Token.這樣就可以更加安全的使用Token方式進行登錄了。

用Flask實現TOKEN登錄的解決方案
環(huán)境準備
python flask模板
flask-login 登錄驗證模板
Redis 緩存模板
登錄驗證機制
通過解密TOKEN獲得有效的登錄用戶相關信息,并把登錄信息保存在服務器端的Redis內。
這樣,確保即使別人盜取了你的Token.如果和服務器Redis中保存的Token不一致。同樣系統(tǒng)會判斷登錄不成功。
第一步 用戶登錄
auth.py
通過 https://自己的服務器地址/login 登錄成功,并返回 access_token和refresh_token到前端。
from flask_restful import Api,Resource
from db.models import *
admin_login_parser = reqparse.RequestParser()
admin_login_parser.add_argument("username", type=str, location='values', help='輸入管理員用戶名')
admin_login_parser.add_argument("password", type=str, location='values', help='輸入管理員密碼')
class Getlogin(Resource):
"""系統(tǒng)登錄"""
def post(self):
args = admin_login_parser.parse_args()
admin = User.query.filter(User.username == args.username).first()
front_access_token, front_refresh_token = admin.get_id(life_time=7200, last_login_time=last_login_time, need_refresh_token=True)
rd.set_redis_data('front_access_token'+'_'+str(admin.id), front_access_token)
rd.set_redis_data('front_refresh_token'+'_'+str(admin.id),front_refresh_token)
return jsonify(get_result(0, '登錄成功', {'access_token': front_access_token, 'refresh_token': front_refresh_token, 'userInfo': userInfo}))第二步 前端保存 token 每一個新的請求中(這里用的是Uniapp前端),把access_token和refresh_token 保存在前端。
/**
* 調用微信登錄
*/
function login() {
Utils.request(api.AuthLogin,{
username:'你的登錄名',
password:'你的密碼'
},'POST').then(res=>{
uni.setStorageSync('isLogin',true)
uni.setStorageSync('userInfo', res.data.userInfo);
uni.setStorageSync('Access-Token', res.data.access_token);
uni.setStorageSync('refresh_token',res.data.refresh_token);
setTimeout(()=>{
resolve({
userInfo:res.data.userInfo,
access_token:res.data.access_token,
})
},3000)
})
}第三步 給前端的請求頭添加access_token
uni.addInterceptor('request',{
// 在發(fā)送請求之前做一些處理
invoke(requestConfig){
// 添加請求頭、身份驗證等
requestConfig.header['Access-Token'] = uni.getStorageSync('Access-Token')
// requestConfig.header['Refresh-Token'] = uni.getStorageSync('refresh_token')
// 添加token到請求頭
return requestConfig
},
success(response){
switch (response.data.errno) {
case 401:
uni.setStorageSync("Access-Token",'')
uni.setStorageSync("refresh_token",'')
uni.setStorageSync('isLogin',false)
uni.navigateTo({
url:'/pages/login/login/index'
})
break
case 403:
Utils.reloadMessage()
break
}
},
fail(error){
console.error('請求失敗',error)
}
})第四步 后端登錄效驗
這里采用了flask_login 登錄模塊,flask_login 這里我們從新定義 登錄裝飾器 login_required_token,用于效驗利用token進行的登錄。
def login_required_token(func):
@wraps(func)
def decorated_view(*args, **kwargs):
# 用戶登錄狀態(tài)
if not current_user.is_authenticated:
if g.refresh:
g.refresh = False
return jsonify(base.get_result(403, 'token過期重建', {'token': 'refresh'}))
return current_app.login_manager.unauthorized()
if callable(getattr(current_app, "ensure_sync", None)):
return current_app.ensure_sync(func)(*args, **kwargs)
return func(*args, **kwargs)
return decorated_view第五步 token過期,前端向后端發(fā)送 front_refresh_token
token 過期后,g.refresh 會設置為 True

g.refresh 為True是想后前端反饋狀態(tài)碼403

前端接受到403 的反饋后,執(zhí)行Utils.reloadMessage函數獲取新的token
public static reloadMessage = ()=>{
let header = {
'Content-Type': 'application/json; charset=UTF-8',
"Access-Control-Allow-Origin": "*",
"Refresh-Token":uni.getStorageSync('refresh_token')
}
Utils.request(api.getNewToken,{},"POST",header).then(res=>{
uni.setStorageSync('Access-Token', res.data.access_token)
})
}第六步 前端向后端請求,獲取新的Access-Token
https://自己的服務器地址/getNewToken
def check_refresh_token(func):
"""refresh_token"""
@wraps(func)
def inner(*args, **kwargs):
refresh_data = get_token_key(get_key(request.headers.get('Refresh-Token')))
if refresh_data is None:
rd.del_redis_data('front_refresh_token' + '_' + str(refresh_data.get('user_id')))
return jsonify(base.get_result(401,'系統(tǒng)退出2'))
try:
refresh_token = rd.get_redis_data('front_refresh_token' + '_' + str(refresh_data.get('user_id')))
refresh_token = get_token_key(refresh_token)
except Exception as e:
return jsonify(base.get_result(401,'系統(tǒng)退出3'))
else:
# 生成新的token,保存在g中
user = User.query.get(refresh_token.get('user_id'))
last_login_time=int(time.time())
front_access_token, front_refresh_token = user.get_id(life_time=10, last_login_time=last_login_time,
need_refresh_token=True)
rd.set_redis_data('front_access_token'+'_'+str(user.id),front_access_token)
g.new_token = front_access_token
return func(*args, **kwargs)
return inner
class GetNewToken(Resource):
"""申請新的Token"""
@check_refresh_token
def post(self):
return jsonify(get_result(0,'申請成功',{'access_token':g.new_token}))
api.add_resource(GetNewToken, '/getNewToken')decorators.py 完整代碼
from functools import wraps
from flask import session,redirect,url_for,request,jsonify,Markup,flash,abort,current_app,g
from db.models import User,Guest
from flask_login import LoginManager,login_user,logout_user,login_required,current_user,UserMixin
from apps.model import Token
import lib.base as base
import json
import apps.redis as rd
import time
#訪問控制
# 實例化LoginManager類
login_manager = LoginManager()
# 提示信息
login_manager.login_message = "請先登錄"
# 提示樣式
login_manager.login_message_category = 'danger'
login_manager.anonymous_user = Guest
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
def load_token(tok):
"""通過loads()方法來解析瀏覽器發(fā)送過來的token,從而進行初步的驗證"""
try:
api_key = Token.validate_token(tok)
user = User.query.get(api_key.get('user_id'))
except Exception as e:
return None
if user:
user.token = tok
else:
print('用戶不存在,令牌不正確!')
return None
return user
def get_token_key(key):
"""解密token到數據"""
try:
key = key.replace('Basic ', '', 1)
key = key.encode('utf-8')
key = Token.validate_token(key)
except Exception as e:
# except TypeError:
return None
return key
def get_key(value):
if value == '':
value = None
return value
@login_manager.unauthorized_handler
def unauthorized():
"""系統(tǒng)超時退出"""
return jsonify(base.get_result(401,'系統(tǒng)退出2'))
@login_manager.request_loader
def load_user_from_request(request):
"""效驗請求中的token"""
if request.method == "POST":
g.refresh = False
"""# 請求正文中攜帶token時,執(zhí)行如下代碼
捕捉請求參數中的 'api_key' 變量,這個api_key和數據庫中存儲的api_key比較,確認登錄身份。
"""
api_access_key = request.args.get('api_key')
if api_access_key:
user = User.query.filter_by(api_key=api_access_key).first()
if user:
return user
"""請求頭中攜帶token,執(zhí)行如下代碼
捕捉請求參數中的 'X-Token' 變量,這個Access-Token通過服務端解碼后,獲得用戶ID和Access-Token有效期,確認登錄身份。
"""
# 取請求頭中的access-token
api_access_key = get_key(request.headers.get('Access-Token'))
# 對比前端的token和服務器中保存的token是否一致,判斷用戶是否已經登錄了
if api_access_key:
login_data = get_token_key(api_access_key)
# 判斷token是否過期,沒有過期返回用戶對象。
if login_data and api_access_key == get_key(
rd.get_redis_data('front_access_token' + '_' + str(login_data.get('user_id')))):
g.user = User.query.get(login_data.get('user_id'))
return g.user
# 判斷如果token已經過期了,但是front_refresh_token沒有過期,執(zhí)行如下代碼?
else:
g.refresh = True
return None
return None
def confirm_required(func):
def decorated_function(*args, **kwargs):
if current_user.state.name == 'inactive':
logout_user()
message = Markup(
'賬戶未激活')
flash(message, 'warning')
return redirect(url_for('wx.login'))
return func(*args, **kwargs)
return decorated_function
def permission_required(permission_name):
def decorator(func):
@wraps(func)
def decorated_function(*args, **kwargs):
if not current_user.can(permission_name):
abort(403)
return func(*args, **kwargs)
return decorated_function
return decorator
def admin_required(func):
return permission_required('ADMINISTER')(func)
def login_required_token(func):
@wraps(func)
def decorated_view(*args, **kwargs):
# 用戶登錄狀態(tài)
if not current_user.is_authenticated:
if g.refresh:
g.refresh = False
return jsonify(base.get_result(403, 'token過期重建', {'token': 'refresh'}))
return current_app.login_manager.unauthorized()
if callable(getattr(current_app, "ensure_sync", None)):
return current_app.ensure_sync(func)(*args, **kwargs)
return func(*args, **kwargs)
return decorated_view
def check_refresh_token(func):
"""refresh_token"""
@wraps(func)
def inner(*args, **kwargs):
refresh_data = get_token_key(get_key(request.headers.get('Refresh-Token')))
if refresh_data is None:
rd.del_redis_data('front_refresh_token' + '_' + str(refresh_data.get('user_id')))
return jsonify(base.get_result(401,'系統(tǒng)退出2'))
try:
refresh_token = rd.get_redis_data('front_refresh_token' + '_' + str(refresh_data.get('user_id')))
refresh_token = get_token_key(refresh_token)
except Exception as e:
return jsonify(base.get_result(401,'系統(tǒng)退出3'))
else:
# 生成新的token,保存在g中
user = User.query.get(refresh_token.get('user_id'))
last_login_time=int(time.time())
front_access_token, front_refresh_token = user.get_id(life_time=10, last_login_time=last_login_time,
need_refresh_token=True)
rd.set_redis_data('front_access_token'+'_'+str(user.id),front_access_token)
g.new_token = front_access_token
return func(*args, **kwargs)
return inner以上就是用Flask實現token登錄校驗的解決方案的詳細內容,更多關于Flask實現token登錄校驗的資料請關注腳本之家其它相關文章!
相關文章
Pytorch中的torch.nn.Linear()方法用法解讀
這篇文章主要介紹了Pytorch中的torch.nn.Linear()方法用法,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-02-02

