使用Python實現(xiàn)MapReduce的示例代碼
一、MapReduce
將這個單詞分解為Map、Reduce。
Map階段:在這個階段,輸入數(shù)據(jù)集被分割成小塊,并由多個Map任務(wù)處理。每個Map任務(wù)將輸入數(shù)據(jù)映射為一系列(key, value)對,并生成中間結(jié)果。
Reduce階段:在這個階段,中間結(jié)果被重新分組和排序,以便相同key的中間結(jié)果被傳遞到同一個Reduce任務(wù)。每個Reduce任務(wù)將具有相同key的中間結(jié)果合并、計算,并生成最終的輸出。
舉個例子,在一個很長的字符串中統(tǒng)計某個字符出現(xiàn)的次數(shù)。
from collections import defaultdict
def mapper(word):
return word, 1
def reducer(key_value_pair):
key, values = key_value_pair
return key, sum(values)
def map_reduce_function(input_list, mapper, reducer):
'''
- input_list: 字符列表
- mapper: 映射函數(shù),將輸入列表中的每個元素映射到一個鍵值對
- reducer: 聚合函數(shù),將映射結(jié)果中的每個鍵值對聚合到一個鍵值對
- return: 聚合結(jié)果
'''
map_results = map(mapper, input_list)
shuffler = defaultdict(list)
for key, value in map_results:
shuffler[key].append(value)
return map(reducer, shuffler.items())
if __name__ == "__main__":
words = "python best language".split(" ")
result = list(map_reduce_function(words, mapper, reducer))
print(result)輸出結(jié)果為
[('python', 1), ('best', 1), ('language', 1)]
但是這里并沒有體現(xiàn)出MapReduce的特點。只是展示了MapReduce的運行原理。
二、基于多線程實現(xiàn)MapReduce
from collections import defaultdict
import threading
class MapReduceThread(threading.Thread):
def __init__(self, input_list, mapper, shuffler):
super(MapReduceThread, self).__init__()
self.input_list = input_list
self.mapper = mapper
self.shuffler = shuffler
def run(self):
map_results = map(self.mapper, self.input_list)
for key, value in map_results:
self.shuffler[key].append(value)
def reducer(key_value_pair):
key, values = key_value_pair
return key, sum(values)
def mapper(word):
return word, 1
def map_reduce_function(input_list, num_threads):
shuffler = defaultdict(list)
threads = []
chunk_size = len(input_list) // num_threads
for i in range(0, len(input_list), chunk_size):
chunk = input_list[i:i+chunk_size]
thread = MapReduceThread(chunk, mapper, shuffler)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
return map(reducer, shuffler.items())
if __name__ == "__main__":
words = "python is the best language for programming and python is easy to learn".split(" ")
result = list(map_reduce_function(words, num_threads=4))
for i in result:
print(i)這里的本質(zhì)一模一樣,將字符串分割為四份,并且分發(fā)這四個字符串到不同的線程執(zhí)行,最后將執(zhí)行結(jié)果歸約。只不過由于Python的GIL機制,導(dǎo)致Python中的線程是并發(fā)執(zhí)行的,而不能實現(xiàn)并行,所以在python中使用線程來實現(xiàn)MapReduce是不合理的。(GIL機制:搶占式線程,不能在同一時間運行多個線程)。
三、基于多進(jìn)程實現(xiàn)MapReduce
由于Python中GIL機制的存在,無法實現(xiàn)真正的并行。這里有兩種解決方案,一種是使用其他語言,例如C語言,這里我們不考慮;另一種就是利用多核,CPU的多任務(wù)處理能力。
from collections import defaultdict
import multiprocessing
def mapper(chunk):
word_count = defaultdict(int)
for word in chunk.split():
word_count[word] += 1
return word_count
def reducer(word_counts):
result = defaultdict(int)
for word_count in word_counts:
for word, count in word_count.items():
result[word] += count
return result
def chunks(lst, n):
for i in range(0, len(lst), n):
yield lst[i:i + n]
def map_reduce_function(text, num_processes):
chunk_size = (len(text) + num_processes - 1) // num_processes
chunks_list = list(chunks(text, chunk_size))
with multiprocessing.Pool(processes=num_processes) as pool:
word_counts = pool.map(mapper, chunks_list)
result = reducer(word_counts)
return result
if __name__ == "__main__":
text = "python is the best language for programming and python is easy to learn"
num_processes = 4
result = map_reduce_function(text, num_processes)
for i in result:
print(i, result[i])這里使用多進(jìn)程來實現(xiàn)MapReduce,這里就是真正意義上的并行,依然是將數(shù)據(jù)切分,采用并行處理這些數(shù)據(jù),這樣才可以體現(xiàn)出MapReduce的高效特點。但是在這個例子中可能看不出來很大的差異,因為數(shù)據(jù)量太小。在實際應(yīng)用中,如果數(shù)據(jù)集太小,是不適用的,可能無法帶來任何收益,甚至產(chǎn)生更大的開銷導(dǎo)致性能的下降。
四、在100GB的文件中檢索數(shù)據(jù)
這里依然使用MapReduce的思想,但是有兩個問題
讀取速度慢
解決方法:
使用分塊讀取,但是在分區(qū)時不宜過小。因為在創(chuàng)建分區(qū)時會被序列化到進(jìn)程,在進(jìn)程中又需要將其解開,這樣反復(fù)的序列化和反序列化會占用大量時間。不宜過大,因為這樣創(chuàng)建的進(jìn)程會變少,可能無法充分利用CPU的多核能力。
內(nèi)存消耗特別大
解決方法:
使用生成器和迭代器,但需獲取。例如分塊為8塊,生成器會一次讀取一塊的內(nèi)容并且返回對應(yīng)的迭代器,以此類推,這樣就避免了讀取內(nèi)存過大的問題。
from datetime import datetime
import multiprocessing
def chunked_file_reader(file_path:str, chunk_size:int):
"""
生成器函數(shù):分塊讀取文件內(nèi)容
- file_path: 文件路徑
- chunk_size: 塊大小,默認(rèn)為1MB
"""
with open(file_path, 'r', encoding='utf-8') as file:
while True:
chunk = file.read(chunk_size)
if not chunk:
break
yield chunk
def search_in_chunk(chunk:str, keyword:str):
"""在文件塊中搜索關(guān)鍵字
- chunk: 文件塊
- keyword: 要搜索的關(guān)鍵字
"""
lines = chunk.split('\n')
for line in lines:
if keyword in line:
print(f"找到了:", line)
def search_in_file(file_path:str, keyword:str, chunk_size=1024*1024):
"""在文件中搜索關(guān)鍵字
file_path: 文件路徑
keyword: 要搜索的關(guān)鍵字
chunk_size: 文件塊大小,為1MB
"""
with multiprocessing.Pool() as pool:
for chunk in chunked_file_reader(file_path, chunk_size):
pool.apply_async(search_in_chunk, args=(chunk, keyword))
if __name__ == "__main__":
start = datetime.now()
file_path = "file.txt"
keyword = "張三"
search_in_file(file_path, keyword)
end = datetime.now()
print(f"搜索完成,耗時 {end - start}")最后程序運行時間為兩分半左右。
到此這篇關(guān)于使用Python實現(xiàn)MapReduce的示例代碼的文章就介紹到這了,更多相關(guān)Python實現(xiàn)MapReduce內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python使用描述符實現(xiàn)屬性類型檢查的案例解析
這篇文章主要介紹了Python使用描述符實現(xiàn)屬性類型檢查,實例屬性就是在一個類中將另一個類的實例作為該類的一個數(shù)屬性,本文通過代碼演示給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-05-05
Python利用正則表達(dá)式實現(xiàn)計算器算法思路解析
這篇文章主要介紹了Python利用正則表達(dá)式實現(xiàn)計算器算法思路解析,非常不錯,具有參考借鑒價值,需要的朋友參考下吧2018-04-04
Python OpenCV 圖像區(qū)域輪廓標(biāo)記(框選各種小紙條)
這篇文章主要介紹了Python OpenCV 圖像區(qū)域輪廓標(biāo)記(框選各種小紙條),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03
Python中Numpy和Matplotlib的基本使用指南
numpy庫處理的最基礎(chǔ)數(shù)據(jù)類型是由同種元素構(gòu)成的多維數(shù)組(ndarray),而matplotlib 是提供數(shù)據(jù)繪圖功能的第三方庫,其pyplot子庫主要用于實現(xiàn)各種數(shù)據(jù)展示圖形的繪制,這篇文章主要給大家介紹了關(guān)于Python中Numpy和Matplotlib的基本使用指南,需要的朋友可以參考下2021-11-11
python簡單幾步獲取各種DOS命令顯示的內(nèi)容詳解流程
你會用python獲取各種DOS命令顯示的內(nèi)容核心嗎?說的可不是返回值,是用system()函數(shù)調(diào)用windows操作系統(tǒng)的DOS命令來做點事情,需要的朋友可以參考下2021-10-10

