用python 批量操作redis數(shù)據(jù)庫
方法一:使用 pipeline
使用pipelining 發(fā)送命令時,redis server必須部分請求放到隊列中(使用內(nèi)存)執(zhí)行完畢后一次性發(fā)送結果,在 pipeline 使用期間,將“獨占”鏈接,無法進行非“管道”類型的其他操作,直至 pipeline 關閉;如果 pipeline 的指令集很多很龐大,為了不影響其他操作(redis 最大時間lua-time-limit默認是5s),可以使用其他新建新鏈接操作。批量操作如下:
import redis
r = redis.Redis(host='127.0.0.1', port=6379, password='1234567890')
with r.pipeline() as ctx:
a = time.time()
ctx.hset('current', "time2", a)
ctx.hset('current', "time3", a)
res = ctx.execute()
print("result: ", res)
使用 pipe line 以樂觀鎖的形式執(zhí)行事務操作
# -*- coding:utf-8 -*-
import redis
from redis import WatchError
from concurrent.futures import ProcessPoolExecutor
r = redis.Redis(host='127.0.0.1', port=6379)
# 減庫存函數(shù), 循環(huán)直到減庫存完成
# 庫存充足, 減庫存成功, 返回True
# 庫存不足, 減庫存失敗, 返回False
def decr_stock():
# python中redis事務是通過pipeline的封裝實現(xiàn)的
with r.pipeline() as pipe:
while True:
try:
# watch庫存鍵, multi后如果該key被其他客戶端改變, 事務操作會拋出WatchError異常
pipe.watch('stock:count')
count = int(pipe.get('stock:count'))
if count > 0: # 有庫存
# 事務開始
pipe.multi() # multi 判斷 watch 監(jiān)控的 key 是否被其他客戶端改變
pipe.decr('stock:count')
# 把命令推送過去
# execute返回命令執(zhí)行結果列表, 這里只有一個decr返回當前值
result = pipe.execute()[0]
print("result: ", result)
return True
else:
return False
except WatchError as e:
# 打印WatchError異常, 觀察被watch鎖住的情況
print(e.args)
finally:
pipe.unwatch()
def worker():
while True:
# 沒有庫存就退出
if not decr_stock():
break
# 實驗開始
# 設置庫存為100
r.set("stock:count", 100)
# 多進程模擬多個客戶端提交
with ProcessPoolExecutor(max_workers=2) as pool:
for _ in range(10):
pool.submit(worker)
方法二:使用 register_script
分布執(zhí)行,發(fā)送腳本到redis服務器,獲取一個本次連接的一個調(diào)用句柄,根據(jù)此句柄可以無數(shù)次執(zhí)行不同參數(shù)調(diào)用
import redis
import time
r = redis.Redis(host='127.0.0.1', port=31320, password='12345678')
lua = """
local key = KEYS[1]
local field = ARGV[1]
local timestamp_new = ARGV[2]
-- get timestamp of the key in redis
local timestamp_old = redis.call('hget', key, field)
-- if timestamp_old == nil, it means the key is not exist
if timestamp_old == nil or timestamp_old == false or timestamp_new > timestamp_old then
redis.call('hset', key, field .. 1, timestamp_new)
-- timestamp_new > timestamp_old
return redis.pcall('hset', key, field, timestamp_new)
end
"""
cmd = r.register_script(lua)
cur_time = time.time()
cmd(keys=['current'], args=["time", cur_time])
register_script 調(diào)用 lua 來實現(xiàn),需要注意 redis.call(method, key, field) 的返回值(nil,false,1),此處沒有鍵值返回的是false。如果中間有錯誤,所有的語句不時不生效。
方法三:使用 script_load 和 evalsha
簡而言之,通過 script_load 發(fā)送給redis服務器,使加載 lua 腳本,并常駐內(nèi)存,返回標志,通過 evalsha 按標志進行執(zhí)行,此連接脫離本次redis 客戶端。
import redis
import time
r = redis.Redis(host='127.0.0.1', port=31320, password='12345678')
lua = """
local key = KEYS[1]
local field = ARGV[1]
local timestamp_new = ARGV[2]
-- get timestamp of the key in redis
local timestamp_old = redis.call('hget', key, field)
-- if timestamp_old == nil, it means the key is not exist
if timestamp_old == nil or timestamp_old == false or timestamp_new > timestamp_old then
redis.call('hset', key, field .. 1, timestamp_new)
-- timestamp_new > timestamp_old
return redis.pcall('hset', key, field, timestamp_new)
end
"""
sha = r.script_load(lua)
print(r.evalsha(sha, 1, 'current', 'time', time.time()))
Redis 管理Lua腳本:(Python下為 script_... )
- script load
此命令用于將Lua腳本加載到Redis內(nèi)存中
- script exists
scripts exists sha1 [sha1 …]
此命令用于判斷sha1是否已經(jīng)加載到Redis內(nèi)存中
- script flush
此命令用于清除Redis內(nèi)存已經(jīng)加載的所有Lua腳本,在執(zhí)行script flush后,所有 sha 不復存在。
- script kill
此命令用于殺掉正在執(zhí)行的Lua腳本。
方法四:eval
使用方法與方法三類似,但是eval是一次性請求,每次請求,必須攜帶 lua 腳本
以上就是用python 批量操作redis數(shù)據(jù)庫的詳細內(nèi)容,更多關于python 批量操作redis數(shù)據(jù)庫的資料請關注腳本之家其它相關文章!
相關文章
python使用wmi模塊獲取windows下的系統(tǒng)信息 監(jiān)控系統(tǒng)
Python用WMI模塊獲取Windows系統(tǒng)的硬件信息:硬盤分區(qū)、使用情況,內(nèi)存大小,CPU型號,當前運行的進程,自啟動程序及位置,系統(tǒng)的版本等信息。2015-10-10
Python使用異步線程池如何實現(xiàn)異步TCP服務器交互
這篇文章主要介紹了Python使用異步線程池如何實現(xiàn)異步TCP服務器交互問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-11-11
django項目環(huán)境搭建及在虛擬機本地創(chuàng)建django項目的教程
這篇文章主要介紹了django項目環(huán)境搭建及在虛擬機本地創(chuàng)建django項目的教程,本文圖文并茂給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下2019-08-08

