Python實現(xiàn)基于POS算法的區(qū)塊鏈
區(qū)塊鏈中的共識算法
在比特幣公鏈架構(gòu)解析中,就曾提到過為了實現(xiàn)去中介化的設(shè)計,比特幣設(shè)計了一套共識協(xié)議,并通過此協(xié)議來保證系統(tǒng)的穩(wěn)定性和防攻擊性。 并且我們知道,截止目前使用最廣泛,也是最被大家接受的共識算法,是我們先前介紹過的POW(proof of work)工作量證明算法。目前市值排名前二的比特幣和以太坊也是采用的此算法。
雖然POW共識算法取得了巨大的成功,但對它的質(zhì)疑也從來未曾停止過。 其中最主要的一個原因就是電力消耗。據(jù)不完全統(tǒng)計,基于POW的挖礦機制所消耗的電量是非常巨大的,甚至比絕大多數(shù)國家耗電量還要多。這對我們的資源造成了極大的浪費,此外隨著比特大陸等公司的強勢崛起,造成了算力的高度集中。
基于以上種種原因,更多的共識算法被提出來 POS、DPOS、BPFT等等。 今天我們就來認識POS(proof of stake)算法。
Proof of stake,譯為權(quán)益證明。你可能已經(jīng)猜到了,權(quán)益證明簡單理解就是擁有更多token的人,有更大的概率獲得記賬權(quán)利,然后獲得獎勵。 這個概率具體有多大呢? 下面我們在代碼實現(xiàn)中會展示,分析也放在后面。 當(dāng)然,POS是會比POW更好嗎? 會更去中心化嗎? 現(xiàn)在看來未必,所以我們這里也不去對比誰優(yōu)誰劣。 我們站在中立的角度,單純的來討論討論POS這種算法。
代碼實戰(zhàn)
生成一個Block
既然要實現(xiàn)POS算法,那么就難免要生成一條鏈,鏈又是由一個個Block生成的,所以下面我們首先來看看如何生成Block,當(dāng)然在前面的內(nèi)容里面,關(guān)于如何生成Block,以及交易、UTXO等等都已經(jīng)介紹過了。由于今天我們的核心是實現(xiàn)POS,所以關(guān)于Block的生成,我們就用最簡單的實現(xiàn)方式,好讓大家把目光聚焦在核心的內(nèi)容上面。
我們用三個方法來實現(xiàn)生成一個合法的區(qū)塊
- calculate_hash 計算區(qū)塊的hash值
- is_block_valid 校驗區(qū)塊是否合法
- generate_block 生成一個區(qū)塊
from hashlib import sha256 from datetime import datetime def generate_block(oldblock, bpm, address): """ :param oldblock: :param bpm: :param address: :return: """ newblock = { "Index": oldblock["Index"] + 1, "BPM": bpm, "Timestamp": str(datetime.now()), "PrevHash": oldblock["Hash"], "Validator": address } newblock["Hash"] = calculate_hash(newblock) return newblock def calculate_hash(block): record = "".join([ str(block["Index"]), str(block["BPM"]), block["Timestamp"], block["PrevHash"] ]) return sha256(record.encode()).hexdigest() def is_block_valid(newblock, oldblock): """ :param newblock: :param oldblock: :return: """ if oldblock["Index"] + 1 != newblock["Index"]: return False if oldblock["Hash"] != newblock["PrevHash"]: return False if calculate_hash(newblock) != newblock["Hash"]: return False return True
這里為了更靈活,我們沒有用類的實現(xiàn)方式,直接采用函數(shù)來實現(xiàn)了Block生成,相信很容易看懂。
創(chuàng)建一個TCP服務(wù)器
由于我們需要用權(quán)益證明算法來選擇記賬人,所以需要從很多Node(節(jié)點)中選擇記賬人,也就是需要一個server讓節(jié)點鏈接上來,同時要同步信息給節(jié)點。因此需要一個TCP長鏈接。
from socketserver import BaseRequestHandler, ThreadingTCPServer def run(): # start a tcp server serv = ThreadingTCPServer(('', 9090), HandleConn) serv.serve_forever()
在這里我們用了python內(nèi)庫socketserver來創(chuàng)建了一個TCPServer。 需要注意的是,這里我們是采用的多線程的創(chuàng)建方式,這樣可以保證有多個客戶端同時連接上來,而不至于被阻塞。當(dāng)然,這里這個server也是存在問題的,那就是有多少個客戶端連接,就會創(chuàng)建多少個線程,更好的方式是創(chuàng)建一個線程池。由于這里是測試,所以就采用更簡單的方式了。
相信大家已經(jīng)看到了,在我們創(chuàng)建TCPServer的時候,使用到了HandleConn,但是我們還沒有定義,所以接下來我們就來定義一個HandleConn
消息處理器
下面我們來實現(xiàn)Handler函數(shù),Handler函數(shù)在跟Client Node通信的時候,需要我們的Node實現(xiàn)下面的功能
- Node可以輸入balance(token數(shù)量) 也就是股權(quán)數(shù)目
- Node需要能夠接收廣播,方便Server同步區(qū)塊以及記賬人信息
- 添加自己到候選人名單 (候選人為持有token的人)
- 輸入BPM生成Block
- 驗證一個區(qū)塊的合法性
感覺任務(wù)還是蠻多的,接下來我們看代碼實現(xiàn)
import threading from queue import Queue, Empty # 定義變量 block_chain = [] temp_blocks = [] candidate_blocks = Queue() # 創(chuàng)建隊列,用于線程間通信 announcements = Queue() validators = {} My_Lock = threading.Lock() class HandleConn(BaseRequestHandler): def handle(self): print("Got connection from", self.client_address) # validator address self.request.send(b"Enter token balance:") balance = self.request.recv(8192) try: balance = int(balance) except Exception as e: print(e) t = str(datetime.now()) address = sha256(t.encode()).hexdigest() validators[address] = balance print(validators) while True: announce_winner_t = threading.Thread(target=annouce_winner, args=(announcements, self.request,), daemon=True) announce_winner_t.start() self.request.send(b"\nEnter a new BPM:") bpm = self.request.recv(8192) try: bpm = int(bpm) except Exception as e: print(e) del validators[address] break # with My_Lock: last_block = block_chain[-1] new_block = generate_block(last_block, bpm, address) if is_block_valid(new_block, last_block): print("new block is valid!") candidate_blocks.put(new_block) self.request.send(b"\nEnter a new BPM:\n") annouce_blockchain_t = threading.Thread(target=annouce_blockchain, args=(self.request,), daemon=True) annouce_blockchain_t.start()
這段代碼,可能對大多數(shù)同學(xué)來說是有難度的,在這里我們采用了多線程的方式,同時為了能夠讓消息在線程間通信,我們使用了隊列。 這里使用隊列,也是為了我們的系統(tǒng)可以更好的拓展,后面如果可能,這一節(jié)的程序很容易拓展為分布式系統(tǒng)。 將多線程里面處理的任務(wù)拆分出去成獨立的服務(wù),然后用消息隊列進行通信,就是一個簡單的分布式系統(tǒng)啦。(是不是很激動?)
由于這里有難度,所以代碼還是講一講吧
# validator address self.request.send(b"Enter token balance:") balance = self.request.recv(8192) try: balance = int(balance) except Exception as e: print(e) t = str(datetime.now()) address = sha256(t.encode()).hexdigest() validators[address] = balance print(validators)
這一段就是我們提到的Node 客戶端添加自己到候選人的代碼,每鏈接一個客戶端,就會添加一個候選人。 這里我們用添加的時間戳的hash來記錄候選人。 當(dāng)然也可以用其他的方式,比如我們代碼里面的client_address
announce_winner_t = threading.Thread(target=annouce_winner, args=(announcements, self.request,), daemon=True) announce_winner_t.start() def annouce_winner(announcements, request): """ :param announcements: :param request: :return: """ while True: try: msg = announcements.get(block=False) request.send(msg.encode()) request.send(b'\n') except Empty: time.sleep(3) continue
然后接下來我們起了一個線程去廣播獲得記賬權(quán)的節(jié)點信息到所有節(jié)點。
self.request.send(b"\nEnter a new BPM:") bpm = self.request.recv(8192) try: bpm = int(bpm) except Exception as e: print(e) del validators[address] break # with My_Lock: last_block = block_chain[-1] new_block = generate_block(last_block, bpm, address) if is_block_valid(new_block, last_block): print("new block is valid!") candidate_blocks.put(new_block)
根據(jù)節(jié)點輸入的BPM值生成一個區(qū)塊,并校驗區(qū)塊的有效性。 將有效的區(qū)塊放到候選區(qū)塊當(dāng)中,等待記賬人將區(qū)塊添加到鏈上。
annouce_blockchain_t = threading.Thread(target=annouce_blockchain, args=(self.request,), daemon=True) annouce_blockchain_t.start() def annouce_blockchain(request): """ :param request: :return: """ while True: time.sleep(30) with My_Lock: output = json.dumps(block_chain) try: request.send(output.encode()) request.send(b'\n') except OSError: pass
最后起一個線程,同步區(qū)塊鏈到所有節(jié)點。
看完了,節(jié)點跟Server交互的部分,接下來是最重要的部分,
POS算法實現(xiàn)
def pick_winner(announcements): """ 選擇記賬人 :param announcements: :return: """ time.sleep(10) while True: with My_Lock: temp = temp_blocks lottery_pool = [] # if temp: for block in temp: if block["Validator"] not in lottery_pool: set_validators = validators k = set_validators.get(block["Validator"]) if k: for i in range(k): lottery_pool.append(block["Validator"]) lottery_winner = choice(lottery_pool) print(lottery_winner) # add block of winner to blockchain and let all the other nodes known for block in temp: if block["Validator"] == lottery_winner: with My_Lock: block_chain.append(block) # write message in queue. msg = "\n{0} 贏得了記賬權(quán)利\n".format(lottery_winner) announcements.put(msg) break with My_Lock: temp_blocks.clear()
這里我們用pick_winner 來選擇記賬權(quán)利,我們根據(jù)token數(shù)量構(gòu)造了一個列表。 一個人獲得記賬權(quán)利的概率為:
p = mount['NodeA']/mount['All']
文字描述就是其token數(shù)目在總數(shù)中的占比。 比如總數(shù)有100個,他有10個,那么其獲得記賬權(quán)的概率就是0.1, 到這里核心的部分就寫的差不多了,接下來,我們來添加節(jié)點,開始測試吧
測試POS的記賬方式
在測試之前,起始還有一部分工作要做,前面我們的run方法需要完善下,代碼如下:
def run(): # create a genesis block t = str(datetime.now()) genesis_block = { "Index": 0, "Timestamp": t, "BPM": 0, "PrevHash": "", "Validator": "" } genesis_block["Hash"] = calculate_hash(genesis_block) print(genesis_block) block_chain.append(genesis_block) thread_canditate = threading.Thread(target=candidate, args=(candidate_blocks,), daemon=True) thread_pick = threading.Thread(target=pick_winner, args=(announcements,), daemon=True) thread_canditate.start() thread_pick.start() # start a tcp server serv = ThreadingTCPServer(('', 9090), HandleConn) serv.serve_forever() def candidate(candidate_blocks): """ :param candidate_blocks: :return: """ while True: try: candi = candidate_blocks.get(block=False) except Empty: time.sleep(5) continue temp_blocks.append(candi) if __name__ == '__main__': run()
添加節(jié)點連接到TCPServer
為了充分減少程序的復(fù)雜性,tcp client我們這里就不實現(xiàn)了,可以放在后面拓展部分。 畢竟我們這個系統(tǒng)是很容易擴展的,后面我們拆分了多線程的部分,在實現(xiàn)tcp client就是一個完整的分布式系統(tǒng)了。
所以,我們這里用linux自帶的命令 nc,不知道nc怎么用的同學(xué)可以google或者 man nc
啟動服務(wù) 運行 python pos.py
打開3個終端
分別輸入下面命令
nc localhost 9090
終端如果輸出
Enter token balance:
說明你client已經(jīng)鏈接服務(wù)器ok啦.
測試POS的記賬方式
接下來依次按照提示操作。 balance可以按心情來操作,因為這里是測試,我們輸入100,
緊接著會提示輸入BPM,我們前面提到過,輸入BPM是為了生成Block,那么就輸入吧,隨便輸入個9. ok, 接下來就稍等片刻,等待記賬。
輸出如同所示
依次在不同的終端,根據(jù)提示輸入數(shù)字,等待消息同步。
生成區(qū)塊鏈
下面是我這邊獲得的3個block信息。
總結(jié)
在上面的代碼中,我們實現(xiàn)了一個完整的基于POS算法記賬的鏈,當(dāng)然這里有許多值得擴展與改進的地方。
- python中多線程開銷比較大,可以改成協(xié)程的方式
- TCP建立的長鏈接是基于TCPServer,是中心化的方式,可以改成P2P對等網(wǎng)絡(luò)
- 鏈的信息不夠完整
- 系統(tǒng)可以拓展成分布式,讓其更健壯
大概列了以上幾點,其他還有很多可以拓展的地方,感興趣的朋友可以先玩玩, 后者等到我們后面的教程。 (廣告打的措手不及,哈哈)
當(dāng)然了,語言不是重點,所以在這里,我也實現(xiàn)了go語言的版本源碼地址
go語言的實現(xiàn)感覺要更好理解一點,也顯得要優(yōu)雅一點。這也是為什么go語言在分布式領(lǐng)域要更搶手的原因之一吧!
項目地址
https://github.com/csunny/py-bitcoin/
參考
https://medium.com/@mycoralhealth/code-your-own-proof-of-stake-blockchain-in-go-610cd99aa658
總結(jié)
以上所述是小編給大家介紹的Python實現(xiàn)基于POS算法的區(qū)塊鏈,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復(fù)大家的。在此也非常感謝大家對腳本之家網(wǎng)站的支持!
相關(guān)文章
Python cookbook(數(shù)據(jù)結(jié)構(gòu)與算法)根據(jù)字段將記錄分組操作示例
這篇文章主要介紹了Python cookbook(數(shù)據(jù)結(jié)構(gòu)與算法)根據(jù)字段將記錄分組操作,結(jié)合實例形式分析了itertools.groupby()函數(shù)針對字典進行分組操作的相關(guān)實現(xiàn)技巧,需要的朋友可以參考下2018-03-03Python原始字符串與Unicode字符串操作符用法實例分析
這篇文章主要介紹了Python原始字符串與Unicode字符串操作符用法,結(jié)合實例形式分析了Python針對原始字符與Unicode字符的操作符用法,需要的朋友可以參考下2017-07-07django 刪除數(shù)據(jù)庫表后重新同步的方法
今天小編就為大家分享一篇django 刪除數(shù)據(jù)庫表后重新同步的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-05-05python 實現(xiàn)數(shù)據(jù)庫中數(shù)據(jù)添加、查詢與更新的示例代碼
這篇文章主要介紹了python 實現(xiàn)數(shù)據(jù)庫中數(shù)據(jù)添加、查詢與更新的示例代碼,幫助大家更好的理解和使用python,感興趣的朋友可以了解下2020-12-12Python學(xué)習(xí)筆記之字典,元組,布爾類型和讀寫文件
這篇文章主要為大家詳細介紹了Python的字典,元組,布爾類型和讀寫文件,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助2022-02-02