欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

基于redis樂(lè)觀鎖實(shí)現(xiàn)并發(fā)排隊(duì)

 更新時(shí)間:2022年12月25日 09:35:24   作者:野生大蝦  
這篇文章主要介紹了基于redis樂(lè)觀鎖實(shí)現(xiàn)并發(fā)排隊(duì)的相關(guān)資料,需要的朋友可以參考下

有個(gè)需求場(chǎng)景是這樣的,使用redis控制scrapy運(yùn)行的數(shù)量。當(dāng)系統(tǒng)的后臺(tái)設(shè)置為4時(shí),只允許scapry啟動(dòng)4個(gè)任務(wù),多余的任務(wù)則進(jìn)行排隊(duì)。

概況

最近做了一個(gè)django + scrapy + celery + redis 的爬蟲(chóng)系統(tǒng),客戶(hù)購(gòu)買(mǎi)的主機(jī)除了跑其他程序外,還要跑我開(kāi)發(fā)的這套程序,所以需要手動(dòng)控制scrapy的實(shí)例數(shù)量,避免過(guò)多的爬蟲(chóng)給系統(tǒng)造成負(fù)擔(dān)。

流程設(shè)計(jì)

1、爬蟲(chóng)任務(wù)由用戶(hù)以請(qǐng)求的方式發(fā)起,所有的用戶(hù)的請(qǐng)求統(tǒng)一進(jìn)入到celery進(jìn)行排隊(duì);
2、任務(wù)數(shù)量控制的執(zhí)行就交給reids,經(jīng)由celery保存到redis,包含了爬蟲(chóng)啟動(dòng)所需要的必要信息,從redis取一條信息即可啟動(dòng)一個(gè)爬蟲(chóng);
3、通過(guò)scrapyd的接口來(lái)獲取當(dāng)前在運(yùn)行的爬蟲(chóng)數(shù)量,以便決定下一步流程:如果小于4,則從redis中取相應(yīng)數(shù)量的信息來(lái)啟動(dòng)爬蟲(chóng),如果大于等于4,則繼續(xù)等待;
4、如果在運(yùn)行爬蟲(chóng)的數(shù)量有所減少,則及時(shí)從reids中取相應(yīng)數(shù)量的信息來(lái)啟動(dòng)爬蟲(chóng)。

代碼實(shí)現(xiàn)

業(yè)務(wù)代碼有點(diǎn)復(fù)雜和啰嗦,此處使用偽代碼來(lái)演示

import redis

# 實(shí)例化一個(gè)redis連接池
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, decode_responses=True, db=4, password='')

r = redis.Redis(connection_pool=pool)
# 爬蟲(chóng)實(shí)例限制為4 即只允許4個(gè)scrapy實(shí)例在運(yùn)行
limited = 4

# 聲明redis的樂(lè)觀鎖
lock = r.Lock()

# lock.acquire中有while循環(huán),即它會(huì)線程阻塞,直到當(dāng)前線程獲得redis的lock,才會(huì)繼續(xù)往下執(zhí)行代碼
if lock.acquire():
	# 1、從reids中取一條爬蟲(chóng)信息
	info = redis.get() 
	
	# 2、while循環(huán)監(jiān)聽(tīng)爬蟲(chóng)運(yùn)行的數(shù)量
	while True:
		req = requests.get('http://127.0.0.1:6800/daemonstatus.json').json()
		# 統(tǒng)計(jì)當(dāng)前有多少個(gè)爬蟲(chóng)在運(yùn)行
		running = req.get('running') + req.get('pending')
		
		# 3、判斷是否等待還是要增加爬蟲(chóng)數(shù)量
		# 3.1 如果在運(yùn)行的數(shù)量大于等于設(shè)置到量 則繼續(xù)等待
		if running >= limited:
			continue
		
		# 3.2 如果小于 則啟動(dòng)爬蟲(chóng)
		start_scrapy(info)
		# 3.3 將info從redis中刪除
		redis.delete(info)
		# 3.4 釋放鎖
		lock.release()
		break
		

當(dāng)前,這只是偽代碼而已,實(shí)際的業(yè)務(wù)邏輯可能是非常復(fù)雜的,如:

@shared_task
def scrapy_control(key_uuid):

    r = redis.Redis(connection_pool=pool)
    db = MysqlDB()
    speed_limited = db.fetch_config('REPTILE_SPEED')
    speed_limited = int(speed_limited[0])

    keywords_num = MysqlDB().fetch_config('SEARCH_RANDOM')
    keywords_num = int(keywords_num[0])


    # while True:
    lock = r.lock('lock')
    with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 進(jìn)入處理環(huán)節(jié)' +  '\n')
    try:
        # acquire默認(rèn)阻塞 如果獲取不到鎖時(shí) 會(huì)一直阻塞在這個(gè)函數(shù)的while循環(huán)中
        if lock.acquire():
            with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 獲得鎖' +  '\n')
            # 1 從redis中獲取信息
            redis_obj = json.loads(r.get(key_uuid))
            user_id = redis_obj.get('user_id')
            contents = redis_obj.get('contents')
            
            # 2 使用while循環(huán)處理核心邏輯          
            is_hold_print = True
            while True:
                req = requests.get('http://127.0.0.1:6800/daemonstatus.json').json()
                running = req.get('running') + req.get('pending')
                # 3 如果仍然有足夠的爬蟲(chóng)在運(yùn)行 則hold住redis鎖,等待有空余的爬蟲(chóng)位置讓出
                if running >= speed_limited:
                    if is_hold_print:
                        with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 爬蟲(chóng)在運(yùn)行,線程等待中' +  '\n')
                        is_hold_print = False
                    time.sleep(1)
                    continue
                
                # 4 有空余的爬蟲(chóng)位置 則往下走
                # 4.1 處理完所有的內(nèi)容后 釋放鎖
                if len(contents) == 0:
                    r.delete(key_uuid)
                    with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 任務(wù)已完成,從redis中刪除' +  '\n')
                    lock.release()
                    with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 釋放鎖' +  '\n')
                    break

                # 4.2 創(chuàng)建task任務(wù)
                task_uuid = str(uuid.uuid4())
                article_obj = contents.pop()
                article_id = article_obj.get('article_id')
                article = article_obj.get('content')
                try:
                    Task.objects.create(
                        task_uuid = task_uuid,
                        user_id = user_id,
                        article_id = article_id,
                        content = article
                    )
                except Exception as e:
                    with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + '->' + str(task_uuid) + ' 創(chuàng)建Task出錯(cuò): ' + str(e) +  '\n')
                # finally:
                # 4.3 啟動(dòng)爬蟲(chóng)任務(wù) 即便創(chuàng)建task失敗也會(huì)啟動(dòng)
                try:
                    task_chain(user_id, article, task_uuid, keywords_num)
                except Exception as e:
                    with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 啟動(dòng)任務(wù)鏈?zhǔn)? ' + str(e) +  '\n')
                
                # 加入sleep 防止代碼執(zhí)行速度快于爬蟲(chóng)啟動(dòng)速度而導(dǎo)致當(dāng)前線程啟動(dòng)額外的爬蟲(chóng)
                time.sleep(5)

    except Exception as e:
        with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 獲得鎖之后的操作出錯(cuò): ' + str(e) +  '\n')
        lock.release()

小坑
scrapy啟動(dòng)速度相對(duì)較慢,所以while循環(huán)中,代碼中執(zhí)行到了爬蟲(chóng)的啟動(dòng),需要sleep一下再去通過(guò)scrapyd接口獲取爬蟲(chóng)運(yùn)行的數(shù)量,如果立刻讀取,可能會(huì)造成誤判。

到此這篇關(guān)于基于redis樂(lè)觀鎖實(shí)現(xiàn)并發(fā)排隊(duì)的文章就介紹到這了,更多相關(guān)基于redis樂(lè)觀鎖實(shí)現(xiàn)并發(fā)排隊(duì)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Redis中Redisson紅鎖(Redlock)使用原理

    Redis中Redisson紅鎖(Redlock)使用原理

    本文主要介紹了Redis中Redisson紅鎖(Redlock)使用原理,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2022-08-08
  • 在redhat6.4安裝redis集群【教程】

    在redhat6.4安裝redis集群【教程】

    這篇文章主要介紹了在redhat6.4安裝redis集群【教程】,需要的朋友可以參考下
    2016-05-05
  • Redis安裝與使用方法小結(jié)

    Redis安裝與使用方法小結(jié)

    這篇文章主要介紹了Redis安裝與使用方法,結(jié)合實(shí)例形式分析了Redis數(shù)據(jù)庫(kù)的下載、安裝、啟動(dòng)、設(shè)置及相關(guān)使用操作注意事項(xiàng),需要的朋友可以參考下
    2018-04-04
  • 詳解Redis中數(shù)值亂碼的根本原因以及解決方式

    詳解Redis中數(shù)值亂碼的根本原因以及解決方式

    這篇文章給大家詳細(xì)分析了Redis中數(shù)值亂碼的根本原因以及解決方式,通過(guò)代碼示例給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下
    2024-02-02
  • 談?wù)凴edis分布式鎖的正確實(shí)現(xiàn)方法

    談?wù)凴edis分布式鎖的正確實(shí)現(xiàn)方法

    這篇文章主要給大家介紹了關(guān)于Redis分布式鎖的正確實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Redis具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-08-08
  • Redis中Scan命令的基本使用教程

    Redis中Scan命令的基本使用教程

    這篇文章主要給大家介紹了關(guān)于Redis中Scan命令的基本使用教程,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Redis具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-06-06
  • Redis刪除策略的三種方法及逐出算法

    Redis刪除策略的三種方法及逐出算法

    這篇文章主要介紹了Redis刪除策略的三種方法及逐出算法,文章圍繞主題展開(kāi)詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下
    2022-08-08
  • redis?zset實(shí)現(xiàn)滑動(dòng)窗口限流的代碼

    redis?zset實(shí)現(xiàn)滑動(dòng)窗口限流的代碼

    這篇文章主要介紹了redis?zset實(shí)現(xiàn)滑動(dòng)窗口限流,滑動(dòng)窗口算法思想就是記錄一個(gè)滑動(dòng)的時(shí)間窗口內(nèi)的操作次數(shù),操作次數(shù)超過(guò)閾值則進(jìn)行限流,本文通過(guò)實(shí)例代碼給大家詳細(xì)介紹,需要的朋友參考下吧
    2022-03-03
  • Django使用Redis進(jìn)行緩存詳細(xì)步驟

    Django使用Redis進(jìn)行緩存詳細(xì)步驟

    這篇文章主要介紹了Django使用Redis進(jìn)行緩存詳細(xì)流程,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2022-08-08
  • redis?手機(jī)驗(yàn)證碼實(shí)現(xiàn)示例

    redis?手機(jī)驗(yàn)證碼實(shí)現(xiàn)示例

    本文主要介紹了redis?手機(jī)驗(yàn)證碼實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-11-11

最新評(píng)論