欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

python3自動(dòng)更新緩存類(lèi)的具體使用

 更新時(shí)間:2025年01月08日 10:30:11   作者:言之。  
本文介紹了使用一個(gè)自動(dòng)更新緩存的Python類(lèi)AutoUpdatingCache,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧

這個(gè)類(lèi)會(huì)在后臺(tái)自動(dòng)更新緩存數(shù)據(jù),你只需要調(diào)用方法來(lái)獲取數(shù)據(jù)即可。

自動(dòng)更新緩存類(lèi)

以下是 AutoUpdatingCache 類(lèi)的實(shí)現(xiàn):

import threading
import time

class AutoUpdatingCache:
    def __init__(self, update_function, expiry_time=60):
        """
        初始化緩存類(lèi)。

        :param update_function: 一個(gè)函數(shù),用于生成或更新緩存數(shù)據(jù)。
        :param expiry_time: 緩存的更新周期(秒)。
        """
        self.update_function = update_function
        self.expiry_time = expiry_time
        self.cache_data = None
        self.last_updated = 0
        self.lock = threading.Lock()
        self._start_background_update()

    def _start_background_update(self):
        # 啟動(dòng)后臺(tái)線程更新緩存
        self.update_thread = threading.Thread(target=self._update_cache_periodically)
        self.update_thread.daemon = True
        self.update_thread.start()

    def _update_cache_periodically(self):
        while True:
            current_time = time.time()
            if current_time - self.last_updated >= self.expiry_time:
                self._update_cache()
            time.sleep(1)  # 每秒檢查一次

    def _update_cache(self):
        with self.lock:
            try:
                print("Updating cache...")
                new_data = self.update_function()
                self.cache_data = new_data
                self.last_updated = time.time()
                print("Cache updated!")
            except Exception as e:
                print(f"Error updating cache: {e}")

    def get_data(self):
        with self.lock:
            if self.cache_data is not None:
                return self.cache_data
            else:
                return "Cache is initializing, please try again later."

使用說(shuō)明

定義一個(gè)數(shù)據(jù)生成函數(shù)

首先,需要定義一個(gè)用于生成或更新緩存數(shù)據(jù)的函數(shù)。這個(gè)函數(shù)可以是任何耗時(shí)的操作,例如從數(shù)據(jù)庫(kù)查詢(xún)、計(jì)算復(fù)雜結(jié)果等。

import time

def generate_cache_data():
    # 模擬耗時(shí)操作
    time.sleep(5)
    return {"value": "fresh data", "timestamp": time.time()}

創(chuàng)建緩存類(lèi)的實(shí)例

將數(shù)據(jù)生成函數(shù)傳遞給 AutoUpdatingCache 類(lèi),并設(shè)置緩存更新周期。

cache = AutoUpdatingCache(update_function=generate_cache_data, expiry_time=30)

獲取緩存數(shù)據(jù)

在需要的地方調(diào)用 get_data() 方法即可獲取緩存數(shù)據(jù)。

data = cache.get_data()
print(data)

完整示例

將以上步驟組合起來(lái):

import threading
import time

class AutoUpdatingCache:
    def __init__(self, update_function, expiry_time=60):
        self.update_function = update_function
        self.expiry_time = expiry_time
        self.cache_data = None
        self.last_updated = 0
        self.lock = threading.Lock()
        self._start_background_update()

    def _start_background_update(self):
        self.update_thread = threading.Thread(target=self._update_cache_periodically)
        self.update_thread.daemon = True
        self.update_thread.start()

    def _update_cache_periodically(self):
        while True:
            current_time = time.time()
            if current_time - self.last_updated >= self.expiry_time:
                self._update_cache()
            time.sleep(1)

    def _update_cache(self):
        with self.lock:
            try:
                print("Updating cache...")
                new_data = self.update_function()
                self.cache_data = new_data
                self.last_updated = time.time()
                print("Cache updated!")
            except Exception as e:
                print(f"Error updating cache: {e}")

    def get_data(self):
        with self.lock:
            if self.cache_data is not None:
                return self.cache_data
            else:
                return "Cache is initializing, please try again later."

# 數(shù)據(jù)生成函數(shù)
def generate_cache_data():
    time.sleep(5)  # 模擬耗時(shí)操作
    return {"value": "fresh data", "timestamp": time.time()}

# 創(chuàng)建緩存實(shí)例
cache = AutoUpdatingCache(update_function=generate_cache_data, expiry_time=30)

# 模擬獲取數(shù)據(jù)
for _ in range(10):
    data = cache.get_data()
    print(data)
    time.sleep(10)

代碼解釋

  • AutoUpdatingCache 類(lèi)

    • init 方法:
      • 初始化緩存,設(shè)置數(shù)據(jù)生成函數(shù)和緩存更新周期。
      • 啟動(dòng)后臺(tái)線程 _update_cache_periodically。
    • _update_cache_periodically 方法:
      • 無(wú)限循環(huán),每隔一秒檢查緩存是否需要更新。
      • 如果當(dāng)前時(shí)間距離上次更新時(shí)間超過(guò)了 expiry_time,則調(diào)用 _update_cache。
    • _update_cache 方法:
      • 使用 update_function 更新緩存數(shù)據(jù)。
      • 使用鎖機(jī)制 threading.Lock 確保線程安全。
    • get_data 方法:
      • 獲取緩存數(shù)據(jù)。
      • 如果緩存數(shù)據(jù)為空(初始化中),返回提示信息。
  • 數(shù)據(jù)生成函數(shù)

    • generate_cache_data 函數(shù)模擬一個(gè)耗時(shí)操作,生成新的緩存數(shù)據(jù)。
  • 使用示例

    • 創(chuàng)建緩存實(shí)例并在循環(huán)中每隔 10 秒獲取一次數(shù)據(jù),觀察緩存的更新情況。

注意事項(xiàng)

  • 線程安全:

    • 使用 threading.Lock 確保在多線程環(huán)境下數(shù)據(jù)訪問(wèn)的安全性。
  • 異常處理:

    • 在更新緩存時(shí),捕獲可能的異常,防止線程崩潰。
  • 后臺(tái)線程:

    • 將線程設(shè)置為守護(hù)線程(daemon=True),使得主程序退出時(shí),線程自動(dòng)結(jié)束。

應(yīng)用場(chǎng)景

你可以將這個(gè)緩存類(lèi)應(yīng)用在 Web 應(yīng)用程序中,例如在 Sanic 的路由中:

from sanic import Sanic
from sanic.response import json

app = Sanic("CacheApp")

@app.route("/data")
async def get_cached_data(request):
    data = cache.get_data()
    return json({"data": data})

if __name__ == "__main__":
    # 確保緩存在應(yīng)用啟動(dòng)前初始化
    cache = AutoUpdatingCache(update_function=generate_cache_data, expiry_time=30)
    app.run(host="0.0.0.0", port=8000)

這樣,用戶(hù)在訪問(wèn) /data 路由時(shí),總是能得到緩存中的數(shù)據(jù),而緩存會(huì)在后臺(tái)自動(dòng)更新,不會(huì)因?yàn)楦戮彺娑鴮?dǎo)致請(qǐng)求超時(shí)。

到此這篇關(guān)于python3自動(dòng)更新緩存類(lèi)的具體使用的文章就介紹到這了,更多相關(guān)python3自動(dòng)更新緩存類(lèi)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Python Celery多隊(duì)列配置代碼實(shí)例

    Python Celery多隊(duì)列配置代碼實(shí)例

    這篇文章主要介紹了Python Celery多隊(duì)列配置代碼實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-11-11
  • Python圖像處理庫(kù)Pillow的簡(jiǎn)單實(shí)現(xiàn)

    Python圖像處理庫(kù)Pillow的簡(jiǎn)單實(shí)現(xiàn)

    本文主要介紹了Python圖像處理庫(kù)Pillow的簡(jiǎn)單實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-06-06
  • Python守護(hù)進(jìn)程實(shí)現(xiàn)過(guò)程詳解

    Python守護(hù)進(jìn)程實(shí)現(xiàn)過(guò)程詳解

    這篇文章主要介紹了Python守護(hù)進(jìn)程實(shí)現(xiàn)過(guò)程詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-02-02
  • PyCharm2020.1.2社區(qū)版安裝,配置及使用教程詳解(Windows)

    PyCharm2020.1.2社區(qū)版安裝,配置及使用教程詳解(Windows)

    這篇文章主要介紹了PyCharm2020.1.2社區(qū)版安裝,配置及使用教程(Windows),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-08-08
  • Pytorch自定義CNN網(wǎng)絡(luò)實(shí)現(xiàn)貓狗分類(lèi)詳解過(guò)程

    Pytorch自定義CNN網(wǎng)絡(luò)實(shí)現(xiàn)貓狗分類(lèi)詳解過(guò)程

    PyTorch是一個(gè)開(kāi)源的Python機(jī)器學(xué)習(xí)庫(kù),基于Torch,用于自然語(yǔ)言處理等應(yīng)用程序。它不僅能夠?qū)崿F(xiàn)強(qiáng)大的GPU加速,同時(shí)還支持動(dòng)態(tài)神經(jīng)網(wǎng)絡(luò)。本文將介紹PyTorch自定義CNN網(wǎng)絡(luò)實(shí)現(xiàn)貓狗分類(lèi),感興趣的可以學(xué)習(xí)一下
    2022-12-12
  • Python 詞典(Dict) 加載與保存示例

    Python 詞典(Dict) 加載與保存示例

    今天小編就為大家分享一篇Python 詞典(Dict) 加載與保存示例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2019-12-12
  • 4種Python基于字段的不使用元類(lèi)的ORM實(shí)現(xiàn)方法總結(jié)

    4種Python基于字段的不使用元類(lèi)的ORM實(shí)現(xiàn)方法總結(jié)

    在 Python 中,ORM(Object-Relational Mapping)是一種將對(duì)象和數(shù)據(jù)庫(kù)之間的映射關(guān)系進(jìn)行轉(zhuǎn)換的技術(shù),本文為大家整理了4種不使用元類(lèi)的簡(jiǎn)單ORM實(shí)現(xiàn)方式,需要的可以參考下
    2023-12-12
  • python多進(jìn)程讀圖提取特征存npy

    python多進(jìn)程讀圖提取特征存npy

    這篇文章主要為大家詳細(xì)介紹了python多進(jìn)程讀圖提取特征存npy,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2019-05-05
  • python+selenium 鼠標(biāo)事件操作方法

    python+selenium 鼠標(biāo)事件操作方法

    今天小編就為大家分享一篇python+selenium 鼠標(biāo)事件操作方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2019-08-08
  • python使用selenium登錄QQ郵箱(附帶滑動(dòng)解鎖)

    python使用selenium登錄QQ郵箱(附帶滑動(dòng)解鎖)

    這篇文章主要為大家詳細(xì)介紹了python使用selenium登錄QQ郵箱,帶滑動(dòng)解鎖登錄功能,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2019-01-01

最新評(píng)論