Python實現(xiàn)常見限流算法的示例代碼
前提
本地的redis
服務(wù)已經(jīng)啟動,mac
用戶兩行命令即可:
brew install redis && brew services start redis
完了之后,在代碼里寫上獲得redis
連接的代碼:
def get_redis_con(): pool = redis.ConnectionPool(max_connections=4, decode_responses=True) return redis.Redis(connection_pool=pool)
其他配置參照官方文檔。
固定窗口
類似于把時間切分了,每個時間段只允許固定次的請求。
最直白的話語就是:我的接口1s只允許100次請求,多了我就拋異常。
def fixed_window(user, action, time_zone=60, max_times=30): key = f'{action}' count = get_redis_con().get(key) if not count: count = 1 get_redis_con().setex(key, time_zone, count) if int(count) < max_times: get_redis_con().incr(key) return True return False
代碼中加上了user
,其實就是避免單個用戶的接口防刷。在之前的文章<如何優(yōu)雅的實現(xiàn)接口防刷>中,其實就是用的這種方法。
對應(yīng)的話,其實也是有一些問題的。
最主要的一個缺點就是:流量不是平滑的,可能存在多個流量峰值導(dǎo)致服務(wù)間歇性的不可用。最直觀的感受是在窗口切換的時候,流量堆積導(dǎo)致的問題。
滑動窗口
描述的原理是:
- 將時間劃分為細(xì)粒度的區(qū)間,每個區(qū)間維持一個計數(shù)器,每進入一個請求則將計數(shù)器加一;
- 多個區(qū)間組成一個時間窗口,每流逝一個區(qū)間時間后,則拋棄最老的一個區(qū)間,納入新區(qū)間;
- 若當(dāng)前窗口的區(qū)間計數(shù)器總和超過設(shè)定的限制數(shù)量,則本窗口內(nèi)的后續(xù)請求都被丟棄。
可能還是有一些抽象,我們借用代碼來講解:
def silde_window(user, action, time_zone=60, max_times=30): key = f'{action}' now_ts = time.time() * 1000 # ms級時間戳,保證唯一 value = now_ts # 時間窗口的左邊界 old_ts = now_ts - time_zone * 1000 # 記錄 {成員元素:分?jǐn)?shù)值} mapping = { value: now_ts, } get_redis_con().zadd(key, mapping) # 刪除時間窗口之前的數(shù)據(jù) get_redis_con().zremrangebyscore(key, -1, old_ts) # 獲得窗口內(nèi)的行為數(shù)量 count = get_redis_con().zcard(key) get_redis_con().expire(key, time_zone + 1) if not count or int(count) < max_times: return True return False
用到的數(shù)據(jù)結(jié)構(gòu)是zset
。這里的時間戳就是對應(yīng)值的score
。
這種方式可以應(yīng)對流量的激增,但是流量的曲線還是不夠平滑。
漏桶算法
就類似于一個桶,請求先去填滿桶,填滿之后,其它的請求直接拒絕;同時,桶以一定的速率漏出,放行請求。
這種算法的速率是不支持動態(tài)調(diào)整的,對于系統(tǒng)資源的充分利用上還是存在問題的。
令牌桶算法
漏桶算法的主角是桶,令牌桶的主角是令牌。
def pass_token_bucket(user, action, time_zone=60, max_times=30): key = f'{user}:{action}' # 令牌生成速度 rate = max_times / time_zone capacity = max_times token_count = get_redis_con().hget(key, 'tokens') last_time = get_redis_con().hget(key, 'last_time') now = time.time() token_count = int(token_count) if token_count else capacity last_time = last_time if last_time else now # 經(jīng)過一段時間之后生成的令牌數(shù)量 new_token_count = int((now - last_time) * rate) if new_token_count > 1: token_count += new_token_count if token_count > capacity: token_count = capacity last_time = time.time() get_redis_con().hset(key, 'last_time', last_time) ? if token_count >= 1: token_count -= 1 get_redis_con().hset(key, 'tokens', token_count) return True return False
對于漏桶和令牌桶,算法的實現(xiàn)其實都大差不差。shigen
在學(xué)習(xí)這個的時候,還有一點整混淆了。
最后,說一下如何驗證,使用到了python
的多線程。
executor = ThreadPoolExecutor(max_workers=4) APIS = ['/api/a', '/get/user/1', '/get/user/2', '/get/user/3'] ? ? def task() -> bool: user = random.randint(1000, 1010) status = pass_token_bucket(user, random.choice(APIS)) if not status: raise SystemError('{}被限制'.format(user)) return status if __name__ == '__main__': jobs = [executor.submit(task) for _ in range(1000)] for job in jobs: print(job.result())
到此這篇關(guān)于Python實現(xiàn)常見限流算法的示例代碼的文章就介紹到這了,更多相關(guān)Python限流算法內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python自動檢測requests所獲得html文檔的編碼
這篇文章主要為大家詳細(xì)介紹了如何通過Python自動檢測requests實現(xiàn)獲得html文檔的編碼,文中的示例代碼講解詳細(xì),感興趣的可以了解下2024-11-11Python?matplotlib.pyplot.hist()繪制直方圖的方法實例
直方圖(Histogram)又稱質(zhì)量分布圖,是一種統(tǒng)計報告圖,由一系列高度不等的縱向條紋或線段表示數(shù)據(jù)分布的情況,一般用橫軸表示數(shù)據(jù)類型,縱軸表示分布情況,下面這篇文章主要給大家介紹了關(guān)于Python?matplotlib.pyplot.hist()繪制直方圖的相關(guān)資料,需要的朋友可以參考下2022-06-06Windows系統(tǒng)下使用flup搭建Nginx和Python環(huán)境的方法
這篇文章主要介紹了Windows系統(tǒng)下使用flup搭建Nginx和Python環(huán)境的方法,文中使用到了flup這個Python的FastCGI工具,需要的朋友可以參考下2015-12-12Python?threading和Thread模塊及線程的實現(xiàn)
這篇文章主要介紹了Python?threading和Thread模塊及線程的實現(xiàn),Python通過兩個標(biāo)準(zhǔn)庫thread和threading提供對線程的支持,threading對thread進行了封裝,具體實現(xiàn)介紹需要的朋友可以參考一下下面文章內(nèi)容2022-06-06