Python定時(shí)從Mysql提取數(shù)據(jù)存入Redis的實(shí)現(xiàn)
設(shè)計(jì)思路:
1.程序一旦run起來(lái),python會(huì)把mysql中最近一段時(shí)間的數(shù)據(jù)全部提取出來(lái)
2.然后實(shí)例化redis類(lèi),將數(shù)據(jù)簡(jiǎn)單解析后逐條傳入redis隊(duì)列
3.定時(shí)器設(shè)計(jì)每天凌晨12點(diǎn)開(kāi)始跑
ps:redis是個(gè)內(nèi)存數(shù)據(jù)庫(kù),做后臺(tái)消息隊(duì)列的緩存時(shí)有很大的用處,有興趣的小伙伴可以去查看相關(guān)的文檔。
# -*- coding:utf-8 -*- import MySQLdb import schedule import time import datetime import random import string import redis # get the data from mysql class FromSql(object): def __init__(self, conn): self.conn = conn def acquire(self): cursor = self.conn.cursor() try: sql = "SELECT * FROM test WHERE TO_DAYS(NOW()) - TO_DAYS(t) <= 1" cursor.execute(sql) rs = cursor.fetchall() #print (rs) for eve in rs: print('%s, %s, %s, %s' % eve) copy_rs = rs cursor.close() return copy_rs except Exception as e: print("The error: %s" % e) class RedisQueue(object): def __init__(self, name, namespace='queue', **redis_kwargs): """The default connection parameters are: host='localhost', port=6379, db=0""" self.__db= redis.Redis(**redis_kwargs) self.key = '%s:%s' %(namespace, name) def qsize(self): return self.__db.llen(self.key) def put(self, item): self.__db.rpush(self.key, item) def get(self, block=True, timeout=None): if block: item = self.__db.blpop(self.key, timeout=timeout) else: item = self.__db.lpop(self.key) if item: item = item[1] return item def get_nowait(self): return self.get(False) if __name__ == "__main__": # connect mysqldb conn_sql = MySQLdb.connect( host = '127.0.0.1', port = 3306, user = 'root', passwd = '', db = 'test', charset = 'utf8' ) def job_for_redis(): get_data = FromSql(conn_sql) data = get_data.acquire() q = RedisQueue('test',host='localhost', port=6379, db=0) for single_data in data: for meta_data in single_data: q.put(meta_data) print(meta_data) print("All data had been inserted.") """ try: schedule.every().day.at("00:00").do(job_for_redis) except Exception as e: print('Error: %s'% e) # finally: # conn.close() while True: schedule.run_pending() time.sleep(1) """
補(bǔ)充知識(shí):python定時(shí)獲取匯率存入數(shù)據(jù)庫(kù)
python定時(shí)任務(wù):
我們可以使用 輕量級(jí)的第三方模塊schedule。首先先安裝:pip install schedule
定時(shí)任務(wù)的的小測(cè)試:
import schedule import time def job(): print("I'm working...") schedule.every(10).minutes.do(job) # 每隔10分鐘執(zhí)行一次任務(wù) schedule.every().hour.do(job) # 每隔一小時(shí)執(zhí)行一次任務(wù) schedule.every().day.at("10:30").do(job) # 每天10:30執(zhí)行一次任務(wù) schedule.every(5).to(10).days.do(job) # 每5-10天執(zhí)行一次任務(wù) schedule.every().monday.do(job) # 每周一的這個(gè)時(shí)候執(zhí)行一次任務(wù) schedule.every().wednesday.at("13:15").do(job) # 每周三13:15執(zhí)行一次任務(wù) while True: schedule.run_pending()
獲取數(shù)據(jù)存入數(shù)據(jù)庫(kù):(格式可能不太對(duì),還有一些符號(hào)。自己修改一下即可)
import pymysql import schedule import time import requests import pandas from sqlalchemy import create_engine #獲取美元的所有外匯 def job(): content = '美元' url = 'http://www.boc.cn/sourcedb/whpj/index.html' #外匯數(shù)據(jù)地址 html = requests.get(url).content.decode('utf-8') index = html.index('<td>' + content + '</td>') str = html[index:index+300] result = re.findall('<td>(.*?)</td>',str) print("幣種:" + result[0]) print("現(xiàn)匯買(mǎi)入價(jià):" + result[1]) print("現(xiàn)鈔買(mǎi)入價(jià):" + result[2]) print("現(xiàn)匯賣(mài)出價(jià):" + result[3]) print("現(xiàn)鈔賣(mài)出價(jià):" + result[4]) print("中行結(jié)算價(jià):" + result[5]) print("發(fā)布時(shí)間:" + result[6] + ' ' + result[7]) #本地地址 數(shù)據(jù)庫(kù)賬號(hào) 密碼 數(shù)據(jù)庫(kù)名 db = pymysql.connect('localhost','root','root','pinyougoudb') cursor = db.cursor() #sql語(yǔ)句 sql = "update tb_money set huiBuy = %s,chaoBuy = %s,huiSale = %s,chaoSale = %s,centerResult= %s,publishTime = '%s' where typeId = '%s'" % (result[1], result[2], result[3], result[4], result[5], result[6] + ' ' + result[7], result[0]) cursor.execute(sql) db.commit() print('success') # 查詢語(yǔ)句,將存入的數(shù)據(jù)查出來(lái) # sqlalchemy 進(jìn)行數(shù)據(jù)庫(kù)初始化 engine = create_engine('mysql+pymysql://root:root@localhost:3306/pinyougoudb') sql = '''select * from tb_money''' # pandas 進(jìn)行數(shù)據(jù)庫(kù)讀寫(xiě) df = pandas.read_sql_query(sql,engine) print(df) db.commit() # 每隔幾分中刷新一次 #schedule.every(0.1).minutes.do(job) #每天什么時(shí)候刷新 schedule.every().day.at("09:29").do(job) schedule.every().day.at("09:30").do(job) #一直循環(huán) 知道滿足條件執(zhí)行 while True: schedule.run_pending()
以上這篇Python定時(shí)從Mysql提取數(shù)據(jù)存入Redis的實(shí)現(xiàn)就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
- Spring?Boot實(shí)戰(zhàn)解決高并發(fā)數(shù)據(jù)入庫(kù)之?Redis?緩存+MySQL?批量入庫(kù)問(wèn)題
- MySQL與Redis如何保證數(shù)據(jù)一致性詳解
- 解決docker重啟redis,mysql數(shù)據(jù)丟失的問(wèn)題
- PHP結(jié)合Redis+MySQL實(shí)現(xiàn)冷熱數(shù)據(jù)交換應(yīng)用案例詳解
- PHP的Laravel框架結(jié)合MySQL與Redis數(shù)據(jù)庫(kù)的使用部署
- 從MySQL到Redis的簡(jiǎn)單數(shù)據(jù)庫(kù)遷移方法
- python連接MySQL、MongoDB、Redis、memcache等數(shù)據(jù)庫(kù)的方法
- MySQL和Redis的數(shù)據(jù)一致性問(wèn)題
相關(guān)文章
Python數(shù)據(jù)集庫(kù)Vaex秒開(kāi)100GB加數(shù)據(jù)
這篇文章主要為大家介紹了Python數(shù)據(jù)集庫(kù)Vaex秒開(kāi)100GB加數(shù)據(jù)實(shí)現(xiàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06Python Pexpect庫(kù)的簡(jiǎn)單使用方法
這篇文章主要介紹了Python Pexpect庫(kù)的簡(jiǎn)單使用方法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2019-01-01利用Python輕松實(shí)現(xiàn)視頻轉(zhuǎn)GIF動(dòng)圖
在看視頻的時(shí)候覺(jué)得某段非常有意思想弄成動(dòng)圖,但是無(wú)從下手!本文就將介紹如何利用Python搞定這一需求,感興趣的小伙伴可以學(xué)習(xí)一下2022-01-01Python基于類(lèi)路徑字符串獲取靜態(tài)屬性
這篇文章主要介紹了Python基于類(lèi)路徑字符串獲取靜態(tài)屬性,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03記錄一下scrapy中settings的一些配置小結(jié)
這篇文章主要介紹了記錄一下scrapy中settings的一些配置小結(jié),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09關(guān)于Python文本生成的Beam?Search解碼問(wèn)題
這篇文章主要介紹了Python文本生成的Beam?Search解碼,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-07-07