10 Advanced Python Scripts for Faster Model Training
Introduction
In machine learning, training times keep growing as datasets get larger and models become more complex. Python's multithreading offers an effective way to put modern multi-core CPUs to work, and it can noticeably speed up data preprocessing, feature engineering, and model training. Keep in mind that the global interpreter lock (GIL) limits pure-Python threads to one core at a time; the scripts here still scale because most of their heavy lifting is I/O or happens inside libraries such as NumPy, pandas, and scikit-learn, which release the GIL during computation. This article walks through 10 advanced Python multithreading scripts that can deliver a real performance boost in machine learning projects.
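All ten scripts follow the same basic pattern: split the work into independent tasks and hand them to a concurrent.futures.ThreadPoolExecutor. Here is a minimal sketch of that pattern, using a hypothetical load_file helper and made-up file names; threads pay off most when each task is I/O-bound or calls into GIL-releasing library code.

import concurrent.futures
import pandas as pd

def load_file(path):
    # I/O-bound work: reading files lets threads overlap while one waits on disk
    return pd.read_csv(path)

# Hypothetical file names, purely to illustrate the pattern
paths = ['part1.csv', 'part2.csv', 'part3.csv']
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    frames = list(executor.map(load_file, paths))
combined = pd.concat(frames, ignore_index=True)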
1. Multithreaded Data Preprocessing Pipeline
import concurrent.futures
import pandas as pd
from sklearn.preprocessing import StandardScaler

def preprocess_chunk(data_chunk):
    # Data cleaning: drop rows with missing values
    data_chunk = data_chunk.dropna()
    # Feature scaling (note: each chunk is scaled with its own statistics)
    scaler = StandardScaler()
    float_cols = data_chunk.select_dtypes(include=['float64']).columns
    data_chunk[float_cols] = scaler.fit_transform(data_chunk[float_cols])
    return data_chunk

def parallel_preprocessing(data, chunk_size=10000, workers=4):
    chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        processed_chunks = list(executor.map(preprocess_chunk, chunks))
    return pd.concat(processed_chunks)

# Usage example
# data = pd.read_csv('large_dataset.csv')
# processed_data = parallel_preprocessing(data)
Use case: feature scaling, missing-value handling, and other preprocessing steps on large datasets.
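If the per-chunk work were dominated by pure-Python loops rather than pandas/NumPy internals, the GIL would cap the speedup from threads. One option, sketched here as a variant rather than as part of the original script, is to swap in ProcessPoolExecutor, which has the same interface but requires preprocess_chunk to live at module level and the chunks to be picklable.

import concurrent.futures
import pandas as pd

def parallel_preprocessing_processes(data, chunk_size=10000, workers=4):
    # Same structure as parallel_preprocessing above, but with processes instead of threads
    chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
    with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
        processed_chunks = list(executor.map(preprocess_chunk, chunks))
    return pd.concat(processed_chunks)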
2. Parallel Feature Generation
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import pandas as pd

def generate_feature(args):
    col1, col2, operation = args
    if operation == 'add':
        return col1 + col2
    elif operation == 'mul':
        return col1 * col2
    elif operation == 'sub':
        return col1 - col2
    elif operation == 'div':
        # Guard against division by zero
        return np.where(col2 != 0, col1 / col2, 0)

def parallel_feature_engineering(data, feature_configs, workers=4):
    features = pd.DataFrame()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        results = executor.map(
            generate_feature,
            [(data[config['col1']], data[config['col2']], config['op'])
             for config in feature_configs])
        for config, result in zip(feature_configs, results):
            features[config['name']] = result
    return pd.concat([data, features], axis=1)

# Usage example
# configs = [
#     {'name': 'feat1', 'col1': 'age', 'col2': 'income', 'op': 'mul'},
#     {'name': 'feat2', 'col1': 'height', 'col2': 'weight', 'op': 'div'}
# ]
# enhanced_data = parallel_feature_engineering(data, configs)
Use case: generating large numbers of interaction or derived features.
3. Multithreaded Hyperparameter Search
from sklearn.model_selection import ParameterGrid
from sklearn.ensemble import RandomForestClassifier
from concurrent.futures import ThreadPoolExecutor, as_completed
from sklearn.metrics import accuracy_score

def train_model(params, X_train, y_train, X_val, y_val):
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)
    preds = model.predict(X_val)
    return accuracy_score(y_val, preds), params

def parallel_param_search(X_train, y_train, X_val, y_val, param_grid, workers=4):
    grid = ParameterGrid(param_grid)
    best_score = -1
    best_params = None
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = []
        for params in grid:
            futures.append(executor.submit(
                train_model, params, X_train, y_train, X_val, y_val))
        # Keep the best configuration as results come in
        for future in as_completed(futures):
            score, params = future.result()
            if score > best_score:
                best_score = score
                best_params = params
    return best_params, best_score

# Usage example
# param_grid = {
#     'n_estimators': [50, 100, 200],
#     'max_depth': [None, 10, 20],
#     'min_samples_split': [2, 5, 10]
# }
# best_params, best_score = parallel_param_search(X_train, y_train, X_val, y_val, param_grid)
Use case: speeding up hyperparameter tuning for models such as random forests and gradient-boosted trees.
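For scikit-learn estimators you can often get a similar effect with the built-in GridSearchCV and its n_jobs argument, which parallelizes the candidate fits and adds cross-validation; a short sketch for comparison (the parameter values are illustrative):

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV

param_grid = {'n_estimators': [50, 100, 200], 'max_depth': [None, 10, 20]}
search = GridSearchCV(RandomForestClassifier(), param_grid, cv=3, n_jobs=4)
# search.fit(X_train, y_train)
# print(search.best_params_, search.best_score_)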
4. Parallel Model Ensembling
from sklearn.base import clone
from concurrent.futures import ThreadPoolExecutor, as_completed
import numpy as np

class ParallelEnsemble:
    def __init__(self, base_estimator, n_estimators=10, workers=4):
        self.base_estimator = base_estimator
        self.n_estimators = n_estimators
        self.workers = workers
        self.estimators_ = []

    def fit(self, X, y):
        self.estimators_ = []
        with ThreadPoolExecutor(max_workers=self.workers) as executor:
            futures = []
            for _ in range(self.n_estimators):
                estimator = clone(self.base_estimator)
                # sklearn's fit returns the estimator itself, so future.result() is the fitted model
                futures.append(executor.submit(estimator.fit, X, y))
            for future in as_completed(futures):
                self.estimators_.append(future.result())
        return self

    def predict_proba(self, X):
        probas = []
        with ThreadPoolExecutor(max_workers=self.workers) as executor:
            futures = [executor.submit(estimator.predict_proba, X)
                       for estimator in self.estimators_]
            for future in as_completed(futures):
                probas.append(future.result())
        return np.mean(probas, axis=0)

    def predict(self, X):
        proba = self.predict_proba(X)
        return np.argmax(proba, axis=1)

# Usage example
# from sklearn.linear_model import LogisticRegression
# ensemble = ParallelEnsemble(LogisticRegression(), n_estimators=10, workers=4)
# ensemble.fit(X_train, y_train)
# predictions = ensemble.predict(X_test)
Use case: building a parallelized bagging-style ensemble around any base estimator.
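One caveat: every clone in ParallelEnsemble is fitted on the same X and y, so the averaged ensemble only helps if the base estimator is itself randomized. A quick way to get that diversity, sketched here as a suggestion rather than part of the original class, is a decision tree with random splits:

from sklearn.tree import DecisionTreeClassifier

# Random split points and feature subsets give each clone a different tree,
# even though all of them see the same training data
base = DecisionTreeClassifier(splitter='random', max_features='sqrt')
ensemble = ParallelEnsemble(base, n_estimators=10, workers=4)
# ensemble.fit(X_train, y_train)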
5. Multithreaded Cross-Validation
from sklearn.base import clone
from sklearn.model_selection import KFold
from concurrent.futures import ThreadPoolExecutor, as_completed
from sklearn.metrics import get_scorer
import numpy as np

def cross_val_score_parallel(estimator, X, y, cv=5, scoring='accuracy', workers=4):
    kf = KFold(n_splits=cv)
    scorer = get_scorer(scoring)
    scores = []

    def train_eval(train_idx, test_idx):
        X_train, X_test = X[train_idx], X[test_idx]
        y_train, y_test = y[train_idx], y[test_idx]
        # Clone so each fold fits its own copy instead of sharing one estimator across threads
        model = clone(estimator)
        model.fit(X_train, y_train)
        return scorer(model, X_test, y_test)

    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = []
        for train_idx, test_idx in kf.split(X):
            futures.append(executor.submit(train_eval, train_idx, test_idx))
        for future in as_completed(futures):
            scores.append(future.result())
    return np.array(scores)

# Usage example
# from sklearn.ensemble import GradientBoostingClassifier
# model = GradientBoostingClassifier()
# scores = cross_val_score_parallel(model, X, y, cv=5, workers=4)
# print(f"Mean accuracy: {scores.mean():.4f}")
Use case: speeding up cross-validation, especially for compute-intensive models.
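Note that scikit-learn's own cross_val_score already takes an n_jobs argument and runs the folds in parallel for you (via joblib, typically with processes), so the hand-rolled version above is mainly useful when you want control over how folds are scheduled or scored; for comparison:

from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import cross_val_score

model = GradientBoostingClassifier()
# scores = cross_val_score(model, X, y, cv=5, scoring='accuracy', n_jobs=4)
# print(f"Mean accuracy: {scores.mean():.4f}")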
6. Parallel Time-Series Feature Extraction
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
from tsfresh import extract_features

def parallel_ts_feature_extraction(ts_data, column_id='id', column_sort='time', workers=4):
    ids = ts_data[column_id].unique()
    # Avoid a zero-sized chunk when there are fewer ids than workers
    chunk_size = max(1, len(ids) // workers)
    id_chunks = [ids[i:i + chunk_size] for i in range(0, len(ids), chunk_size)]

    def process_chunk(chunk_ids):
        chunk_data = ts_data[ts_data[column_id].isin(chunk_ids)]
        return extract_features(chunk_data, column_id=column_id, column_sort=column_sort)

    features = []
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(process_chunk, chunk) for chunk in id_chunks]
        for future in as_completed(futures):
            features.append(future.result())
    return pd.concat(features)

# Usage example
# features = parallel_ts_feature_extraction(time_series_data, workers=4)
Use case: feature extraction on large time-series datasets.
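tsfresh can also parallelize internally: recent versions of extract_features accept an n_jobs argument, so the manual chunking above is mostly about controlling the partitioning and memory footprint yourself. For comparison, assuming that argument is available in your tsfresh version:

from tsfresh import extract_features

# features = extract_features(time_series_data, column_id='id', column_sort='time', n_jobs=4)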
7. Multithreaded Prediction Server
import numpy as np
from queue import Queue
from threading import Thread

class PredictionServer:
    def __init__(self, model, max_workers=4, batch_size=32):
        self.model = model
        self.max_workers = max_workers
        self.batch_size = batch_size
        self.input_queue = Queue()
        self.output_queue = Queue()
        self.workers = []

    def _worker(self):
        while True:
            batch = self.input_queue.get()
            if batch is None:
                break
            ids, data = batch
            preds = self.model.predict(data)
            self.output_queue.put((ids, preds))
            self.input_queue.task_done()

    def start(self):
        self.workers = []
        for _ in range(self.max_workers):
            t = Thread(target=self._worker)
            t.start()
            self.workers.append(t)

    def stop(self):
        # A None sentinel per worker tells it to exit
        for _ in range(self.max_workers):
            self.input_queue.put(None)
        for worker in self.workers:
            worker.join()

    def predict(self, X):
        self.start()
        num_samples = len(X)
        predictions = [None] * num_samples

        # Submit prediction tasks in batches
        for i in range(0, num_samples, self.batch_size):
            batch = (list(range(i, min(i + self.batch_size, num_samples))),
                     X[i:i + self.batch_size])
            self.input_queue.put(batch)

        # Collect results until every sample has a prediction
        results_received = 0
        while results_received < num_samples:
            ids, preds = self.output_queue.get()
            for id_, pred in zip(ids, preds):
                predictions[id_] = pred
            results_received += len(ids)
            self.output_queue.task_done()

        self.stop()
        return np.array(predictions)

# Usage example
# server = PredictionServer(trained_model, max_workers=4)
# predictions = server.predict(X_test)
Use case: building a high-performance prediction service for online or batch scoring.
8. Parallel Feature Selection
from sklearn.feature_selection import SelectKBest, mutual_info_classif
from concurrent.futures import ThreadPoolExecutor, as_completed
import numpy as np

def parallel_feature_selection(X, y, k_features=10, workers=4):
    n_features = X.shape[1]
    features_per_worker = n_features // workers
    selected_features = []

    def select_features(feature_indices):
        selector = SelectKBest(mutual_info_classif, k=min(k_features, len(feature_indices)))
        X_subset = X[:, feature_indices]
        selector.fit(X_subset, y)
        return [feature_indices[i] for i in selector.get_support(indices=True)]

    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = []
        for i in range(workers):
            start = i * features_per_worker
            end = (i + 1) * features_per_worker if i != workers - 1 else n_features
            feature_indices = list(range(start, end))
            futures.append(executor.submit(select_features, feature_indices))
        for future in as_completed(futures):
            selected_features.extend(future.result())

    # Second pass: re-rank the union of per-worker selections down to k_features
    if len(selected_features) > k_features:
        selector = SelectKBest(mutual_info_classif, k=k_features)
        selector.fit(X[:, selected_features], y)
        selected_features = [selected_features[i] for i in selector.get_support(indices=True)]
    return selected_features

# Usage example
# selected = parallel_feature_selection(X_train, y_train, k_features=20, workers=4)
# X_train_selected = X_train[:, selected]
# X_test_selected = X_test[:, selected]
Use case: parallel feature selection on high-dimensional datasets.
9. Multithreaded Model Persistence
import concurrent.futures
import pickle
import gzip
from pathlib import Path

def save_model(model, filepath, compress=True):
    if compress:
        with gzip.open(filepath, 'wb') as f:
            pickle.dump(model, f)
    else:
        with open(filepath, 'wb') as f:
            pickle.dump(model, f)
    return filepath

def parallel_save_models(models_info, workers=4):
    Path("saved_models").mkdir(exist_ok=True)
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = []
        for model_name, model in models_info.items():
            filepath = f"saved_models/{model_name}.pkl.gz"
            futures.append(executor.submit(save_model, model, filepath))
        for future in concurrent.futures.as_completed(futures):
            print(f"Model saved to: {future.result()}")

# Usage example
# models = {
#     'random_forest': rf_model,
#     'gradient_boosting': gb_model,
#     'svm': svm_model
# }
# parallel_save_models(models, workers=4)
Use case: saving several trained models at once to cut I/O time.
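A natural counterpart, sketched here under the same file-layout assumption (saved_models/<name>.pkl.gz, gzip-compressed pickles), is loading several saved models in parallel:

import concurrent.futures
import gzip
import pickle

def load_model(filepath):
    # Mirror of save_model: read a gzip-compressed pickle
    with gzip.open(filepath, 'rb') as f:
        return pickle.load(f)

def parallel_load_models(model_names, workers=4):
    filepaths = [f"saved_models/{name}.pkl.gz" for name in model_names]
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        models = list(executor.map(load_model, filepaths))
    return dict(zip(model_names, models))

# Usage example
# models = parallel_load_models(['random_forest', 'gradient_boosting', 'svm'], workers=4)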
10. Multithreaded Data Augmentation
import concurrent.futures
import numpy as np
from albumentations import Compose, HorizontalFlip, Rotate, RandomBrightnessContrast

def augment_image(image, augmentations):
    return augmentations(image=image)['image']

def parallel_data_augmentation(images, labels, augmentations=None, multiplier=4, workers=4):
    augmented_images = []
    augmented_labels = []

    # Build a default augmentation pipeline when none is supplied
    aug_pipeline = augmentations if augmentations is not None else Compose([
        HorizontalFlip(p=0.5),
        Rotate(limit=30, p=0.5),
        RandomBrightnessContrast(p=0.2),
    ])

    # Prepare task arguments: (image, label, pipeline) for each augmented copy
    tasks = []
    for _ in range(multiplier):
        for img, lbl in zip(images, labels):
            tasks.append((img, lbl, aug_pipeline))

    # Run the augmentations in parallel (only the image and the pipeline go to the worker)
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(augment_image, task[0], task[2]) for task in tasks]
        for future, task in zip(futures, tasks):
            augmented_images.append(future.result())
            augmented_labels.append(task[1])

    # Append the augmented samples to the original data
    augmented_images = np.concatenate([images, augmented_images])
    augmented_labels = np.concatenate([labels, augmented_labels])
    return augmented_images, augmented_labels

# Usage example
# X_train_aug, y_train_aug = parallel_data_augmentation(X_train, y_train, multiplier=3, workers=4)
Use case: parallel image augmentation, especially useful for small datasets in deep learning.
Summary
This article presented 10 advanced multithreading scripts for machine learning in Python, covering the full workflow from data preprocessing through model training, evaluation, and deployment. Used sensibly, multithreading can noticeably improve the efficiency of a machine learning pipeline, especially when working with large datasets or compute-intensive tasks.