用 flask_restful 所完成的 API 是沒有任何保護措施,所有的人可以任意使用, flask_jwt_extended 套件已經完成大部分的程式邏輯與流程,利用它來完成登入登出 API。
依照慣例,本篇的內容都是在前篇的基礎上再增加功能,還不熟悉基礎的讀者請先閱讀〈Flask 限制 API 使用次數〉。本篇分成三個部分:新增使用者、登入與登出 API。
登入
登出
來自 ByteByteGo 的 JWT 授權流程圖,簡單明瞭地說明了授權的流程。
新增使用者程式碼看這篇主機端 Flask APIs 設計。
用 create_access_token(identity="identity-string") 產生使用者的授權密令,參數 identity 可以是任何字串,範例中將使用者的一些資料轉成 JSON 字串,最後回傳 access_token 授權密令,這個 API 就完成了,將 UserLoginResource 加入 API 控制就可以使用。
# z5525/handlers/user.py
import json
import sqlalchemy.exc as sa_exc
from flask import Flask, request, jsonify
from flask_restful import Api, Resource, abort
from werkzeug.security import generate_password_hash, check_password_hash
from flask_jwt_extended import (
JWTManager, create_access_token,
jwt_required, get_jwt_identity
)
from z5525.models.user import db, User, RevokedToken
# 用戶登入
class UserLoginResource(Resource):
    """Login endpoint that exchanges an e-mail/password pair for a JWT."""

    def post(self):
        """Authenticate the caller and issue an access token.

        Flow:
            1. create_access_token(identity="identity-string")
            2. identity = get_jwt_identity()  (done later by protected APIs)

        Expects a JSON object with ``email`` and ``password``.

        Returns:
            ``({"status": "ok", "access_token": ..., "id": ...}, 200)`` on
            success, or ``({"status": "error", "message": ...}, 401)`` when
            the credentials are missing or do not match.
        """
        data = request.get_json()
        # A JSON body such as `null` or `[]` has no .get(); treat it as an
        # empty credential set so it ends in a 401 instead of a crash.
        if not isinstance(data, dict):
            data = {}
        email = data.get('email')
        password = data.get('password')

        user = User.query.filter_by(email=email).first() if email else None
        # check_password_hash needs a string; a missing password is rejected
        # here rather than being passed through as None.
        if (
            user is None
            or not isinstance(password, str)
            or not check_password_hash(user.password, password)
        ):
            # A failed login must not report "ok".
            return {
                "status": "error",
                'message': 'Invalid credentials'}, 401

        # The identity is serialised to a JSON string before it is handed to
        # create_access_token.
        identity = {
            "id": user.id,
            "username": user.username,
            "is_admin": user.is_admin
        }
        access_token = create_access_token(
            identity=json.dumps(identity)
        )
        return {
            "status": "ok",
            "access_token": access_token,
            "id": user.id
        }, 200
@jwt_required()
要求授權加入保護的數據,把 UserResource 的 get 拷貝一份,重新命名 UserProtectedResource ,最後在 def 上頭加入 @jwt_required() 這樣,這個 API 就會受到保護。
# z5525/handlers/user.py
class UserProtectedResource(Resource):
    """Read-only user API whose ``get`` requires a valid JWT."""

    # READ
    @jwt_required()
    def get(self, user_id=None):
        """Return one user when ``user_id`` is given, otherwise a page of users.

        The list form reads ``offset`` (default 0) and ``limit`` (default 10)
        from the query string.
        """
        if not user_id:
            # List branch: total row count plus the requested page.
            total = db.session.query(User).count()
            offset = request.args.get('offset', default=0, type=int)
            limit = request.args.get('limit', default=10, type=int)
            page_stmt = db.select(User).offset(offset).limit(limit)
            users = db.session.execute(page_stmt).scalars().all()
            payload = {
                "status": "ok",
                "total": total,
                "count": len(users),
                "offset": offset,
                "limit": limit,
                'items': [row.to_dict() for row in users]
            }
            return payload, 200

        # Single-user branch: scalar_one() raises NoResultFound when the id
        # does not exist, which is mapped to a 404 payload.
        try:
            lookup = db.select(User).filter_by(id=user_id)
            found = db.session.execute(lookup).scalar_one()
            return {
                "status": "ok",
                "total": 1,
                "count": 1,
                'user': found.to_dict()
            }, 200
        except sa_exc.NoResultFound:
            not_found = {
                "error": "Not Found",
                "message": 'User not found'
            }
            return not_found, 404
flask_restful
flask_restful 的 Api 會接管所有的錯誤處理,這樣就不會觸發 flask_jwt_extended 自己的錯誤處理流程,所以需要一個客製化的 Api,命名為 FixedApi。
# 部分程式碼
class FixedApi(Api):
    """Api subclass that hands JWT authorization errors back to Flask."""

    def handle_error(self, e):
        """Re-raise ``NoAuthorizationError``; delegate everything else."""
        if isinstance(e, NoAuthorizationError):
            # Let Flask handle it, e.g. to trigger @jwt.unauthorized_loader
            raise e
        # All other errors keep flask_restful's default handling; without
        # this return the method would hand back None instead of a response.
        return super().handle_error(e)
以下是未授權所產生的錯誤,假如沒有另外處理這些錯誤,API 會回覆 500 Internal Server Error。
# jwt 內建的錯誤處理與訊息
jwt.exceptions.DecodeError: Not enough segments
flask_jwt_extended.exceptions.NoAuthorizationError: Missing Authorization Header
最後將 UserLoginResource 和 UserProtectedResource 加入 API 控制,以下就是完整的 api.py 程式碼
# z5525/handlers/api.py
from flask_restful import Api, Resource
from flask_jwt_extended.exceptions import NoAuthorizationError
from flask_jwt_extended.exceptions import InvalidHeaderError
from flask_jwt_extended.exceptions import RevokedTokenError
from jwt import DecodeError
from jwt import ExpiredSignatureError
from jwt import InvalidAudienceError
from jwt import InvalidIssuerError
from jwt import InvalidTokenError
from jwt import MissingRequiredClaimError
from z5525.handlers.users import (
UserResource, # 沒有 JWT 的 User API
UserProtectedResource, # 有 JWT 的 User API
UserLoginResource, # 剛完成的 JWT Login API
)
from z5525.app import (
limiter, # 限制 API 使用次數
)
APP_PREFIX = "/api"
class FixedApi(Api):
    """Api subclass that lets JWT-related exceptions propagate to Flask."""

    # Exceptions that are re-raised instead of being rendered by
    # flask_restful, so Flask can handle them (for example by triggering
    # @jwt.unauthorized_loader or @jwt.invalid_token_loader).
    _PASSTHROUGH_ERRORS = (
        NoAuthorizationError,
        InvalidHeaderError,
        RevokedTokenError,
        DecodeError,
        ExpiredSignatureError,
        InvalidAudienceError,
        InvalidIssuerError,
        InvalidTokenError,
        MissingRequiredClaimError,
    )

    def handle_error(self, e):
        """Re-raise JWT errors; fall back to the parent handler otherwise."""
        if isinstance(e, self._PASSTHROUGH_ERRORS):
            raise e
        return super().handle_error(e)
class LimiterResource(Resource):
    """Test endpoint used to exercise the rate limiter."""

    @limiter.limit("5 per minute")
    def get(self):
        """Return a fixed success message (limited to 5 calls per minute)."""
        # jsonify is not among this module's imports, so it is imported here
        # to avoid a NameError when the endpoint is called.
        from flask import jsonify

        return jsonify({"message": "Success"})
api = FixedApi()
# User
api.add_resource(UserResource, f"{APP_PREFIX}/users", f"{APP_PREFIX}/users/<int:user_id>")
# JWT-protected User API; the <int:user_id> route makes the single-user
# branch of UserProtectedResource.get reachable.
api.add_resource(
    UserProtectedResource,
    f"{APP_PREFIX}/protected/users",
    f"{APP_PREFIX}/protected/users/<int:user_id>",
)
api.add_resource(UserLoginResource, f"{APP_PREFIX}/login")  # Login API
# Test
api.add_resource(LimiterResource, "/test/limiting")
新增 jwt_flask.py 放在 app 目錄裡面,依照慣例命名一個 init_app 來完成 jwt 的所有設定與控制,load_callback 則用來放需要另外處理的 jwt callback 相關程式碼。
# z5525/app/jwt_flask.py
from flask import Flask, request, jsonify
from http import HTTPStatus
def get_jwt_manager(app):
    """Return the entry stored under ``"flask-jwt-extended"`` in ``app.extensions``."""
    registry = app.extensions
    return registry["flask-jwt-extended"]
def init_app(app):
    """Configure flask_jwt_extended on ``app`` and return its ``JWTManager``.

    Sets the signing key and a one-hour access-token lifetime on
    ``app.config``, then binds a ``JWTManager`` to ``app``.
    """
    import os

    from flask_jwt_extended import JWTManager

    # JWT
    # Key used to sign the tokens. It can be supplied through the
    # JWT_SECRET_KEY environment variable; the fallback is the demo value and
    # must be changed for real use.
    app.config["JWT_SECRET_KEY"] = os.environ.get("JWT_SECRET_KEY", "super-secret")
    app.config['JWT_ACCESS_TOKEN_EXPIRES'] = 3600  # token lifetime: 1 hour
    # NOTE: the JWT_BLACKLIST_ENABLED / JWT_BLACKLIST_TOKEN_CHECKS options were
    # dropped here. Per the Flask-JWT-Extended 4.0 upgrade notes they were
    # removed; revocation checks are enabled by registering
    # @jwt.token_in_blocklist_loader.
    jwt = JWTManager(app)
    return jwt
def load_callback(jwt):
    """Register custom JWT callbacks on ``jwt``.

    Placeholder: no callbacks are registered yet. The docstring also gives
    the function a body, since comments alone would be a syntax error.
    """
    # Custom error codes
    # The code will be added later
    return None
將前篇Flask 限制 API 使用次數的 create_app 加入 jwt 相關程式碼,就可以完成 create_app 的程式碼。
# z5525/app/main.py
def create_app():
    """Build the Flask application with database, JWT, API and rate limiter."""
    from flask import Flask
    from z5525.app.limiter_flask import limiter  # API rate limiting
    from z5525.app import jwt_flask  # JWT authorization
    from z5525.models import db
    from z5525.handlers import api

    flask_app = Flask(__name__)
    flask_app.config.update(
        SQLALCHEMY_DATABASE_URI='sqlite:///:memory:',
        SQLALCHEMY_TRACK_MODIFICATIONS=False,
    )
    flask_app.json.compact = False

    # Extensions are initialised in the same order as before.
    db.init_app(flask_app)
    jwt_manager = jwt_flask.init_app(flask_app)  # init jwt
    api.init_app(flask_app)
    limiter.init_app(flask_app)

    # Register the jwt callbacks and error handlers.
    jwt_flask.load_callback(jwt_manager)
    return flask_app
執行 python -m z5525.app.main
,以下是在 iPad a-Shell app 中執行的結果。
python -m z5525.app.main
/var/mobile/Containers/Data/Application/5D4D24E8-73A4-4649-B083-00830586A0B3/Library/lib/python3.11/site-packages/flask_limiter/
extension.py:333: UserWarning: Using the in-memory storage for tracking rate limits as no storage was explicitly specified. This
is not recommended for production use. See: https://flask-limiter.readthedocs.io#configuring-a-storage-backend for documentatio
n about configuring the storage backend.
warnings.warn(
* Serving Flask app 'main'
* Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
* Running on http://127.0.0.1:5525
# 新增使用者 API
curl --header 'Content-Type: application/json' \
--request POST \
--data '{"username": "Test user",
"email": "[email protected]",
"password": "pass"
}' \
http://127.0.0.1:5525/api/users
{"status": "ok", "message": "User added successfully"}
# 登入 API
curl --header 'Content-Type: application/json' \
--request POST \
--data '{
"email": "[email protected]",
"password": "pass"
}' \
http://127.0.0.1:5525/api/login
{"status": "ok", "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTc0OTE5NDg3NSwianRpIjoiZTlmZTFjZTMtYWEwYy00OWU0LWEwYTMtMGJhOTQwMjlhNDhlIiwidHlwZSI6ImFjY2VzcyIsInN1YiI6IntcImlkXCI6IDEsIFwidXNlcm5hbWVcIjogXCJUZXN0IHVzZXJcIiwgXCJpc19hZG1pblwiOiBmYWxzZX0iLCJuYmYiOjE3NDkxOTQ4NzUsImNzcmYiOiIzNTI2Y2YzYy0xMWY1LTQ3NDQtOTEyMy01NzcwZWE3ZTQzMmIiLCJleHAiOjE3NDkxOTg0NzV9.9fn_-2xbMSM5lJNZ4qKkxKl6AQMaBI0C05f4Nn-lNQk", "id": 2}
有授權保護與沒有授權保護 User API 都可以得到相同的數據
# 有授權保護 User API
curl http://127.0.0.1:5525/api/protected/users --header "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTc0OTE5NDg3NSwianRpIjoiZTlmZTFjZTMtYWEwYy00OWU0LWEwYTMtMGJhOTQwMjlhNDhlIiwidHlwZSI6ImFjY2VzcyIsInN1YiI6IntcImlkXCI6IDEsIFwidXNlcm5hbWVcIjogXCJUZXN0IHVzZXJcIiwgXCJpc19hZG1pblwiOiBmYWxzZX0iLCJuYmYiOjE3NDkxOTQ4NzUsImNzcmYiOiIzNTI2Y2YzYy0xMWY1LTQ3NDQtOTEyMy01NzcwZWE3ZTQzMmIiLCJleHAiOjE3NDkxOTg0NzV9.9fn_-2xbMSM5lJNZ4qKkxKl6AQMaBI0C05f4Nn-lNQk"
{"status": "ok", "total": 1, "count": 1, "offset": 0, "limit": 10, "items": [{"id": 1, "username": "Test user", "email": "[email protected]", "is_admin": false}]}
# 沒有授權保護 User API
curl http://127.0.0.1:5525/api/users
{"status": "ok", "total": 1, "count": 1, "offset": 0, "limit": 10, "items": [{"id": 1, "username": "Test user", "email": "[email protected]", "is_admin": false}]}
接著測試有授權保護的 API 但是不帶授權,這時候會出現三種情況:1. 沒有用修改後的 FixedApi,2. 沒有使用 @jwt.unauthorized_loader,3. 有使用 @jwt.unauthorized_loader 後產生客製化的內容。這部分會在第二部分說明。
curl http://127.0.0.1:5525/api/protected/users
# 1. 沒有用修改後的 FixedApi
# 沒有 raise error
flask_jwt_extended.exceptions.NoAuthorizationError: Missing Authorization Header
{"message": "Internal Server Error"}
# 2. 沒有使用 @jwt.unauthorized_loader
# 內定回覆訊息
{"msg": "Missing Authorization Header"}
# 3. 有使用 @jwt.unauthorized_loader
# 客製化回覆訊息
{"error": "Permission Denied"}
# z5525/tests/test_user_login.py
import os, sys
import pytest
import time
import json
from datetime import datetime, timedelta
from werkzeug.security import generate_password_hash, check_password_hash
from flask_jwt_extended import create_access_token
from flask_jwt_extended import jwt_required, get_jwt_identity
from flask_jwt_extended import JWTManager
from z5525.app.main import create_app
from z5525.handlers import api, UserResource
from z5525.models import db, User
APP_PREFIX = "/api"
@pytest.fixture
def client():
    """Provide a Flask test client with the database tables created."""
    flask_app = create_app()
    app_ctx = flask_app.app_context()
    with app_ctx:
        db.create_all()
        test_client = flask_app.test_client()
        yield test_client
def test_api(client):
    """Create a user, log in, and check the JSON returned by both calls."""
    email = "[email protected]"
    password = "pass"

    # 1. CREATE USER
    new_user = {"username": "Test user", "email": email, "password": password}
    created = client.post(f"{APP_PREFIX}/users", json=new_user)
    assert created.status_code == 201
    created_body = created.json
    assert "status" in created_body
    assert "ok" == created_body["status"]

    # 2. LOGIN
    credentials = {"email": email, "password": password}
    logged_in = client.post(f"{APP_PREFIX}/login", json=credentials)
    assert logged_in.status_code == 200
    login_body = logged_in.json
    assert "id" in login_body
    assert "access_token" in login_body
    access_token = login_body["access_token"]
以上就是登入 API 的所有流程與測試。
有了登入也要登出,範例直接把註銷的授權密令寫入資料庫(也可以用 Redis 替代):加入一個 SQL 表格,命名為 RevokedToken。登出的時候需要透過 get_jwt()["jti"] 取得 jti;jti(JWT ID)是 unique identifier of the token,也就是獨一無二的授權密令辨識碼。登出時把它寫入資料庫,之後每當 @jwt_required() 檢查 JWT 授權時,會先確認這個 jti 是否在黑名單中,再繼續處理驗證程序。
# jti 是 unique identifier of the token
jti = get_jwt()['jti']
# print(jti) # f3480d0a-2d9c-403f-ae88-a6f41e5afe9f
@jwt.token_in_blocklist_loader
判斷 jti 是不是在資料庫中,來確定是否已經註銷。當成立的時候會觸發 RevokedTokenError ,在 FixedApi 中可以看到我們要自己處理這個錯誤,再透過 @jwt.revoked_token_loader 來完成客製化的錯誤訊息。
# z5525/models/user.py
class RevokedToken(db.Model):
    """Table of ``jti`` values belonging to tokens that were revoked."""

    __tablename__ = 'revoked_tokens'

    id = db.Column(db.Integer, primary_key=True)
    # jti: unique identifier of the token, stored in a 36-character column
    jti = db.Column(db.String(36))

    def add(self):
        """Insert this row and commit the session."""
        session = db.session
        session.add(self)
        session.commit()

    @classmethod
    def is_jti_blacklisted(cls, jti):
        """Return True when a row with the given ``jti`` exists."""
        match = cls.query.filter_by(jti=jti).first()
        return match is not None
# z5525/handlers/user.py
# 省略
from flask_jwt_extended import (
JWTManager, create_access_token,
jwt_required, get_jwt_identity, get_jwt
)
from z5525.models.user import db, User, RevokedToken
# 省略
class UserLogoutResource(Resource):
    """Logout endpoint that revokes the access token used for the call."""

    @jwt_required()
    def post(self):
        """Store the current token's ``jti`` as revoked.

        Returns:
            ``({"status": "ok", "success": True}, 201)`` once the jti is
            saved, or ``({"error": "INTERNAL_ERROR", "message": ...}, 500)``
            when saving fails.
        """
        jti = get_jwt()['jti']
        # Example jti: f3480d0a-2d9c-403f-ae88-a6f41e5afe9f
        try:
            RevokedToken(jti=jti).add()
        except Exception as e:
            # Roll back so the failed insert does not leave the session in a
            # broken state for later work in this request.
            db.session.rollback()
            error = {
                "error": "INTERNAL_ERROR",
                "message": str(e)}
            return error, 500

        # The comma after "ok" is required; without it the dict literal is a
        # syntax error.
        result = {
            "status": "ok",
            "success": True,
        }
        return result, 201
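把 jwt 相關的程式碼統一放在 jwt_flask.py ,現在只加入 @jwt.token_in_blocklist_loader 和 @jwt.revoked_token_loader 兩個 callback ,來處理登出時候的相關流程。另外別忘了在 api.py 匯入 UserLogoutResource 並註冊路由 api.add_resource(UserLogoutResource, f"{APP_PREFIX}/logout") ,否則後面測試中的 /api/logout 會得到 404。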
# z5525/app/jwt_flask.py
# 省略
from z5525.models.user import db, User, RevokedToken
def load_callback(jwt):
    """Register the revocation callbacks on the given JWT manager.

    Two callbacks are attached: one reports whether a token's ``jti`` has
    been revoked, the other builds the response for a revoked token.
    """

    @jwt.token_in_blocklist_loader
    def check_if_token_revoked(jwt_header: dict, jwt_payload: dict) -> bool:
        """Return True when the token's jti is stored as revoked."""
        # Example header:  {'alg': 'HS256', 'typ': 'JWT'}
        # Example payload: {'fresh': False, 'iat': 1749197206,
        #   'jti': '49cfad8d-24f6-4250-b0bb-70feebc76946', 'type': 'access',
        #   'sub': '{"id": 1, "username": "Test user", "is_admin": false}',
        #   'nbf': 1749197206, 'csrf': '023f6e02-37f4-4420-8e0b-fba0110ab0c0',
        #   'exp': 1749200806}
        jti = jwt_payload["jti"]
        # RevokedToken is imported directly; no `models` name is imported in
        # this module, so `models.RevokedToken` would raise NameError.
        return RevokedToken.is_jti_blacklisted(jti)

    @jwt.revoked_token_loader
    def handle_revoked_token_error(jwt_header: dict, jwt_payload: dict):
        """Return the custom 401 response for a revoked token."""
        return jsonify(error="Token has been revoked"), HTTPStatus.UNAUTHORIZED
回顧一下當沒有授權時去讀取受保護的數據的時候三種情況 curl http://127.0.0.1:5525/api/protected/users
# 沒有 raise error
{"message": "Internal Server Error"}
# 內定回覆訊息
{"msg": "Missing Authorization Header"}
# 客製化回覆訊息
{"error": "Permission Denied"}
def test_api(client):
    """Walk through create user -> login -> protected read -> logout.

    After logout the same token must be rejected, and a request without any
    token must be rejected as well.
    """
    # 1. CREATE USER
    res1 = client.post(f"{APP_PREFIX}/users", json={
        "username": "Test user",
        "email": "[email protected]",
        "password": "pass",
    })
    assert res1.status_code == 201
    assert "status" in res1.json
    assert "ok" == res1.json["status"]

    # 2. LOGIN API
    res2 = client.post(f"{APP_PREFIX}/login", json={
        "email": "[email protected]",
        "password": "pass",
    })
    assert res2.status_code == 200
    assert "status" in res2.json
    assert "access_token" in res2.json
    assert "ok" == res2.json["status"]
    access_token = res2.json["access_token"]

    # 3. ACCESS protected data
    headers = {
        "Authorization": f"Bearer {access_token}"
    }
    response = client.get(f"{APP_PREFIX}/protected/users", headers=headers)
    assert response.status_code == 200
    assert "status" in response.json
    assert "items" in response.json
    assert "ok" == response.json["status"]
    items = response.json["items"]
    assert len(items) == 1
    user = items[0]
    # The former `sample_user` comparison was removed: no such fixture or
    # variable exists in this test module.
    assert "[email protected]" == user["email"]

    # 4. LOGOUT API
    response = client.post(f"{APP_PREFIX}/logout", headers=headers)
    assert response.status_code == 201
    assert "status" in response.json
    assert "ok" == response.json["status"]

    # 5. ACCESS protected data again with the revoked token
    response = client.get(f"{APP_PREFIX}/protected/users", headers=headers)
    assert response.status_code == 401
    assert "Token has been revoked" == response.json["error"]

    # 6. ACCESS protected data without any token.
    # The route needs APP_PREFIX; without it the URL is not registered.
    # Only the status code is asserted because the body depends on whether a
    # custom @jwt.unauthorized_loader is installed.
    response = client.get(f"{APP_PREFIX}/protected/users")
    assert response.status_code == 401
python 3.11
flask 3.0.3
Flask-Limiter 3.10.0
Flask-JWT-Extended 4.7.1