欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

python實(shí)現(xiàn)多線程行情抓取工具的方法

 更新時(shí)間:2018年02月28日 08:34:12   作者:duduniao85  
當(dāng)我們實(shí)現(xiàn)了單線程,接下來(lái)就是實(shí)現(xiàn)多線程了,下面這篇文章主要給大家介紹了關(guān)于python實(shí)現(xiàn)多線程行情抓取工具的方法,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。

思路

借助python當(dāng)中threading模塊與Queue模塊組合可以方便的實(shí)現(xiàn)基于生產(chǎn)者-消費(fèi)者模型的多線程模型。Jimmy大神的tushare一直是廣大python數(shù)據(jù)分析以及業(yè)余量化愛(ài)好者喜愛(ài)的免費(fèi)、開(kāi)源的python財(cái)經(jīng)數(shù)據(jù)接口包。

平時(shí)一直有在用阿里云服務(wù)器通過(guò)tushare的接口自動(dòng)落地相關(guān)財(cái)經(jīng)數(shù)據(jù),但日復(fù)權(quán)行情數(shù)據(jù)以往在串行下載的過(guò)程當(dāng)中,速度比較慢,有時(shí)遇到網(wǎng)絡(luò)原因還需要重下。每只股票的行情下載過(guò)程中都需要完成下載、落地2個(gè)步驟,一個(gè)可能需要網(wǎng)絡(luò)開(kāi)銷(xiāo)、一個(gè)需要數(shù)據(jù)庫(kù)mysql的存取開(kāi)銷(xiāo)。2者原本就可以獨(dú)立并行執(zhí)行,是個(gè)典型的“生產(chǎn)者-消費(fèi)者”模型。

基于queue與threading模塊的線程使用一般采用以下的套路:

producerQueue=Queue()
consumerQueue=Queue()
lock = threading.Lock()
class producerThead(threading.Thread):
 def __init__(self, producerQueue,consumerQueue):
 self.producerQueue=producerQueue
 self.consumerQueue=consumerQueue



 def run(self):
 while not self.thread_stop:
  try:
  #接收任務(wù),如果連續(xù)20秒沒(méi)有新的任務(wù),線程退出,否則會(huì)一直執(zhí)行
  item=self.producerQueue.get(block=True, timeout=20)
  #阻塞調(diào)用進(jìn)程直到有數(shù)據(jù)可用。如果timeout是個(gè)正整數(shù),
  #阻塞調(diào)用進(jìn)程最多timeout秒,
  #如果一直無(wú)數(shù)據(jù)可用,拋出Empty異常(帶超時(shí)的阻塞調(diào)用)
  except Queue.Empty:
  print("Nothing to do!thread exit!")
  self.thread_stop=True
  break
  #實(shí)現(xiàn)生產(chǎn)者邏輯,生成消費(fèi)者需要處理的內(nèi)容 consumerQueue.put(someItem)
  #還可以邊處理,邊生成新的生產(chǎn)任務(wù)
  doSomethingAboutProducing()
  self.producerQueue.task_done()
 def stop(self):
 self.thread_stop = True

class consumerThead(threading.Thread):
 def __init__(self,lock, consumerQueue):
 self.consumerQueue=consumerQueue
 def run(self):
 while true:
  try:
  #接收任務(wù),如果連續(xù)20秒沒(méi)有新的任務(wù),線程退出,否則會(huì)一直執(zhí)行
  item=self.consumerQueue.get(block=True, timeout=20)
  #阻塞調(diào)用進(jìn)程直到有數(shù)據(jù)可用。如果timeout是個(gè)正整數(shù),
  #阻塞調(diào)用進(jìn)程最多timeout秒,
  #如果一直無(wú)數(shù)據(jù)可用,拋出Empty異常(帶超時(shí)的阻塞調(diào)用)
  except Queue.Empty:
  print("Nothing to do!thread exit!")
  self.thread_stop=True
  break
  doSomethingAboutConsuming(lock)# 處理消費(fèi)者邏輯,必要時(shí)使用線程鎖 ,如文件操作等
  self.consumerQueue.task_done()
#定義主線程
def main():
 for i in range(n):#定義n個(gè)i消費(fèi)者線程
 t = ThreadRead(producerQueue, consumerQueue)
 t.setDaemon(True)
 t.start()
 producerTasks=[] #定義初始化生產(chǎn)者任務(wù)隊(duì)列
 producerQueue.put(producerTasks)
 for i in range(n):#定義n個(gè)生產(chǎn)者錢(qián)程
 t = ThreadWrite(consumerQueue, lock)
 t.setDaemon(True)
 t.start() 
 stock_queue.join()
 data_queue.join()

相關(guān)接口

1,股票列表信息接口

作用

獲取滬深上市公司基本情況。屬性包括:

code,代碼
name,名稱(chēng)
industry,所屬行業(yè)
area,地區(qū)
pe,市盈率
outstanding,流通股本(億)
totals,總股本(億)
totalAssets,總資產(chǎn)(萬(wàn))
liquidAssets,流動(dòng)資產(chǎn)
fixedAssets,固定資產(chǎn)
reserved,公積金
reservedPerShare,每股公積金
esp,每股收益
bvps,每股凈資
pb,市凈率
timeToMarket,上市日期
undp,未分利潤(rùn)
perundp, 每股未分配
rev,收入同比(%)
profit,利潤(rùn)同比(%)
gpr,毛利率(%)
npr,凈利潤(rùn)率(%)
holders,股東人數(shù)

調(diào)用方法

import tushare as ts
ts.get_stock_basics()

返回效果

  name industry area  pe outstanding  totals totalAssets
code
600606 金豐投資  房產(chǎn)服務(wù) 上海  0.00  51832.01 51832.01 744930.44
002285 世聯(lián)行  房產(chǎn)服務(wù) 深圳 71.04  76352.17 76377.60 411595.28
000861 海印股份  房產(chǎn)服務(wù) 廣東 126.20  83775.50 118413.84 730716.56
000526 銀潤(rùn)投資  房產(chǎn)服務(wù) 福建 2421.16  9619.50 9619.50  20065.32
000056 深國(guó)商  房產(chǎn)服務(wù) 深圳  0.00  14305.55 26508.14 787195.94
600895 張江高科  園區(qū)開(kāi)發(fā) 上海 171.60 154868.95 154868.95 1771040.38
600736 蘇州高新  園區(qū)開(kāi)發(fā) 江蘇 48.68 105788.15 105788.15 2125485.75
600663 陸家嘴  園區(qū)開(kāi)發(fā) 上海 47.63 135808.41 186768.41 4562074.50
600658 電子城  園區(qū)開(kāi)發(fā) 北京 19.39  58009.73 58009.73 431300.19
600648 外高橋  園區(qū)開(kāi)發(fā) 上海 65.36  81022.34 113534.90 2508100.75
600639 浦東金橋  園區(qū)開(kāi)發(fā) 上海 57.28  65664.88 92882.50 1241577.00
600604 市北高新  園區(qū)開(kāi)發(fā) 上海 692.87  33352.42 56644.92 329289.50

2,日復(fù)權(quán)行情接口

作用

提供股票上市以來(lái)所有歷史數(shù)據(jù),默認(rèn)為前復(fù)權(quán),讀取后存到本地,作為后續(xù)分析的基礎(chǔ)

調(diào)用方法

ts.get_h_data('002337', start='2015-01-01', end='2015-03-16') #兩個(gè)日期之間的前復(fù)權(quán)數(shù)據(jù)

parameter:
code:string,股票代碼 e.g. 600848
start:string,開(kāi)始日期 format:YYYY-MM-DD 為空時(shí)取當(dāng)前日期
end:string,結(jié)束日期 format:YYYY-MM-DD 為空時(shí)取去年今日
autype:string,復(fù)權(quán)類(lèi)型,qfq-前復(fù)權(quán) hfq-后復(fù)權(quán) None-不復(fù)權(quán),默認(rèn)為qfq
index:Boolean,是否是大盤(pán)指數(shù),默認(rèn)為False
retry_count : int, 默認(rèn)3,如遇網(wǎng)絡(luò)等問(wèn)題重復(fù)執(zhí)行的次數(shù)
pause : int, 默認(rèn) 0,重復(fù)請(qǐng)求數(shù)據(jù)過(guò)程中暫停的秒數(shù),防止請(qǐng)求間隔時(shí)間太短出現(xiàn)的問(wèn)題

return:
date : 交易日期 (index)
open : 開(kāi)盤(pán)價(jià)
high : 最高價(jià)
close : 收盤(pán)價(jià)
low : 最低價(jià)
volume : 成交量
amount : 成交金額

返回結(jié)果

   open high close low  volume  amount
date
2015-03-16 13.27 13.45 13.39 13.00 81212976 1073862784
2015-03-13 13.04 13.38 13.37 13.00 40548836 532739744
2015-03-12 13.29 13.95 13.28 12.96 71505720 962979904
2015-03-11 13.35 13.48 13.15 13.00 59110248 780300736
2015-03-10 13.16 13.67 13.59 12.72 105753088 1393819776
2015-03-09 13.77 14.73 14.13 13.70 139091552 1994454656
2015-03-06 12.17 13.39 13.39 12.17 89486704 1167752960
2015-03-05 12.79 12.80 12.17 12.08 26040832 966927360
2015-03-04 13.96 13.96 13.30 12.58 26636174 1060270720
2015-03-03 12.17 13.10 13.10 12.05 19290366 733336768

實(shí)現(xiàn)

廢話不多說(shuō),直接上代碼,

生產(chǎn)者線程,讀取行情

class ThreadRead(threading.Thread):
 def __init__(self, queue, out_queue):
  '''
  用于根據(jù)股票代碼、需要讀取的日期,讀取增量的日行情數(shù)據(jù),
  :param queue:用于保存需要讀取的股票代碼、起始日期的列表
  :param out_queue:用于保存需要寫(xiě)入到數(shù)據(jù)庫(kù)表的結(jié)果集列表
  :return:
  '''
  threading.Thread.__init__(self)
  self.queue = queue
  self.out_queue = out_queue
 def run(self):
  while true:
   item = self.queue.get()
   time.sleep(0.5)
   try:
    df_h_data = ts.get_h_data(item['code'], start=item['startdate'], retry_count=10, pause=0.01)
    if df_h_data is not None and len(df_h_data)>0:
     df_h_data['secucode'] = item['code']
     df_h_data.index.name = 'date'
     print df_h_data.index,item['code'],item['startdate']
     df_h_data['tradeday'] = df_h_data.index.strftime('%Y-%m-%d')
     self.out_queue.put(df_h_data)
   except Exception, e:
    print str(e)
    self.queue.put(item) # 將沒(méi)有爬取成功的數(shù)據(jù)放回隊(duì)列里面去,以便下次重試。
    time.sleep(10)
    continue
   self.queue.task_done()

消費(fèi)者線程,本地存儲(chǔ)

class ThreadWrite(threading.Thread):
 def __init__(self, queue, lock, db_engine):
  '''
  :param queue: 某種形式的任務(wù)隊(duì)列,此處為tushare為每個(gè)股票返回的最新日復(fù)權(quán)行情數(shù)據(jù)
  :param lock: 暫時(shí)用連接互斥操作,防止mysql高并發(fā),后續(xù)可嘗試去掉
  :param db_engine: mysql數(shù)據(jù)庫(kù)的連接對(duì)象
  :return:no
  '''
  threading.Thread.__init__(self)
  self.queue = queue
  self.lock = lock
  self.db_engine = db_engine

 def run(self):
  while True:
   item = self.queue.get()
   self._save_data(item)
   self.queue.task_done()

 def _save_data(self, item):
   with self.lock:
    try:
     item.to_sql('cron_dailyquote', self.db_engine, if_exists='append', index=False)
    except Exception, e: # 如果是新股,則有可能df_h_data是空對(duì)象,因此需要跳過(guò)此類(lèi)情況不處理
     print str(e)

定義主線程

from Queue import Queue
stock_queue = Queue()
data_queue = Queue()
lock = threading.Lock()
def main():
 '''
 用于測(cè)試多線程讀取數(shù)據(jù)
 :return:
 '''
 #獲取環(huán)境變量,取得相應(yīng)的環(huán)境配置,上線時(shí)不需要再變更代碼
 global stock_queue
 global data_queue
 config=os.getenv('FLASK_CONFIG')
 if config == 'default':
  db_url='mysql+pymysql://root:******@localhost:3306/python?charset=utf8mb4'
 else:
  db_url='mysql+pymysql://root:******@localhost:3306/test?charset=utf8mb4'
 db_engine = create_engine(db_url, echo=True)
 conn = db_engine.connect()
 #TODO 增加ts.get_stock_basics()報(bào)錯(cuò)的處理,如果取不到信息則直接用數(shù)據(jù)庫(kù)中的股票代碼信息,來(lái)獲取增量信息
 #TODO 增加一個(gè)標(biāo)志,如果一個(gè)股票代碼的最新日期不是最新日期,則需標(biāo)記該代碼不需要重新獲取數(shù)據(jù),即記錄該股票更新日期到了最新工作日,
 df = ts.get_stock_basics()
 df.to_sql('stock_basics',db_engine,if_exists='replace',dtype={'code': CHAR(6)})
 # 計(jì)算距離當(dāng)前日期最大的工作日,以便每日定時(shí)更新
 today=time.strftime('%Y-%m-%d',time.localtime(time.time()))
 s1=("select max(t.date) from cron_tradeday t where flag=1 and t.date <='"+ today+"'")
 selectsql=text(s1)
 maxTradeay = conn.execute(selectsql).first()
 # 計(jì)算每只股票當(dāng)前加載的最大工作日期,支持重跑
 s = ("select secucode,max(t.tradeday) from cron_dailyquote t group by secucode ")
 selectsql = text(s)
 result = conn.execute(selectsql) # 執(zhí)行查詢(xún)語(yǔ)句
 df_result = pd.DataFrame(result.fetchall())
 df_result.columns=['stockcode','max_tradeday']
 df_result.set_index(df_result['stockcode'],inplace=True)
 # 開(kāi)始?xì)w檔前復(fù)權(quán)歷史行情至數(shù)據(jù)庫(kù)當(dāng)中,以便可以方便地計(jì)算后續(xù)選股模型

 for i in range(3):#使用3個(gè)線程
  t = ThreadRead(stock_queue, data_queue)
  t.setDaemon(True)
  t.start()
 for code in set(list(df.index)):
  try:
   #如果當(dāng)前股票已經(jīng)是最新的行情數(shù)據(jù),則直接跳過(guò),方便重跑。
   #print maxTradeay[0],df_result.loc[code].values[1]
   if df_result.loc[code].values[1] == maxTradeay[0]:
    continue
   startdate=getLastNdate(df_result.loc[code].values[1],1)
  except Exception, e:
   #如果某只股票沒(méi)有相關(guān)的行情,則默認(rèn)開(kāi)始日期為2015年1月1日
   startdate='2015-01-01'
  item={}
  item['code']=code
  item['startdate']=startdate
  stock_queue.put(item) # 生成生產(chǎn)者任務(wù)隊(duì)列
 for i in range(3):
  t = ThreadWrite(data_queue, lock, db_engine)
  t.setDaemon(True)
  t.start()
 stock_queue.join()
 data_queue.join()

執(zhí)行效果

原本需要2,3個(gè)小時(shí)才能執(zhí)行完成的每日復(fù)權(quán)行情增量落地,有效縮短至了1小時(shí)以?xún)?nèi),這里線程數(shù)并不上越多越好,由于復(fù)權(quán)行情讀的是新浪接口,在高并發(fā)情況下會(huì)返回HTTP 503服務(wù)器過(guò)載的錯(cuò)誤,另外高并發(fā)下可能需要使用IP代理池,下載的時(shí)段也需要嘗試多個(gè)時(shí)段進(jìn)行。初次嘗試,如果有更好的方法或者哪里有考慮不周的地方歡迎留言建議或者指正。

總結(jié)

以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問(wèn)大家可以留言交流,謝謝大家對(duì)腳本之家的支持。

相關(guān)文章

  • Python拋出引發(fā)異常(raise)知識(shí)點(diǎn)總結(jié)

    Python拋出引發(fā)異常(raise)知識(shí)點(diǎn)總結(jié)

    在本篇文章里小編給大家整理了關(guān)于Python拋出引發(fā)異常(raise)知識(shí)點(diǎn)總結(jié)內(nèi)容,有需要的朋友們可以學(xué)習(xí)參考下。
    2021-06-06
  • Python如何在腳本中設(shè)置環(huán)境變量

    Python如何在腳本中設(shè)置環(huán)境變量

    環(huán)境變量是與系統(tǒng)進(jìn)程交互的一種深入方式,它允許用戶(hù)獲得有關(guān)系統(tǒng)屬性、路徑和已經(jīng)存在的變量的更詳細(xì)信息,下面我們就來(lái)看看Python是如何通過(guò)腳本來(lái)設(shè)置環(huán)境變量的吧
    2023-10-10
  • 一個(gè)計(jì)算身份證號(hào)碼校驗(yàn)位的Python小程序

    一個(gè)計(jì)算身份證號(hào)碼校驗(yàn)位的Python小程序

    閑著無(wú)事,便想寫(xiě)個(gè)實(shí)用點(diǎn)的Python小程序,如何計(jì)算機(jī)身份證號(hào)碼的校驗(yàn)位,這樣的文章在網(wǎng)上一抓一大把,這里僅簡(jiǎn)單介紹下吧
    2014-08-08
  • 用Flask實(shí)現(xiàn)token登錄校驗(yàn)的解決方案

    用Flask實(shí)現(xiàn)token登錄校驗(yàn)的解決方案

    網(wǎng)站、小程序、APP 是否已經(jīng)登錄所代表的狀態(tài),代表一個(gè)概念是登錄態(tài), 我們常用的登錄態(tài)驗(yàn)證方式有cookie,session,token,token提供了另外一種不需要緩存賬戶(hù)和密碼的登錄狀態(tài)驗(yàn)證方式,本文給大家介紹了用Flask實(shí)現(xiàn)token登錄校驗(yàn)的解決方案,需要的朋友可以參考下
    2024-03-03
  • Python 帶你快速上手 Apache APISIX 插件開(kāi)發(fā)

    Python 帶你快速上手 Apache APISIX 插件開(kāi)發(fā)

    Apache APISIX Python Runner 來(lái)了,社區(qū)中的小伙伴們?cè)陂_(kāi)發(fā) Apache APISIX 插件時(shí)又多了一種新選擇,本文將用實(shí)列向大家介紹,需要的朋友可以參考下面文章內(nèi)容
    2021-09-09
  • Python+Plotly繪制精美的數(shù)據(jù)分析圖

    Python+Plotly繪制精美的數(shù)據(jù)分析圖

    Plotly?是目前已知的Python最強(qiáng)繪圖庫(kù),比Echarts還強(qiáng)大許多。它的繪制通過(guò)生成一個(gè)web頁(yè)面完成,并且支持調(diào)整圖像大小,動(dòng)態(tài)調(diào)節(jié)參數(shù)。本文將利用Plotly繪制精美的數(shù)據(jù)分析圖,感興趣的可以了解一下
    2022-05-05
  • python爬蟲(chóng)之自制英漢字典

    python爬蟲(chóng)之自制英漢字典

    這篇文章主要為大家詳細(xì)介紹了python爬蟲(chóng)之自制英漢字典的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2019-06-06
  • 人工智能—Python實(shí)現(xiàn)線性回歸

    人工智能—Python實(shí)現(xiàn)線性回歸

    這篇文章主要介紹了人工智能—Python實(shí)現(xiàn)線性回歸,人工智能分為類(lèi)型、數(shù)據(jù)集、效果評(píng)估、等,線性回歸根據(jù)隨機(jī)初始化的?w?x?b?和?y?來(lái)計(jì)算?loss等步驟實(shí)現(xiàn),下面來(lái)看看文章的具體實(shí)現(xiàn)吧
    2022-01-01
  • Python BeautifulReport可視化報(bào)告代碼實(shí)例

    Python BeautifulReport可視化報(bào)告代碼實(shí)例

    這篇文章主要介紹了Python BeautifulReport可視化報(bào)告代碼實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-04-04
  • Python?NumPy教程之?dāng)?shù)據(jù)類(lèi)型對(duì)象詳解

    Python?NumPy教程之?dāng)?shù)據(jù)類(lèi)型對(duì)象詳解

    每個(gè)?ndarray?都有一個(gè)關(guān)聯(lián)的數(shù)據(jù)類(lèi)型?(dtype)?對(duì)象。這個(gè)數(shù)據(jù)類(lèi)型對(duì)象(dtype)告訴我們數(shù)組的布局。本文將通過(guò)示例詳細(xì)講講NumPy的數(shù)據(jù)類(lèi)型對(duì)象,需要的可以參考一下
    2022-08-08

最新評(píng)論