python區(qū)塊鏈實(shí)現(xiàn)簡版網(wǎng)絡(luò)
說明
本文根據(jù)https://github.com/liuchengxu/blockchain-tutorial的內(nèi)容,用python實(shí)現(xiàn)的,但根據(jù)個(gè)人的理解進(jìn)行了一些修改,大量引用了原文的內(nèi)容。文章末尾有"本節(jié)完整源碼實(shí)現(xiàn)地址"。
引言
到目前為止,我們所構(gòu)建的原型已經(jīng)具備了區(qū)塊鏈所有的關(guān)鍵特性:匿名,安全,隨機(jī)生成的地址;區(qū)塊鏈數(shù)據(jù)存儲(chǔ);工作量證明系統(tǒng);可靠地存儲(chǔ)交易。盡管這些特性都不可或缺,但是仍有不足。能夠使得這些特性真正發(fā)光發(fā)熱,使得加密貨幣成為可能的,是網(wǎng)絡(luò)(network)。如果實(shí)現(xiàn)的這樣一個(gè)區(qū)塊鏈僅僅運(yùn)行在單一節(jié)點(diǎn)上,有什么用呢?如果只有一個(gè)用戶,那么這些基于密碼學(xué)的特性,又有什么用呢?正是由于網(wǎng)絡(luò),才使得整個(gè)機(jī)制能夠運(yùn)轉(zhuǎn)和發(fā)光發(fā)熱。
你可以將這些區(qū)塊鏈特性認(rèn)為是規(guī)則(rule),類似于人類在一起生活,繁衍生息建立的規(guī)則,一種社會(huì)安排。區(qū)塊鏈網(wǎng)絡(luò)就是一個(gè)程序社區(qū),里面的每個(gè)程序都遵循同樣的規(guī)則,正是由于遵循著同一個(gè)規(guī)則,才使得網(wǎng)絡(luò)能夠長存。類似的,當(dāng)人們都有著同樣的想法,就能夠?qū)⑷^攥在一起構(gòu)建一個(gè)更好的生活。如果有人遵循著不同的規(guī)則,那么他們就將生活在一個(gè)分裂的社區(qū)(州,公社,等等)中。同樣的,如果有區(qū)塊鏈節(jié)點(diǎn)遵循不同的規(guī)則,那么也會(huì)形成一個(gè)分裂的網(wǎng)絡(luò)。
重點(diǎn)在于:如果沒有網(wǎng)絡(luò),或者大部分節(jié)點(diǎn)都不遵守同樣的規(guī)則,那么規(guī)則就會(huì)形同虛設(shè),毫無用處!
區(qū)塊鏈網(wǎng)絡(luò)
區(qū)塊鏈網(wǎng)絡(luò)是去中心化的,這意味著沒有服務(wù)器,客戶端也不需要依賴服務(wù)器來獲取或處理數(shù)據(jù)。在區(qū)塊鏈網(wǎng)絡(luò)中,有的是節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)是網(wǎng)絡(luò)的一個(gè)完全(full-fledged)成員。節(jié)點(diǎn)就是一切:它既是一個(gè)客戶端,也是一個(gè)服務(wù)器。這一點(diǎn)需要牢記于心,因?yàn)檫@與傳統(tǒng)的網(wǎng)頁應(yīng)用非常不同。
區(qū)塊鏈網(wǎng)絡(luò)是一個(gè) P2P(Peer-to-Peer,端到端)的網(wǎng)絡(luò),即節(jié)點(diǎn)直接連接到其他節(jié)點(diǎn)。它的拓?fù)涫潜馄降?,因?yàn)樵诠?jié)點(diǎn)的世界中沒有層級(jí)之分。下面是它的示意圖:

Business vector created by Dooder - Freepik.com
要實(shí)現(xiàn)這樣一個(gè)網(wǎng)絡(luò)節(jié)點(diǎn)更加困難,因?yàn)樗鼈儽仨殘?zhí)行很多操作。每個(gè)節(jié)點(diǎn)必須與很多其他節(jié)點(diǎn)進(jìn)行交互,它必須請(qǐng)求其他節(jié)點(diǎn)的狀態(tài),與自己的狀態(tài)進(jìn)行比較,當(dāng)狀態(tài)過時(shí)時(shí)進(jìn)行更新。
kademlia發(fā)現(xiàn)協(xié)議
kademlia是p2p的一種節(jié)點(diǎn)發(fā)現(xiàn)協(xié)議,其核心是通過計(jì)算節(jié)點(diǎn)之間的邏輯距離來發(fā)現(xiàn)附近節(jié)點(diǎn)以實(shí)現(xiàn)節(jié)點(diǎn)查找的收斂。
簡化協(xié)議
這里我們?yōu)榱苏f明原理盡可能的簡化協(xié)議。我們只實(shí)現(xiàn)三種請(qǐng)求:
- 節(jié)點(diǎn)握手
- 獲取區(qū)塊數(shù)據(jù)
- 交易廣播為了方便,其中又將節(jié)點(diǎn)握手作為心跳發(fā)送,并根據(jù)心跳信息進(jìn)行區(qū)塊同步。
網(wǎng)絡(luò)協(xié)議方面,借鑒以太坊的做法,UDP做協(xié)議發(fā)現(xiàn),TCP做數(shù)據(jù)傳輸。每當(dāng)發(fā)現(xiàn)一個(gè)節(jié)點(diǎn),就通過TCP建立連接,并發(fā)送心跳數(shù)據(jù),以保證數(shù)據(jù)的一致性。
消息
定義消息類,分別定義了無意義回應(yīng)和以上三種請(qǐng)求。為了方便處理,這里統(tǒng)一使用字符串而不是二進(jìn)制數(shù)據(jù)進(jìn)行數(shù)據(jù)傳輸。
class Msg(object):
NONE_MSG = 0
HAND_SHAKE_MSG = 1
GET_BLOCK_MSG = 2
TRANSACTION_MSG = 3
def __init__(self, code, data):
self.code = code
self.data = data
TCP服務(wù)端
class TCPServer(object):
def __init__(self, ip='0.0.0.0', port=listen_port):
self.sock = socket.socket()
self.ip = ip
self.port = port
def listen(self):
self.sock.bind((self.ip, self.port))
self.sock.listen(5)
def run(self):
t = threading.Thread(target=self.listen_loop, args=())
t.start()
def handle_loop(self, conn, addr):
while True:
recv_data = conn.recv(4096)
log.info("recv_data:"+str(recv_data))
try:
recv_msg = json.loads(recv_data)
except ValueError as e:
conn.sendall('{"code": 0, "data": ""}'.encode())
send_data = self.handle(recv_msg)
log.info("tcpserver_send:"+send_data)
conn.sendall(send_data.encode())
def listen_loop(self):
while True:
conn, addr = self.sock.accept()
t = threading.Thread(target=self.handle_loop, args=(conn, addr))
t.start()
def handle(self, msg):
code = msg.get("code", 0)
log.info("code:"+str(code))
if code == Msg.HAND_SHAKE_MSG:
res_msg = self.handle_handshake(msg)
elif code == Msg.GET_BLOCK_MSG:
res_msg = self.handle_get_block(msg)
elif code == Msg.TRANSACTION_MSG:
res_msg = self.handle_transaction(msg)
else:
return '{"code": 0, "data":""}'
return json.dumps(res_msg.__dict__)
def handle_handshake(self, msg):
block_chain = BlockChain()
block = block_chain.get_last_block()
try:
genesis_block = block_chain[0]
except IndexError as e:
genesis_block = None
data = {
"last_height": -1,
"genesis_block": ""
}
if genesis_block:
data = {
"last_height": block.block_header.height,
"genesis_block": genesis_block.serialize()
}
msg = Msg(Msg.HAND_SHAKE_MSG, data)
return msg
def handle_get_block(self, msg):
height = msg.get("data", 1)
block_chain = BlockChain()
block = block_chain.get_block_by_height(height)
data = block.serialize()
msg = Msg(Msg.GET_BLOCK_MSG, data)
return msg
def handle_transaction(self, msg):
tx_pool = TxPool()
txs = msg.get("data", {})
for tx_data in txs:
tx = Transaction.deserialize(tx_data)
tx_pool.add(tx)
if tx_pool.is_full():
bc = BlockChain()
bc.add_block(tx_pool.txs)
log.info("add block")
tx_pool.clear()
msg = Msg(Msg.NONE_MSG, "")
return msg
TCP端比較簡單,listen_loop方法監(jiān)聽新的請(qǐng)求并開啟一個(gè)新線程處理連接中的數(shù)據(jù)交互。
handle_loop方法調(diào)用了handle分發(fā)處理請(qǐng)求。
handle_handshake處理握手請(qǐng)求,這里將最新塊高度和創(chuàng)世塊發(fā)送出去了,方便和本地?cái)?shù)據(jù)進(jìn)行比較,如果遠(yuǎn)程數(shù)據(jù)更新,那么就獲取新的部分的區(qū)塊。
handle_get_block獲取對(duì)應(yīng)的區(qū)塊并將數(shù)據(jù)發(fā)送給客戶端。
handle_transaction 處理客戶端發(fā)送來的交易信息。把客戶端發(fā)送來的交易添加到未確認(rèn)交易池,如果交易池滿了就添加到區(qū)塊。這里是方便處理才這么做的,實(shí)際上,比特幣中并不是這樣做的,而是由礦工根據(jù)情況進(jìn)行打包區(qū)塊的。
TCP客戶端
class TCPClient(object):
def __init__(self, ip, port):
self.txs = []
self.sock = socket.socket()
log.info("connect ip:"+ip+"\tport:"+str(port))
self.sock.connect((ip, port))
def add_tx(self, tx):
self.txs.append(tx)
def send(self, msg):
data = json.dumps(msg.__dict__)
self.sock.sendall(data.encode())
log.info("send:"+data)
recv_data = self.sock.recv(4096)
log.info("client_recv_data:"+str(recv_data))
try:
recv_msg = json.loads(recv_data)
except json.decoder.JSONDecodeError as e:
return
self.handle(recv_msg)
def handle(self, msg):
code = msg.get("code", 0)
log.info("recv code:"+str(code))
if code == Msg.HAND_SHAKE_MSG:
self.handle_shake(msg)
elif code == Msg.GET_BLOCK_MSG:
self.handle_get_block(msg)
elif code == Msg.TRANSACTION_MSG:
self.handle_transaction(msg)
def shake_loop(self):
while True:
if self.txs:
data = [tx.serialize() for tx in self.txs]
msg = Msg(Msg.TRANSACTION_MSG, data)
self.send(msg)
self.txs.clear()
else:
log.info("shake")
block_chain = BlockChain()
block = block_chain.get_last_block()
try:
genesis_block = block_chain[0]
except IndexError as e:
genesis_block = None
data = {
"last_height": -1,
"genesis_block": ""
}
if genesis_block:
data = {
"last_height": block.block_header.height,
"genesis_block": genesis_block.serialize()
}
msg = Msg(Msg.HAND_SHAKE_MSG, data)
self.send(msg)
time.sleep(5)
def handle_shake(self, msg):
data = msg.get("data", "")
last_height = data.get("last_height", 0)
block_chain = BlockChain()
block = block_chain.get_last_block()
if block:
local_last_height = block.block_header.height
else:
local_last_height = -1
log.info("local_last_height %d, last_height %d" %(local_last_height, last_height))
for i in range(local_last_height + 1, last_height+1):
send_msg = Msg(Msg.GET_BLOCK_MSG, i)
self.send(send_msg)
def handle_get_block(self, msg):
data = msg.get("data", "")
block = Block.deserialize(data)
bc = BlockChain()
try:
bc.add_block_from_peers(block)
except ValueError as e:
log.info(str(e))
def handle_transaction(self, msg):
data = msg.get("data", {})
tx = Transaction.deserialize(data)
tx_pool = TxPool()
tx_pool.add(tx)
if tx_pool.is_full():
bc.add_block(tx_pool.txs)
log.info("mined a block")
tx_pool.clear()
def close(self):
self.sock.close()
handle_transaction處理服務(wù)器發(fā)送來的交易,將交易添加到交易池,如果交易池滿了就添加到區(qū)塊鏈中。
handle_get_block處理服務(wù)器發(fā)送來的區(qū)塊,并將區(qū)塊更新到鏈上。
handle_shake處理服務(wù)器響應(yīng)的握手信息,如果發(fā)現(xiàn)當(dāng)前的的區(qū)塊高度低于數(shù)據(jù)中響應(yīng)的區(qū)塊高高度,則發(fā)起請(qǐng)求獲取新的幾個(gè)區(qū)塊。
shake_loop 每間隔10秒發(fā)送一次握手信息(5秒同步一次區(qū)塊),如果發(fā)現(xiàn)有需要廣播的交易則進(jìn)行交易的廣播。
P2P服務(wù)器
p2p節(jié)點(diǎn)發(fā)現(xiàn)部分,使用了kademlia協(xié)議,并使用了kademlia庫,安裝方法pip3 install kademlia
class P2p(object):
def __init__(self):
self.server = Server()
self.loop = None
def run(self):
loop = asyncio.get_event_loop()
self.loop = loop
loop.run_until_complete(self.server.listen(listen_port))
self.loop.run_until_complete(self.server.bootstrap([(bootstrap_host, bootstrap_port)]))
loop.run_forever()
def get_nodes(self):
nodes = []
for bucket in self.server.protocol.router.buckets:
nodes.extend(bucket.get_nodes())
return nodes
其中run方法啟動(dòng)節(jié)點(diǎn)監(jiān)聽并連接一個(gè)初始節(jié)點(diǎn),并運(yùn)行p2p節(jié)點(diǎn)監(jiān)聽。get_nodes方法獲取當(dāng)前所有的節(jié)點(diǎn)。
連接節(jié)點(diǎn)
class PeerServer(Singleton):
def __init__(self):
if not hasattr(self, "peers"):
self.peers = []
if not hasattr(self, "nodes"):
self.nodes = []
def nodes_find(self, p2p_server):
local_ip = socket.gethostbyname(socket.getfqdn(socket.gethostname()))
while True:
nodes = p2p_server.get_nodes()
for node in nodes:
if node not in self.nodes:
ip = node.ip
port = node.port
if local_ip == ip:
continue
client = TCPClient(ip, port)
t = threading.Thread(target=client.shake_loop, args=())
t.start()
self.peers.append(client)
self.nodes.append(node)
time.sleep(1)
def broadcast_tx(self, tx):
for peer in self.peers:
peer.add_tx(tx)
def run(self, p2p_server):
t = threading.Thread(target=self.nodes_find, args=(p2p_server,))
t.start()
nodes_find為節(jié)點(diǎn)發(fā)現(xiàn)方法,每隔1秒進(jìn)行查找當(dāng)前是否有新的節(jié)點(diǎn),并開啟線程進(jìn)行連接。broadcast_tx為廣播交易的方法,將交易添加到待廣播交易池。
RPC
開啟網(wǎng)絡(luò)監(jiān)聽后,主線程就被p2p網(wǎng)絡(luò)占用了,我們需要另外的方法進(jìn)行交互操作。RPC就是常用的方法。我們將命令行操作都通過rpc導(dǎo)出,然后通過rpc調(diào)用獲取信息。
class Cli(object):
def get_balance(self, addr):
bc = BlockChain()
balance = 0
utxo = UTXOSet()
utxo.reindex(bc)
utxos = utxo.find_utxo(addr)
print(utxos)
for fout in utxos:
balance += fout.txoutput.value
print('%s balance is %d' %(addr, balance))
return balance
def create_wallet(self):
w = Wallet.generate_wallet()
ws = Wallets()
ws[w.address] = w
ws.save()
return w.address
def print_all_wallet(self):
ws = Wallets()
wallets = []
for k, _ in ws.items():
wallets.append(k)
return wallets
def send(self, from_addr, to_addr, amount):
bc = BlockChain()
tx = bc.new_transaction(from_addr, to_addr, amount)
# bc.add_block([tx])
tx_pool = TxPool()
tx_pool.add(tx)
from network import log
log.info("tx_pool:"+str(id(tx_pool)))
log.info("txs_len:"+str(len(tx_pool.txs)))
try:
server = PeerServer()
server.broadcast_tx(tx)
log.info("tx_pool is full:"+str(tx_pool.is_full()))
log.info("tx_pool d :"+str(tx_pool))
if tx_pool.is_full():
bc.add_block(tx_pool.txs)
log.info("add block")
tx_pool.clear()
except Exception as e:
import traceback
msg = traceback.format_exc()
log.info("error_msg:"+msg)
print('send %d from %s to %s' %(amount, from_addr, to_addr))
def print_chain(self, height):
bc = BlockChain()
return bc[height].block_header.serialize()
def create_genesis_block(self):
bc = BlockChain()
w = Wallet.generate_wallet()
ws = Wallets()
ws[w.address] = w
ws.save()
tx = bc.coin_base_tx(w.address)
bc.new_genesis_block(tx)
return w.address
RPC導(dǎo)出:
rpc = RPCServer(export_instance=Cli())
rpc.start(False)
測試
分別打開兩臺(tái)主機(jī)A和B:A主機(jī):
$python3 cli.py start
將B主機(jī)的conf.py中的bootstrap_host和bootstrap_port修改為A主機(jī)的ip和端口。然后啟動(dòng)B主機(jī)。
$python3 cli.py start
任意一臺(tái)主機(jī)開啟新的窗口執(zhí)行生成創(chuàng)世塊:
$python3 cli.py genesis_block Genesis Wallet is: 1LYHea8NjTxaYboXJbR7LemvUZjyQc839r
分別在兩臺(tái)機(jī)器上查看余額:
$python3 cli.py balance 1LYHea8NjTxaYboXJbR7LemvUZjyQc839r 1LYHea8NjTxaYboXJbR7LemvUZjyQc839r balance is 1000
分別在兩臺(tái)機(jī)器上創(chuàng)建地址:
$python3 cli.py createwallet Wallet address is 14sQYjj3n2fReJyVNoqHCmCFjNKEZAVcEB
查看當(dāng)前機(jī)器的所有地址
python3 cli.py printwallet Wallet are: 19zR4zT9eSFsbSNvnQ1RCrhjN71VzPFTnH 1MVUrxPuRgtkyLQvAoma4yEarzcMzvQqym 18kruspe7jAbggR1sUF8fCFsZLn6efSeFk 14sQYjj3n2fReJyVNoqHCmCFjNKEZAVcEB
轉(zhuǎn)賬(至少要轉(zhuǎn)兩筆才能確認(rèn)哦,可以修改txpool.py的SIZE屬性來調(diào)整區(qū)塊大小)。注意:只有當(dāng)前有這個(gè)地址(即有這個(gè)私鑰)才能作為from轉(zhuǎn)賬給其他地址。
$python3 cli.py send --from 1LYHea8NjTxaYboXJbR7LemvUZjyQc839r --to 19zR4zT9eSFsbSNvnQ1RCrhjN71VzPFTnH --amount 100 $python3 cli.py send --from 1LYHea8NjTxaYboXJbR7LemvUZjyQc839r --to 19zR4zT9eSFsbSNvnQ1RCrhjN71VzPFTnH --amount 100
分別在兩臺(tái)機(jī)器上查看余額:
python3 cli.py balance 1LYHea8NjTxaYboXJbR7LemvUZjyQc839r 1LYHea8NjTxaYboXJbR7LemvUZjyQc839r balance is 1900
注意:這里因?yàn)橹貜?fù)轉(zhuǎn)了兩筆賬,使用了同一個(gè)UTXO,所以第二筆會(huì)失敗,由于1LYHea8NjTxaYboXJbR7LemvUZjyQc839r為被獎(jiǎng)勵(lì)地址,所以獲得了1000得挖礦獎(jiǎng)勵(lì)所以余額為:1000-100+900=1900。
打印區(qū)塊信息:
$python3 cli.py print 1
{'timestamp': '1551347915.3271294', 'prev_block_hash': '9f12dad81ab988f247884d7d06de46c6951688dcbedb87df2159669594a44f0d', 'hash': 'a9d02b72690398805fb83efd4680cb710ed4f3c67ea7926fe8faab256c1cad1c', 'hash_merkle_root': 'fe768edf1040c504674e8a468c89f00574a181b88ad2297ef29d307695adb38e', 'height': 1, 'nonce': 3}
區(qū)塊同步方式
為了簡單,區(qū)塊采用最簡單的方式進(jìn)行同步。方法如下:
如果發(fā)現(xiàn)對(duì)方區(qū)塊高度低于自己,則不做處理。
如果發(fā)現(xiàn)對(duì)方區(qū)塊高度高于自己
(1) 當(dāng)前最新區(qū)塊在對(duì)應(yīng)區(qū)塊能找到,那么就更新最新的區(qū)塊
(2) 當(dāng)前最新區(qū)塊在對(duì)應(yīng)區(qū)塊不能找到,那么回滾當(dāng)前區(qū)塊,直到回到交叉點(diǎn),再進(jìn)行更新區(qū)塊。
涉及到的源碼修改較多,這里就不貼源碼了。移步到本節(jié)完整實(shí)現(xiàn)源碼查看完整源碼。
問題
- 為了簡單,將握手和廣播交易合一了,這導(dǎo)致了廣播交易不及時(shí)。
- 新區(qū)塊沒有實(shí)時(shí)進(jìn)行廣播,而是被動(dòng)等待同步,這也導(dǎo)致了區(qū)塊同步較慢。
- 在區(qū)塊未確認(rèn)的情況下用同一個(gè)地址的幣進(jìn)行轉(zhuǎn)賬有只有第一筆會(huì)成功,后面的都會(huì)失敗。這是由目前獲取UTXO的方式?jīng)Q定的。
總結(jié)
我們已經(jīng)實(shí)現(xiàn)了一個(gè)簡版的比特幣,并且實(shí)現(xiàn)了任意節(jié)點(diǎn)加入和區(qū)塊的同步等功能。為了簡化并說明原理,忽略掉了很多細(xì)節(jié),并且忽略掉了性能問題,但它可以說明區(qū)塊鏈的基本原理。
參考:
以上就是python區(qū)塊鏈實(shí)現(xiàn)簡版網(wǎng)絡(luò)的詳細(xì)內(nèi)容,更多關(guān)于python區(qū)塊鏈網(wǎng)絡(luò)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
(手寫)PCA原理及其Python實(shí)現(xiàn)圖文詳解
這篇文章主要介紹了Python來PCA算法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧,希望能給你帶來幫助2021-08-08
深入理解python虛擬機(jī)如何實(shí)現(xiàn)閉包
當(dāng)能夠從設(shè)計(jì)者的層面去理解閉包就再也不用死記硬背一些閉包的概念了,所以本文就來從虛擬機(jī)層面和大家一起討論函數(shù)閉包是如何實(shí)現(xiàn)的2023-10-10
Python?Flask實(shí)現(xiàn)快速構(gòu)建Web應(yīng)用的方法詳解
Flask是一個(gè)輕量級(jí)的Web服務(wù)器網(wǎng)關(guān)接口(WSGI)web應(yīng)用框架,本文將和大家一起詳細(xì)探討一下Python?Flask?Web服務(wù),需要的小伙伴可以學(xué)習(xí)一下2023-06-06
如何通過python實(shí)現(xiàn)人臉識(shí)別驗(yàn)證
這篇文章主要介紹了如何通過python實(shí)現(xiàn)人臉識(shí)別驗(yàn)證,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-01-01
Python 實(shí)現(xiàn)自動(dòng)化Excel報(bào)表的步驟
這篇文章主要介紹了Python 實(shí)現(xiàn)自動(dòng)化Excel報(bào)表的步驟,幫助大家更好的理解和學(xué)習(xí)使用python,感興趣的朋友可以了解下2021-04-04
學(xué)習(xí)python類方法與對(duì)象方法
這篇文章主要和大家一起學(xué)習(xí)python類方法與對(duì)象方法,從一個(gè)簡單例子出發(fā)進(jìn)行學(xué)習(xí),感興趣的小伙伴們可以參考一下2016-03-03

