10 Advanced Scripts for Speeding Up Python Model Training

 Updated: 2025-07-20 08:18:18   Author: Python_trys  
In machine learning, training times keep growing as datasets get larger and models more complex. This article presents 10 advanced Python multithreading scripts that may help.

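Introduction

In machine learning, training times keep growing as datasets get larger and models more complex. Python's multithreading offers a way to make use of modern multi-core CPUs and can speed up data preprocessing, feature engineering, and model training, provided the heavy work runs in code that releases the global interpreter lock (GIL), as blocking I/O and many NumPy, pandas, and scikit-learn routines do. This article presents 10 advanced Python multithreading scripts to help improve performance in your machine learning projects.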
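How much threads help depends on whether the work releases the GIL. The following is a minimal sketch of that effect, using time.sleep as a stand-in for blocking I/O; the names blocking_task and timed are illustrative helpers, not part of the scripts below.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_task(seconds):
    # time.sleep releases the GIL, much like file or network I/O does
    time.sleep(seconds)
    return seconds


def timed(fn):
    # Return the wall-clock time fn() takes
    start = time.perf_counter()
    fn()
    return time.perf_counter() - start


delays = [0.2] * 4

# Run the four tasks one after another
sequential_time = timed(lambda: [blocking_task(d) for d in delays])

# Run the same four tasks on four threads
with ThreadPoolExecutor(max_workers=4) as executor:
    threaded_time = timed(lambda: list(executor.map(blocking_task, delays)))

print(f"sequential: {sequential_time:.2f}s, threaded: {threaded_time:.2f}s")
```

Pure-Python loops that hold the GIL would not show this gap; for those, a process pool is usually the better fit.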

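1. Multithreaded data preprocessing pipeline

import concurrent.futures
import pandas as pd
from sklearn.preprocessing import StandardScaler

def preprocess_chunk(data_chunk):
    # Data cleaning: drop rows with missing values; copy so the assignment
    # below never writes into a view of the caller's DataFrame
    data_chunk = data_chunk.dropna().copy()
    float_cols = data_chunk.select_dtypes(include=['float64']).columns
    if data_chunk.empty or len(float_cols) == 0:
        return data_chunk
    # Feature scaling. Note: the scaler is fitted per chunk, so each chunk is
    # standardized with its own mean and variance
    scaler = StandardScaler()
    data_chunk[float_cols] = scaler.fit_transform(data_chunk[float_cols])
    return data_chunk

def parallel_preprocessing(data, chunk_size=10000, workers=4):
    # Split the DataFrame into row chunks of at most chunk_size rows
    chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        # executor.map returns the processed chunks in their original order
        processed_chunks = list(executor.map(preprocess_chunk, chunks))
    return pd.concat(processed_chunks)

# Usage example
# data = pd.read_csv('large_dataset.csv')
# processed_data = parallel_preprocessing(data)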

Use case: preprocessing large datasets, for example feature scaling and missing-value handling.
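The pipeline above fits a separate scaler on every chunk, so the same raw value can be scaled differently depending on which chunk it lands in. If one consistent scaling is wanted, a possible variant is to compute the statistics once on the full data and only apply them in parallel. The sketch below assumes a purely numeric NumPy array; parallel_global_scaling and scale_chunk are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np


def scale_chunk(chunk, mean, std):
    # Apply the same global statistics to every chunk
    return (chunk - mean) / std


def parallel_global_scaling(X, n_chunks=4, workers=4):
    X = np.asarray(X, dtype=float)
    # Compute the statistics once, on the full dataset
    mean = X.mean(axis=0)
    std = X.std(axis=0)
    std[std == 0] = 1.0  # avoid division by zero for constant columns
    chunks = np.array_split(X, n_chunks)
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # map() keeps chunk order, so rows come back in their original order
        scaled = list(executor.map(lambda c: scale_chunk(c, mean, std), chunks))
    return np.vstack(scaled)


rng = np.random.default_rng(0)
X_demo = rng.normal(loc=5.0, scale=2.0, size=(1000, 3))
X_scaled = parallel_global_scaling(X_demo)
```

With global statistics, the result does not depend on the chunk size or on how rows are distributed across chunks.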

2. Parallel feature engineering

from concurrent.futures import ThreadPoolExecutor
import numpy as np
import pandas as pd

def generate_feature(args):
    col1, col2, operation = args
    if operation == 'add':
        return col1 + col2
    elif operation == 'mul':
        return col1 * col2
    elif operation == 'sub':
        return col1 - col2
    elif operation == 'div':
        # Rows with a zero denominator get 0 instead of inf/NaN
        return np.where(col2 != 0, col1 / col2, 0)
    raise ValueError(f"Unsupported operation: {operation}")

def parallel_feature_engineering(data, feature_configs, workers=4):
    # Reuse the input index so the new columns stay aligned with the data
    features = pd.DataFrame(index=data.index)
    with ThreadPoolExecutor(max_workers=workers) as executor:
        results = executor.map(generate_feature, 
                             [(data[config['col1']], data[config['col2']], config['op']) 
                              for config in feature_configs])
        # executor.map yields results in the same order as feature_configs
        for config, result in zip(feature_configs, results):
            features[config['name']] = result
    return pd.concat([data, features], axis=1)

# Usage example
# configs = [
#     {'name': 'feat1', 'col1': 'age', 'col2': 'income', 'op': 'mul'},
#     {'name': 'feat2', 'col1': 'height', 'col2': 'weight', 'op': 'div'}
# ]
# enhanced_data = parallel_feature_engineering(data, configs)

Use case: generating many interaction or derived features.
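In the 'div' branch, np.where still evaluates col1 / col2 for every row before choosing, so zero denominators are divided first and masked afterwards. One alternative, sketched here for plain NumPy arrays, is np.divide with its out and where arguments, which skips the masked positions entirely. safe_divide and fill_value are illustrative names.

```python
import numpy as np


def safe_divide(numerator, denominator, fill_value=0.0):
    numerator = np.asarray(numerator, dtype=float)
    denominator = np.asarray(denominator, dtype=float)
    # Pre-fill the output; positions with a zero denominator keep fill_value
    shape = np.broadcast(numerator, denominator).shape
    out = np.full(shape, fill_value, dtype=float)
    # Division is only carried out where the mask is True
    np.divide(numerator, denominator, out=out, where=denominator != 0)
    return out


ratios = safe_divide([1.0, 2.0, 3.0], [2.0, 0.0, 4.0])
print(ratios)
```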

3. Multithreaded hyperparameter search

from sklearn.model_selection import ParameterGrid
from sklearn.ensemble import RandomForestClassifier
from concurrent.futures import ThreadPoolExecutor, as_completed
from sklearn.metrics import accuracy_score

def train_model(params, X_train, y_train, X_val, y_val):
    # Fit one candidate configuration and score it on the validation set
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)
    preds = model.predict(X_val)
    return accuracy_score(y_val, preds), params

def parallel_param_search(X_train, y_train, X_val, y_val, param_grid, workers=4):
    grid = ParameterGrid(param_grid)
    best_score = -1
    best_params = None
    
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = []
        for params in grid:
            futures.append(executor.submit(
                train_model, params, X_train, y_train, X_val, y_val))
        
        # as_completed yields each future as soon as its training run finishes
        for future in as_completed(futures):
            score, params = future.result()
            if score > best_score:
                best_score = score
                best_params = params
                
    return best_params, best_score

# Usage example
# param_grid = {
#     'n_estimators': [50, 100, 200],
#     'max_depth': [None, 10, 20],
#     'min_samples_split': [2, 5, 10]
# }
# best_params, best_score = parallel_param_search(X_train, y_train, X_val, y_val, param_grid)

Use case: speeding up hyperparameter tuning for models such as random forests and gradient-boosted trees.
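The submit/as_completed pattern used for the search can be tried without scikit-learn. The sketch below is dependency-free and adds one thing the script above lacks: a failed configuration is recorded instead of aborting the whole search. toy_score is a hypothetical objective standing in for "fit a model and return its validation accuracy", and expand_grid only imitates the idea of ParameterGrid.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from itertools import product


def expand_grid(param_grid):
    # Yield one dict per parameter combination
    keys = sorted(param_grid)
    for values in product(*(param_grid[k] for k in keys)):
        yield dict(zip(keys, values))


def toy_score(params):
    # Hypothetical objective; it peaks at depth=10, trees=100
    return -abs(params["depth"] - 10) - abs(params["trees"] - 100) / 100


def search(param_grid, score_fn, workers=4):
    best_score, best_params, failures = float("-inf"), None, []
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # Remember which parameters belong to which future
        future_to_params = {
            executor.submit(score_fn, p): p for p in expand_grid(param_grid)
        }
        for future in as_completed(future_to_params):
            params = future_to_params[future]
            try:
                score = future.result()
            except Exception as exc:
                # Record the failure and keep searching
                failures.append((params, exc))
                continue
            if score > best_score:
                best_score, best_params = score, params
    return best_params, best_score, failures


best_params, best_score, failures = search(
    {"depth": [5, 10, 20], "trees": [50, 100, 200]}, toy_score
)
print(best_params, best_score)
```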

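4. Parallel model ensemble

from sklearn.base import clone
from concurrent.futures import ThreadPoolExecutor
import numpy as np

class ParallelEnsemble:
    def __init__(self, base_estimator, n_estimators=10, workers=4, random_state=None):
        self.base_estimator = base_estimator
        self.n_estimators = n_estimators
        self.workers = workers
        self.random_state = random_state
        self.estimators_ = []
    
    def _fit_one(self, estimator, X, y, seed):
        # Bagging: train each clone on a bootstrap sample (rows drawn with
        # replacement); without resampling, deterministic clones would be identical
        rng = np.random.default_rng(seed)
        idx = rng.integers(0, len(X), size=len(X))
        return estimator.fit(X[idx], y[idx])
    
    def fit(self, X, y):
        X, y = np.asarray(X), np.asarray(y)
        # One independent seed per estimator
        seeds = np.random.SeedSequence(self.random_state).spawn(self.n_estimators)
        with ThreadPoolExecutor(max_workers=self.workers) as executor:
            futures = [executor.submit(self._fit_one, clone(self.base_estimator), X, y, seed)
                       for seed in seeds]
            # fit() returns the estimator itself, so each result is a fitted model
            self.estimators_ = [future.result() for future in futures]
        return self
    
    def predict_proba(self, X):
        # Assumes every bootstrap sample contained all classes, so the
        # probability matrices share the same column layout
        with ThreadPoolExecutor(max_workers=self.workers) as executor:
            probas = list(executor.map(lambda est: est.predict_proba(X), self.estimators_))
        return np.mean(probas, axis=0)
    
    def predict(self, X):
        proba = self.predict_proba(X)
        # Map the winning column index back to the class label
        return self.estimators_[0].classes_[np.argmax(proba, axis=1)]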

# Usage example
# from sklearn.linear_model import LogisticRegression
# ensemble = ParallelEnsemble(LogisticRegression(), n_estimators=10, workers=4)
# ensemble.fit(X_train, y_train)
# predictions = ensemble.predict(X_test)

Use case: building a parallelized bagging ensemble on top of any base classifier that implements predict_proba.

5. Multithreaded cross-validation

from sklearn.base import clone
from sklearn.model_selection import KFold
from concurrent.futures import ThreadPoolExecutor
import numpy as np
from sklearn.metrics import get_scorer

def cross_val_score_parallel(estimator, X, y, cv=5, scoring='accuracy', workers=4):
    kf = KFold(n_splits=cv)
    scorer = get_scorer(scoring)
    
    def train_eval(train_idx, test_idx):
        # X and y are assumed to be NumPy arrays (positional indexing)
        X_train, X_test = X[train_idx], X[test_idx]
        y_train, y_test = y[train_idx], y[test_idx]
        # Fit a fresh clone per fold: sharing one estimator across threads
        # would let concurrent fit() calls overwrite each other's state
        model = clone(estimator)
        model.fit(X_train, y_train)
        return scorer(model, X_test, y_test)
    
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(train_eval, train_idx, test_idx)
                   for train_idx, test_idx in kf.split(X)]
        # Read results in submission order so scores[i] belongs to fold i
        scores = [future.result() for future in futures]
    
    return np.array(scores)

# Usage example
# from sklearn.ensemble import GradientBoostingClassifier
# model = GradientBoostingClassifier()
# scores = cross_val_score_parallel(model, X, y, cv=5, workers=4)
# print(f"Mean accuracy: {scores.mean():.4f}")

Use case: speeding up cross-validation, particularly for compute-intensive models.
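concurrent.futures.as_completed yields futures in completion order, not submission order. If you want to react to folds as they finish (for logging, say) and still return scores in fold order, one option is to map each future back to its fold index. The sketch below uses sleeping tasks as hypothetical stand-ins for fold training; run_indexed and make_task are illustrative names.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_indexed(tasks, workers=4):
    results = [None] * len(tasks)
    completion_order = []
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # Remember the position of every submitted task
        future_to_index = {executor.submit(task): i for i, task in enumerate(tasks)}
        for future in as_completed(future_to_index):
            i = future_to_index[future]
            results[i] = future.result()  # store by position, not arrival order
            completion_order.append(i)
    return results, completion_order


def make_task(fold, delay):
    # Hypothetical stand-in for "train and evaluate one fold"
    def task():
        time.sleep(delay)
        return fold * 10
    return task


# The first fold is the slowest, so it is likely to finish last
tasks = [make_task(0, 0.15), make_task(1, 0.10), make_task(2, 0.05)]
fold_scores, completion_order = run_indexed(tasks)
print(fold_scores, completion_order)
```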

6. Parallel time-series feature extraction

import math
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from tsfresh import extract_features

def parallel_ts_feature_extraction(ts_data, column_id='id', column_sort='time', workers=4):
    ids = ts_data[column_id].unique()
    # Round up and never go below 1, so the range() step stays valid even
    # when there are fewer ids than workers
    chunk_size = max(1, math.ceil(len(ids) / workers))
    id_chunks = [ids[i:i + chunk_size] for i in range(0, len(ids), chunk_size)]
    
    def process_chunk(chunk_ids):
        chunk_data = ts_data[ts_data[column_id].isin(chunk_ids)]
        # n_jobs=0 turns off tsfresh's own multiprocessing inside each thread
        return extract_features(chunk_data, column_id=column_id,
                                column_sort=column_sort, n_jobs=0)
    
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # executor.map returns the per-chunk feature tables in chunk order
        features = list(executor.map(process_chunk, id_chunks))
    
    return pd.concat(features)

# Usage example
# features = parallel_ts_feature_extraction(time_series_data, workers=4)

Use case: feature extraction on large time-series datasets.
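Splitting ids with floor division can produce more chunks than workers (10 ids and 4 workers give a chunk size of 2, hence 5 chunks) and a chunk size of zero when there are fewer ids than workers. A small sketch of an even split that avoids both cases follows; split_evenly is a hypothetical helper, not a tsfresh or pandas function.

```python
def split_evenly(items, n_chunks):
    items = list(items)
    if not items:
        return []
    # Never create more chunks than there are items
    n_chunks = max(1, min(n_chunks, len(items)))
    base, extra = divmod(len(items), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        # The first `extra` chunks take one additional item
        size = base + (1 if i < extra else 0)
        chunks.append(items[start:start + size])
        start += size
    return chunks


chunks = split_evenly(range(10), 4)
print([len(c) for c in chunks])
```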

7. Multithreaded model prediction service

from concurrent.futures import ThreadPoolExecutor
import numpy as np
from queue import Queue
from threading import Thread

class PredictionServer:
    def __init__(self, model, max_workers=4, batch_size=32):
        self.model = model
        self.max_workers = max_workers
        self.batch_size = batch_size
        self.input_queue = Queue()
        self.output_queue = Queue()
        self.workers = []
        
    def _worker(self):
        while True:
            batch = self.input_queue.get()
            if batch is None:
                break
            ids, data = batch
            preds = self.model.predict(data)
            self.output_queue.put((ids, preds))
            self.input_queue.task_done()
    
    def start(self):
        self.workers = []
        for _ in range(self.max_workers):
            t = Thread(target=self._worker)
            t.start()
            self.workers.append(t)
    
    def stop(self):
        for _ in range(self.max_workers):
            self.input_queue.put(None)
        for worker in self.workers:
            worker.join()
    
    def predict(self, X):
        self.start()
        num_samples = len(X)
        predictions = [None] * num_samples
        
        # Submit the prediction tasks batch by batch
        for i in range(0, num_samples, self.batch_size):
            batch = (list(range(i, min(i+self.batch_size, num_samples))),
                     X[i:i+self.batch_size])
            self.input_queue.put(batch)
        
        # Collect the results
        results_received = 0
        while results_received < num_samples:
            ids, preds = self.output_queue.get()
            for id_, pred in zip(ids, preds):
                predictions[id_] = pred
            results_received += len(ids)
            self.output_queue.task_done()
        
        self.stop()
        return np.array(predictions)

# Usage example
# server = PredictionServer(trained_model, max_workers=4)
# predictions = server.predict(X_test)

Use case: building a high-throughput model prediction service for online or batch prediction.
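In the queue-based server, an exception raised inside model.predict ends the worker thread without producing a result, and the collecting loop then waits indefinitely. For pure batch prediction, a simpler sketch is to let executor.map do the bookkeeping: it returns results in input order and re-raises worker exceptions in the caller. predict_in_batches and double_all are hypothetical names; double_all stands in for model.predict.

```python
from concurrent.futures import ThreadPoolExecutor


def predict_in_batches(predict_fn, X, batch_size=32, workers=4):
    # Slice the input into consecutive batches
    batches = [X[i:i + batch_size] for i in range(0, len(X), batch_size)]
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # map() yields results in input order and re-raises worker exceptions
        batch_preds = list(executor.map(predict_fn, batches))
    # Flatten the per-batch predictions into one list
    return [pred for preds in batch_preds for pred in preds]


def double_all(batch):
    # Hypothetical stand-in for model.predict
    return [2 * x for x in batch]


preds = predict_in_batches(double_all, list(range(100)), batch_size=8)
print(preds[:5])
```

The queue-based design remains the better fit when requests arrive continuously rather than as one array.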

8. Parallel feature selection

from sklearn.feature_selection import SelectKBest, mutual_info_classif
from concurrent.futures import ThreadPoolExecutor
import numpy as np

def parallel_feature_selection(X, y, k_features=10, workers=4):
    n_features = X.shape[1]
    
    def select_features(feature_indices):
        # Stage 1: keep the top-k features within this block of columns
        selector = SelectKBest(mutual_info_classif, k=min(k_features, len(feature_indices)))
        X_subset = X[:, feature_indices]
        selector.fit(X_subset, y)
        return [feature_indices[i] for i in selector.get_support(indices=True)]
    
    # Split the column indices into at most `workers` non-empty blocks
    blocks = [block.tolist()
              for block in np.array_split(np.arange(n_features), workers)
              if len(block) > 0]
    
    selected_features = []
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # executor.map returns the per-block selections in block order
        for block_result in executor.map(select_features, blocks):
            selected_features.extend(block_result)
    
    # Stage 2: re-rank the surviving candidates if there are still too many
    if len(selected_features) > k_features:
        selector = SelectKBest(mutual_info_classif, k=k_features)
        selector.fit(X[:, selected_features], y)
        selected_features = [selected_features[i] for i in selector.get_support(indices=True)]
    
    return selected_features

# Usage example
# selected = parallel_feature_selection(X_train, y_train, k_features=20, workers=4)
# X_train_selected = X_train[:, selected]
# X_test_selected = X_test[:, selected]

Use case: parallel feature selection on high-dimensional datasets.

9. Multithreaded model persistence

import concurrent.futures
import pickle
import gzip
from pathlib import Path

def save_model(model, filepath, compress=True):
    if compress:
        with gzip.open(filepath, 'wb') as f:
            pickle.dump(model, f)
    else:
        with open(filepath, 'wb') as f:
            pickle.dump(model, f)
    return filepath

def parallel_save_models(models_info, workers=4):
    Path("saved_models").mkdir(exist_ok=True)
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = []
        for model_name, model in models_info.items():
            filepath = f"saved_models/{model_name}.pkl.gz"
            futures.append(executor.submit(save_model, model, filepath))
        
        for future in concurrent.futures.as_completed(futures):
            print(f"Model saved to: {future.result()}")

# Usage example
# models = {
#     'random_forest': rf_model,
#     'gradient_boosting': gb_model,
#     'svm': svm_model
# }
# parallel_save_models(models, workers=4)

Use case: saving several trained models at the same time to reduce I/O time.
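A crash in the middle of a write can leave a truncated model file behind. One common guard, sketched below with plain dictionaries as hypothetical stand-ins for trained models, is to write to a temporary file and rename it into place, paired with a matching loader. save_atomic, load_model, and the demo names are illustrative.

```python
import gzip
import os
import pickle
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def save_atomic(obj, filepath):
    filepath = Path(filepath)
    tmp_path = filepath.with_name(filepath.name + ".tmp")
    # Write the full payload to a temporary file first
    with gzip.open(tmp_path, "wb") as f:
        pickle.dump(obj, f)
    # Rename into place; atomic when both paths are on the same filesystem
    os.replace(tmp_path, filepath)
    return filepath


def load_model(filepath):
    # Only unpickle files you trust: pickle can execute arbitrary code
    with gzip.open(filepath, "rb") as f:
        return pickle.load(f)


demo_dir = Path(tempfile.mkdtemp())
demo_models = {"model_a": {"weights": [1, 2, 3]}, "model_b": {"weights": [4, 5]}}

with ThreadPoolExecutor(max_workers=2) as executor:
    saved_paths = list(executor.map(
        lambda item: save_atomic(item[1], demo_dir / f"{item[0]}.pkl.gz"),
        demo_models.items(),
    ))

# Round trip: load every file back and key it by model name
restored = {path.name.split(".")[0]: load_model(path) for path in saved_paths}
```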

10. Multithreaded data augmentation

import concurrent.futures
import numpy as np
from albumentations import Compose, HorizontalFlip, Rotate, RandomBrightnessContrast

def augment_image(image, augmentations):
    return augmentations(image=image)['image']

def parallel_data_augmentation(images, labels, augmentations=None, multiplier=4, workers=4):
    # Build a default augmentation pipeline if the caller did not pass one
    if augmentations is None:
        augmentations = Compose([
            HorizontalFlip(p=0.5),
            Rotate(limit=30, p=0.5),
            RandomBrightnessContrast(p=0.2),
        ])
    
    # Prepare one (image, label) task per augmented copy
    tasks = []
    for _ in range(multiplier):
        for img, lbl in zip(images, labels):
            tasks.append((img, lbl))
    
    # Run the augmentations in parallel; futures are read in submission order,
    # so every augmented image stays aligned with its label
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(augment_image, img, augmentations) for img, _ in tasks]
        augmented_images = [future.result() for future in futures]
    augmented_labels = [lbl for _, lbl in tasks]
    
    # Append the augmented samples to the original data
    augmented_images = np.concatenate([images, augmented_images])
    augmented_labels = np.concatenate([labels, augmented_labels])
    
    return augmented_images, augmented_labels

# Usage example
# X_train_aug, y_train_aug = parallel_data_augmentation(X_train, y_train, multiplier=3, workers=4)

Use case: parallel augmentation of image data, especially for small datasets in deep learning.
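Random augmentations run from several threads are hard to reproduce if all threads share one random state. A possible approach, sketched here with NumPy only and a hypothetical random_flip in place of a real augmentation library, is to give every task its own seed derived from a single base seed.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np


def random_flip(image, seed):
    # Each task owns its generator, so results do not depend on thread timing
    rng = np.random.default_rng(seed)
    return image[:, ::-1] if rng.random() < 0.5 else image


def augment_all(images, base_seed=0, workers=4):
    # Derive one independent child seed per image from the base seed
    seeds = np.random.SeedSequence(base_seed).spawn(len(images))
    with ThreadPoolExecutor(max_workers=workers) as executor:
        return list(executor.map(random_flip, images, seeds))


# Eight tiny 2x3 "images" for demonstration
demo_images = [np.arange(6).reshape(2, 3) + i for i in range(8)]
first_run = augment_all(demo_images, base_seed=42)
second_run = augment_all(demo_images, base_seed=42)
```

Running augment_all twice with the same base_seed gives identical outputs, regardless of how the threads are scheduled.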

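Summary

This article presented 10 advanced Python multithreading scripts for machine learning, covering the whole workflow from data preprocessing to model training, evaluation, and deployment. Used with care, multithreading can noticeably improve the efficiency of a machine learning workflow, especially when handling large-scale data or compute-intensive tasks.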
