hadoop client與datanode的通信協(xié)議分析
本文主要分析了hadoop客戶端read和write block的流程. 以及client和datanode通信的協(xié)議, 數(shù)據(jù)流格式等.
hadoop客戶端與namenode通信通過RPC協(xié)議, 但是client 與datanode通信并沒有使用RPC, 而是直接使用socket, 其中讀寫時的協(xié)議也不同, 本文分析了hadoop 0.20.2版本的(0.19版本也是一樣的)client與datanode通信的原理與通信協(xié)議. 另外需要強調(diào)的是0.23及以后的版本中client與datanode的通信協(xié)議有所變化, 使用了protobuf作為序列化方式.
Write block
1. 客戶端首先通過namenode.create, 向namenode請求創(chuàng)建文件, 然后啟動dataStreamer線程
2. client包括三個線程, main線程負責(zé)把本地數(shù)據(jù)讀入內(nèi)存, 并封裝為Package對象, 放到隊列dataQueue中.
3. dataStreamer線程檢測隊列dataQueue是否有package, 如果有, 則先創(chuàng)建BlockOutPutStream對象(一個block創(chuàng)建一次, 一個block可能包括多個package), 創(chuàng)建的時候會和相應(yīng)的datanode通信, 發(fā)送DATA_TRANSFER_HEADER信息并獲取返回. 然后創(chuàng)建ResponseProcessor線程, 負責(zé)接收datanode的返回ack確認信息, 并進行錯誤處理.
4. dataStreamer從dataQueue中拿出Package對象, 發(fā)送給datanode. 然后繼續(xù)循環(huán)判斷dataQueue是否有數(shù)據(jù)…..
下圖展示了write block的流程.
下圖是報文的格式
Read block
主要在BlockReader類中實現(xiàn).
初始化newBlockReader時,
1. 通過傳入?yún)?shù)sock創(chuàng)建new SocketOutputStream(socket, timeout), 然后寫通信信息, 與寫block的header不大一樣.
//write the header.
out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
out.write( DataTransferProtocol.OP_READ_BLOCK );
out.writeLong( blockId );
out.writeLong( genStamp );
out.writeLong( startOffset );
out.writeLong( len );
Text.writeString(out, clientName);
out.flush();
2. 創(chuàng)建輸入流 new SocketInputStream(socket, timeout)
3. 判斷返回消息 in.readShort() != DataTransferProtocol.OP_STATUS_SUCCESS
4. 根據(jù)輸入流創(chuàng)建checksum : DataChecksum checksum = DataChecksum.newDataChecksum( in )
5. 讀取第一個Chunk的位置: long firstChunkOffset = in.readLong()
注: 512個字節(jié)為一個chunk計算checksum(4個字節(jié))
6. 接下來在BlockReader的read方法中讀取具體數(shù)據(jù): result = readBuffer(buf, off, realLen)
7. 一個一個chunk的讀取
int packetLen = in.readInt();
long offsetInBlock = in.readLong();
long seqno = in.readLong();
boolean lastPacketInBlock = in.readBoolean();
int dataLen = in.readInt();
IOUtils.readFully(in, checksumBytes.array(), 0,
checksumBytes.limit());
IOUtils.readFully(in, buf, offset, chunkLen);
8. 讀取數(shù)據(jù)后checksum驗證; FSInputChecker.verifySum(chunkPos)
相關(guān)文章
MyBatis通過JDBC數(shù)據(jù)驅(qū)動生成的執(zhí)行語句問題
這篇文章主要介紹了MyBatis通過JDBC數(shù)據(jù)驅(qū)動生成的執(zhí)行語句問題的相關(guān)資料,非常不錯,具有參考借鑒價值,需要的朋友可以參考下2016-08-08java組件commons-fileupload文件上傳示例
這篇文章主要為大家詳細介紹了java組件commons-fileupload實現(xiàn)文件上傳,具有一定的參考價值,感興趣的小伙伴們可以參考一下2016-10-10SpringBoot啟動后立即執(zhí)行的幾種方法小結(jié)
在項目開發(fā)中某些場景必須要用到啟動項目后立即執(zhí)行方式的功能,本文主要介紹了SpringBoot啟動后立即執(zhí)行的幾種方法小結(jié),具有一定的參考價值,感興趣的可以了解一下2023-05-05Java中Dijkstra算法求解最短路徑的實現(xiàn)
Dijkstra算法是一種解決最短路徑問題的常用算法,本文主要介紹了Java中Dijkstra算法求解最短路徑的實現(xiàn),具有一定的參考價值,感興趣的可以了解一下2023-09-09Service層異常拋到Controller層處理還是直接處理問題分析
這篇文章主要為大家介紹了Service層異常拋到Controller層處理還是直接處理的問題分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-09-09