Python WXPY實(shí)現(xiàn)微信監(jiān)控報(bào)警功能的代碼
概述:
本文主要分享一下博主在學(xué)習(xí)wxpy 的過(guò)程中開發(fā)的一個(gè)小程序。博主在最近有一個(gè)監(jiān)控報(bào)警的需求需要完成,然后剛好在學(xué)習(xí)wxpy 這個(gè)東西,因此很巧妙的將工作和學(xué)習(xí)聯(lián)系在一起。
博文中主要使用到的技術(shù)設(shè)計(jì)到Python,Redis,以及Java。涉及到的技術(shù)看似很多,但是主要的語(yǔ)言是基于Python進(jìn)行開發(fā)的。
架構(gòu)涉及主要采用了 生產(chǎn)者消費(fèi)者的涉及模式,使用Redis作為消息隊(duì)列進(jìn)行解耦操作。
主要架構(gòu)涉及如下:
接下來(lái)開始介紹一下程序的實(shí)現(xiàn)過(guò)程,主要講解wxpy -> python.redis -> Java.redis
1、Wxpy初體驗(yàn)
項(xiàng)目使用的python 是3.5版本的,因此語(yǔ)法會(huì)和2.x版本有所區(qū)別,wxpy 支持python3.4-3.6 以及python2.7版本 ,因此在python版本上不用太過(guò)于糾結(jié)
1.1 安裝wxpy
在這里默認(rèn)大家以及安裝好了pip,我們需要安裝wxpy 以及wechat_sender 兩個(gè)包,這里推薦使用國(guó)內(nèi)的豆瓣源,如果大家網(wǎng)速過(guò)硬 請(qǐng)忽略。。。。
pip install wxpy -i "https://pypi.doubanio.com/simple/" pip install wechat_sender -i "https://pypi.doubanio.com/simple/"
1.2 wxpy 登陸
wxpy 使用起來(lái)非常簡(jiǎn)單,我們只需要?jiǎng)?chuàng)建一個(gè)bot 對(duì)象,程序運(yùn)行后,會(huì)彈出二維碼,掃描二維碼后顯示登陸成功。
下述代碼在登陸完成后,會(huì)向我們的文件傳輸助手發(fā)送一個(gè)“hello world!”。(每個(gè)程序都需要一個(gè)hello world)
from wxpy import * bot = Bot() bot.file_helper.send('hello world!') print("ending")
關(guān)于Bot()對(duì)象的相關(guān)參數(shù)說(shuō)明,我們可以在源碼中的注釋中看到:
:param cache_path:
* 設(shè)置當(dāng)前會(huì)話的緩存路徑,并開啟緩存功能;為 `None` (默認(rèn)) 則不開啟緩存功能。
* 開啟緩存后可在短時(shí)間內(nèi)避免重復(fù)掃碼,緩存失效時(shí)會(huì)重新要求登陸。
* 設(shè)為 `True` 時(shí),使用默認(rèn)的緩存路徑 'wxpy.pkl'。
:param console_qr:
* 在終端中顯示登陸二維碼,需要安裝 pillow 模塊 (`pip3 install pillow`)。
* 可為整數(shù)(int),表示二維碼單元格的寬度,通常為 2 (當(dāng)被設(shè)為 `True` 時(shí),也將在內(nèi)部當(dāng)作 2)。
* 也可為負(fù)數(shù),表示以反色顯示二維碼,適用于淺底深字的命令行界面。
* 例如: 在大部分 Linux 系統(tǒng)中可設(shè)為 `True` 或 2,而在 macOS Terminal 的默認(rèn)白底配色中,應(yīng)設(shè)為 -2。
:param qr_path: 保存二維碼的路徑
:param qr_callback: 獲得二維碼后的回調(diào),可以用來(lái)定義二維碼的處理方式,接收參數(shù): uuid, status, qrcode
:param login_callback: 登陸成功后的回調(diào),若不指定,將進(jìn)行清屏操作,并刪除二維碼文件
:param logout_callback: 登出時(shí)的回調(diào)
這里介紹一下兩個(gè)主要使用到的參數(shù):
- cache_path: 在開發(fā)過(guò)程中可以設(shè)置為True 避免每次登陸都需要重新掃描,具有緩存的作用。
- qr_path:用于保存二維碼生成圖片,主要解決服務(wù)器圖片展示不方便的問(wèn)題
1.3 wxpy 好友與聊天群
如代碼所示,我們可以通過(guò)Bot.friends 以及Bot.groups 來(lái)獲取到所有的好友以及聊天群,這里需要注意的是,聊天群需要保存到通訊錄中,不然可能會(huì)出現(xiàn)找不到聊天群的情況。
在搜索方法中,可以提供的參數(shù)有:姓名,city,province,sex 等相關(guān)變量。
關(guān)于好友的詳細(xì)API文檔,可以參考---》 微信好友API
from wxpy import * bot = Bot() # 獲取所有好友 friends = bot.friends() # 遍歷輸出好友名稱 for friend in friends: print(friend) # 找到好友 friend = bot.friends.search('被單')[0] print(friend) friend.send("hello world!") # 獲取所有聊天群 groups = bot.groups() for group in groups: print(group) # 找到目標(biāo)群 group = groups.search("409")[0] group.send("hello world!")
1.4 wxpy 消息處理
接下來(lái)主要介紹一下用戶發(fā)送消息的類型,目前wxpy 支持發(fā)送文本,圖片,視頻以及文件。主要的發(fā)送方式如代碼所示:
這里比較重要的就是關(guān)于 @bot.register() 的使用,該注釋主要用于注冊(cè)消息接收器,我們可以根據(jù)特定的需求,配置不一樣的消息接收器。
Bot.register(chats=None, msg_types=None, except_self=True, run_async=True, enabled=True) 詳情可以查看源碼中的介紹
關(guān)于消息處理API,讀者可以在該地址下查看詳細(xì)的配置,這里不做過(guò)多的描述。
代碼中有使用到:embed() 這個(gè)方法, 主要用于阻塞進(jìn)程,避免由于程序運(yùn)行結(jié)束導(dǎo)致無(wú)法接收消息。
from wxpy import * bot = Bot() # 獲取好友 my_friend = bot.friends().search('被單')[0] # 搜索信息 messages = bot.messages.search(keywords='測(cè)試', sender=bot.self) for message in messages: print(message) # 發(fā)送文本 my_friend.send('Hello, WeChat!') # 發(fā)送圖片 my_friend.send_image('my_picture.png') # 發(fā)送視頻 my_friend.send_video('my_video.mov') # 發(fā)送文件 my_friend.send_file('my_file.zip') # 以動(dòng)態(tài)的方式發(fā)送圖片 my_friend.send('@img@my_picture.png') # 發(fā)送公眾號(hào) my_friend.send_raw_msg( # 名片的原始消息類型 raw_type=42, # 注意 `username` 在這里應(yīng)為微信 ID,且被發(fā)送的名片必須為自己的好友 raw_content='<msg username="wxpy_bot" nickname="wxpy 機(jī)器人"/>' ) # 消息接收監(jiān)聽器 @bot.register() def print_others(msg): # 輸出監(jiān)聽到的消息 print(msg) # 回復(fù)消息 msg.reply("hello world") embed()
1.4 wxpy 圖靈機(jī)器人
wxpy 接入圖靈機(jī)器人相當(dāng)方便,我們首先需要到圖靈近期人官網(wǎng)進(jìn)行注冊(cè),哆啦A夢(mèng)的任意門。
通過(guò)注冊(cè)Tuling 對(duì)象,當(dāng)我們接收到消息的時(shí)候,可以直接使用tuling機(jī)器人來(lái)幫我們進(jìn)行答復(fù)。其他的業(yè)務(wù)需求各位可以根據(jù)自己的需求來(lái)完成相應(yīng)的邏輯。
from wxpy import * bot = Bot() # 獲取好友 dear = bot.friends().search('被單')[0] # 注冊(cè)獲得個(gè)人的圖靈機(jī)器人key 填入 tuling = Tuling(api_key='******') # 使用圖靈機(jī)器人自動(dòng)與指定好友聊天 @bot.register(dear) def reply_my_friend(msg): print(msg) tuling.do_reply(msg) embed()
1.5 wechat_sender
在熟悉了wxpy 的相關(guān)操作后,我們接下來(lái)介紹一下一個(gè)主要使用到的工具。由于wxpy 的設(shè)計(jì),導(dǎo)致了一些業(yè)務(wù)操作并不好進(jìn)行實(shí)現(xiàn)。因此我們?cè)谶@里引入一個(gè)工具類:wechat_sender 。
首先我們需要像往常一樣進(jìn)行微信登陸,然后使用 listen() 進(jìn)行對(duì)我們的 bot() 對(duì)象進(jìn)行監(jiān)聽。
在這里我們可以看到了和上面代碼的區(qū)別,這里使用的是listen(),上面是使用embed()進(jìn)行監(jiān)聽。 我們?cè)龠@里使用listen 進(jìn)行監(jiān)聽對(duì)象后,可以設(shè)置相應(yīng)的配置。監(jiān)聽默認(rèn)設(shè)置的接收對(duì)象為self.file_helper,通過(guò)設(shè)置receivers 可以配置消息的接收者。
# login.py from wxpy import * from wechat_sender import * bot = Bot() friend = bot.friends().search('被單')[0] listen(bot, token='test', receivers=[friend])
# sender.py coding: utf-8 from wechat_sender import Sender sender = Sender(token='test') sender.send('hello world!')
在別的python 文件中,我們只需要?jiǎng)?chuàng)建一個(gè)Sender() 對(duì)象,然后調(diào)用Sender.send()方法,即可對(duì)我們?cè)O(shè)定好的消息接收者發(fā)送消息。
Sender()在創(chuàng)建的時(shí)候可以通過(guò)特定的參數(shù)設(shè)定,比如這里使用了 token 用于避免多個(gè)listen 導(dǎo)致sender 混淆。還可以在sender中設(shè)置receiver 從listen 中選取需要接收消息的對(duì)象。
1.6 wxpy 在監(jiān)控模塊的代碼實(shí)現(xiàn)
微信登陸模塊:
from wechat_sender import * from wxpy import * bot = Bot(qr_path="qr.png") group = bot.groups().search('監(jiān)控報(bào)警')[0] print("微信登陸成功!進(jìn)行監(jiān)控報(bào)警功能!") print(group) # listen(bot, token='test', receivers=[group])
業(yè)務(wù)處理模塊:
import redis from wechat_sender import * sender = Sender(token='test', receivers='監(jiān)控報(bào)警') while true: # do anything sender.send(message=data) # do anything p.unsubscribe('cardniu-monitor') print('取消訂閱')
2、Python-Redis
這一模塊我們將簡(jiǎn)單描述一下python 對(duì)于Redis 的支持,首先我們需要安裝python-redis相關(guān)模塊:
2.1 Python-redis安裝
- 下載壓縮包:哆啦A夢(mèng)的任意門
- 解壓進(jìn)入 Redis 目錄
- 命令行執(zhí)行: python setup.py install
2.2 Python 簡(jiǎn)單操作Redis
由于Python 操作Redis 并不是我們這里的主要內(nèi)容,所以這里簡(jiǎn)單的過(guò)一下Python 對(duì)Redis 的支持。
import redis r = redis.Redis(host='ip', port=6379, db=15, password='****') r.set('name', 'Jaycekon') value = r.get('name') print(value)
2.3 Redis的發(fā)布訂閱模式
在為大家講解Redis 的發(fā)布訂閱模式前,先為大家科普一下生產(chǎn)者消費(fèi)者模式:
大家來(lái)領(lǐng)略一下我的靈魂畫圖,生產(chǎn)者消費(fèi)者的核心思想是通過(guò)一個(gè)冰箱來(lái)進(jìn)行解耦,就是我們的廚師不需要出廚房,顧客也不需要去廚房拿飯吃。通過(guò)一個(gè)冰箱來(lái)進(jìn)行中間的解耦合。
下面是我們通過(guò)python 實(shí)現(xiàn)的一個(gè)生產(chǎn)者消費(fèi)者模式,廚師不停的做飯,顧客不停的吃。。大家相互不影響。
from threading import Thread queues = queue.Queue(10) class Producer(Thread): def run(self): while True: elem = random.randrange(9) queues.put(elem) print("廚師 {} 做了 {} 飯 --- 還剩 {} 飯沒(méi)賣完".format(self.name, elem, queues.qsize())) time.sleep(random.random()) class Consumer(Thread): def run(self): while True: elem = queues.get() print("吃貨{} 吃了 {} 飯 --- 還有 {} 飯可以吃".format(self.name, elem, queues.qsize())) time.sleep(random.random()) def main(): for i in range(3): p = Producer() p.start() for i in range(2): c = Consumer() c.start() if __name__ == '__main__': main()
再來(lái)說(shuō)一下為什么使用到Redis 的發(fā)布訂閱模式。
Redis在當(dāng)前程序中,主要擔(dān)當(dāng)了一個(gè)消息隊(duì)列的角色,我們并沒(méi)有使用目前較為熱門的RabbitMq,ActiveMq來(lái)消息隊(duì)列進(jìn)行解耦。主要原因在于我們的服務(wù)不大,消息量也比較小,因此在不影響程序的架構(gòu)基礎(chǔ)上,采用了Redis 作為消息隊(duì)列。
消息隊(duì)列的關(guān)鍵點(diǎn)在于,當(dāng)生產(chǎn)者發(fā)布消息后,要確保消費(fèi)者能夠快速的接收消息。發(fā)布訂閱模式能夠很好的幫我們解決,當(dāng)有消息到達(dá)的時(shí)候,程序馬上能夠做出響應(yīng)操作。
Redis消息發(fā)布:
import redis pool = redis.ConnectionPool(host='ip', port=6379, db=4, password='****') r = redis.StrictRedis(connection_pool=pool) while True: inputs = input("publish:") r.publish('spub', inputs) if inputs == 'over': print('停止發(fā)布') break
Redis消息訂閱:
import redis pool = redis.ConnectionPool(host='ip', port=6379, db=4, password='***') r = redis.StrictRedis(connection_pool=pool) p = r.pubsub() p.subscribe('cardniu-monitor') for item in p.listen(): print(item) if item['type'] == 'message': data = item['data'] print("消息隊(duì)列中接收到信息:", data)if item['data'] == 'over': break p.unsubscribe('cardniu-monitor') print('取消訂閱')
2.4 wxpy+Redis 實(shí)現(xiàn)監(jiān)控系統(tǒng)的消費(fèi)者
最終,在python 這邊實(shí)現(xiàn)的監(jiān)控系統(tǒng)消費(fèi)者如下:
微信登陸模塊:
from wechat_sender import * from wxpy import * bot = Bot(qr_path="qr.png") group = bot.groups().search('監(jiān)控報(bào)警')[0] print("微信登陸成功!進(jìn)行監(jiān)控報(bào)警功能!") print(group) # listen(bot, token='test', receivers=[group])
Redis消息訂閱模塊:
import redis from wechat_sender import * sender = Sender(token='test', receivers='監(jiān)控報(bào)警') pool = redis.ConnectionPool(host='10.201.3.18', port=6379, db=4, password='kntest%pw_@dk2') r = redis.StrictRedis(connection_pool=pool) p = r.pubsub() p.subscribe('cardniu-monitor') for item in p.listen(): print(item) if item['type'] == 'message': data = item['data'] print("消息隊(duì)列中接收到信息:", data) sender.send(message=data) if item['data'] == 'over': break p.unsubscribe('cardniu-monitor') print('取消訂閱')
3、Java-Redis
最后,在生產(chǎn)者這塊,即是我們監(jiān)控系統(tǒng)的核心部分,當(dāng)我們的Java系統(tǒng)出現(xiàn)異常時(shí),我們即可向Redis發(fā)送消息,最后由消費(fèi)者那一邊完成消息的發(fā)送。
在下面會(huì)跟大家簡(jiǎn)單講解一下生產(chǎn)者這邊的代碼,但是由于代碼設(shè)計(jì)公司內(nèi)容,因此不做過(guò)多的描述。
Spring-redis.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:util="http://www.springframework.org/schema/util" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd"> <!-- redis連接池的配置 --> <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig"> <property name="maxTotal" value="${redis.pool.maxTotal}" /> <property name="maxIdle" value="${redis.pool.maxIdle}" /> <property name="minIdle" value="${redis.pool.minIdle}" /> <property name="maxWaitMillis" value="${redis.pool.maxWaitMillis}" /> <property name="testOnBorrow" value="${redis.pool.testOnBorrow}" /> <property name="testOnReturn" value="${redis.pool.testOnReturn}" /> </bean> <bean id="sentinelJedisPool" class="redis.clients.jedis.JedisSentinelPool"> <constructor-arg index="0" value="${redis.sentinel.masterName}" /> <constructor-arg index="1" value="#{'${redis.sentinels}'.split(',')}" /> <constructor-arg index="2" ref="jedisPoolConfig" /> <constructor-arg index="3" value="${redis.sentinel.timeout}" type="int" /> <constructor-arg index="4" value="${redis.sentinel.password}" /> <constructor-arg index="5" value="${redis.sentinel.database}" type="int" /> </bean> </beans>
JedisUtils.java
@Autowired private JedisSentinelPool jedisPool; @PostConstruct private void init() throws Exception { /* 緩存初始化 */ JedisUtils.setJedisPool(jedisPool); } public static void setJedisPool(Pool<Jedis> jedisPool) throws Exception { JedisCache.jedisPool = jedisPool; Jedis jedis = null; try { jedis = jedisPool.getResource(); isInitSuc = true; logger.info("redis start success!"); } catch (Exception e) { if (null != jedis) jedisPool.returnBrokenResource(jedis); logger.error("redis start exception?。?!error:{}", e.getMessage(), e); if (e instanceof redis.clients.jedis.exceptions.JedisConnectionException) { throw e; } } finally { if (null != jedis) jedisPool.returnResource(jedis); } } public static Long publish(String chanel, String value) { Jedis jedis = null; try { jedis = jedisPool.getResource(); return jedis.publish(chanel,value); } catch (Exception e) { if (null != jedis) { jedisPool.returnBrokenResource(jedis); jedis = null; } logger.error("redis exception:{}", e.getMessage(), e); return 0L; } finally { if (null != jedis) jedisPool.returnResource(jedis); } }
NoticeTask.java
@Scheduled(cron = "*/5 * * * * ? ") public void runMonitor() { try { List<T> notices; List<EbankNotice> result; while ((notices = QueueHolder.noticeBlockingQueue.take()) != null) { //消費(fèi) if (notices.isEmpty()) { continue; } result = service.calculateNotice(notices); result.forEach(notice -> { JedisUtils.publish("cardniu-monitor", notice.getTitle() + "," + DateUtils.format(notice.getPostTime(), "yyyy-MM-dd hh:mm:ss") + "," + notice.getLinkAddress()); }); } } catch (InterruptedException e) { logger.error("發(fā)送郵件定時(shí)任務(wù)異常,{}", e.getMessage(), e); } }
4、總結(jié)
這個(gè)項(xiàng)目的核心在于wxpy 的運(yùn)用,以及生產(chǎn)者消費(fèi)者的設(shè)計(jì)思想。語(yǔ)言的話,核心的python這一塊的wxpy,在生產(chǎn)者這邊,無(wú)論是其他的什么語(yǔ)言,都可以作為我們的生產(chǎn)者。
項(xiàng)目github地址:https://github.com/jaycekon/WxpyDemo
參考:
wxpy API:http://wxpy.readthedocs.io/zh/latest/messages.html
wechat_sender Api:https://pypi.python.org/pypi/wechat-sender/0.1.3
python-redis :https://pypi.python.org/pypi/redis
Java-Redis:http://docs.spring.io/spring-data/redis/docs/2.0.0.M4/reference/html/
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
在Python中使用MySQL--PyMySQL的基本使用方法
PyMySQL 是在 Python3.x 版本中用于連接 MySQL 服務(wù)器的一個(gè)庫(kù),Python2中則使用mysqldb。這篇文章主要介紹了在Python中使用MySQL--PyMySQL的基本使用,需要的朋友可以參考下2019-11-11為什么說(shuō)Python可以實(shí)現(xiàn)所有的算法
在本篇文章里小編給各位整理的是關(guān)于一個(gè)Python就可以實(shí)現(xiàn)所有的算法的相關(guān)文章,需要的朋友們參考下。2019-10-10Python爬蟲基礎(chǔ)之爬蟲的分類知識(shí)總結(jié)
來(lái)給大家講python爬蟲的基礎(chǔ)啦,首先我們從爬蟲的分類開始講起,下文有非常詳細(xì)的知識(shí)總結(jié),對(duì)正在學(xué)習(xí)python的小伙伴們很有幫助,需要的朋友可以參考下2021-05-05Win10下安裝并使用tensorflow-gpu1.8.0+python3.6全過(guò)程分析(顯卡MX250+CUDA9.
這篇文章主要介紹了Win10下安裝并使用tensorflow-gpu1.8.0+python3.6全過(guò)程(顯卡MX250+CUDA9.0+cudnn),本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-02-02