Short-Video Recommendation with Surprise Collaborative Filtering: A Worked Example
Preface
The previous article showed a simple content recommendation built on a basic web project structure. Strictly speaking it was a ranking algorithm rather than a recommender: the hotness formula did make content freshness and quality dynamic, but every user still saw almost the same list (at most a given video ranked a little higher or lower at a given time). There was no per-user personalization.
Even so, hotness-based content recommendation still has its own use case: the trending list. So that feature simply moves into its own module, and personalized recommendation is handed to algorithms better suited for it.
There are many ways to build a recommender. At the platform level there are options such as Spark and, today's topic, Surprise; at the method level you can use deep learning, collaborative filtering, or a mix of both. Large platforms go much further: the recall stage alone has many channels (for example recognizing video content from sampled frames with convolutional networks, text similarity, signals from existing data), followed by cleaning, coarse ranking, fine ranking, re-ranking and so on, often with platform-wide content diversity as a major goal.
Here we stay at the practical, entry level so the project can plug into personalized recommendation quickly. The rest of this article wires Surprise into the existing PHP project structure and implements user-based and item-based similarity recommendation.
Environment
- python3.8
- Flask2.0
- pandas2.0
- mysql-connector-python
- surprise
- openpyxl
- gunicorn
About Surprise
Surprise is a library for building and analyzing recommender systems. It ships a range of algorithms, including baseline estimators, neighborhood methods and matrix-factorization methods (SVD, PMF, SVD++, NMF), plus several built-in similarity measures such as cosine, mean squared difference (MSD) and Pearson correlation. These similarity measures can be used to estimate how alike two users (or items) are, which is the data backbone of the recommendations below.
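To make the library's shape concrete, here is a minimal usage sketch, separate from this project's code: it cross-validates an SVD model and a cosine-similarity KNN model on the bundled MovieLens ml-100k dataset (Surprise offers to download it on first use). Treat it as an illustration of the API, not part of the pipeline built below.

# Minimal Surprise sketch (not part of this project's code).
from surprise import SVD, KNNBasic, Dataset
from surprise.model_selection import cross_validate

data = Dataset.load_builtin('ml-100k')  # prompts to download the dataset on first use

# Matrix-factorization baseline
cross_validate(SVD(), data, measures=['RMSE', 'MAE'], cv=3, verbose=True)

# Neighborhood method with a cosine similarity measure
algo = KNNBasic(sim_options={'name': 'cosine', 'user_based': True})
cross_validate(algo, data, measures=['RMSE', 'MAE'], cv=3, verbose=True)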
Collaborative filtering dataset
Since the recommendation is built on this library, the data has to follow its conventions. Like most collaborative filtering frameworks, Surprise only needs ratings, that is, the score a user gave an item. If you have no data of your own you can download the free MovieLens or Jester datasets. Below is the table I created for my own business (a short loading sketch follows it); adapt it as needed.
CREATE TABLE `short_video_rating` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` varchar(120) DEFAULT '',
  `item_id` int(11) DEFAULT '0',
  `rating` int(11) unsigned DEFAULT '0' COMMENT 'rating',
  `scoring_set` json DEFAULT NULL COMMENT 'set of user actions',
  `create_time` int(11) DEFAULT '0',
  `action_day_time` int(11) DEFAULT '0' COMMENT 'same-day action time',
  `update_time` int(11) DEFAULT '0' COMMENT 'update time',
  `delete_time` int(11) DEFAULT '0' COMMENT 'delete time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=107 DEFAULT CHARSET=utf8mb4 COMMENT='user-to-video rating table';
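For orientation, a minimal loading sketch: rows with exactly these three columns (user_id, item_id, rating) are all Surprise needs. The sample rows and the 1 to 5 rating scale are illustrative assumptions, not values from the real table.

# Sketch: loading rows shaped like short_video_rating into Surprise.
# The sample rows and the 1-5 rating scale are assumptions; match your own rules.
import pandas as pd
from surprise import Dataset, Reader

ratings = pd.DataFrame(
    [("u1", 101, 5), ("u1", 102, 3), ("u2", 101, 4)],   # user_id, item_id, rating
    columns=["user_id", "item_id", "rating"],
)
reader = Reader(rating_scale=(1, 5))
data = Dataset.load_from_df(ratings[["user_id", "item_id", "rating"]], reader)
trainset = data.build_full_trainset()
print(trainset.n_users, trainset.n_items)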
Business flow
On the web (PHP) side, rating events are recorded wherever users act, via API calls or tracking, according to preset scoring rules. Once the rating table has data, a Python job dumps the SQL records to a spreadsheet, feeds it to Surprise, trains the different algorithms, and returns a top-N recommendation list for the parameters it receives. The Python part is a service started with Flask that PHP talks to over HTTP; the snippets below walk through each piece.
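What counts as a rating is entirely a business decision. As a purely hypothetical illustration (the action names and weights below are invented, not this project's actual rules), one way to collapse a user's actions on a video into a single capped rating:

# Hypothetical scoring rules; the action names and weights are illustrative only.
ACTION_SCORES = {
    "play_finish": 2,   # watched to the end
    "like": 2,
    "comment": 3,
    "share": 3,
}

def rating_from_actions(actions, max_rating=5):
    """Collapse one user's actions on one video into a single capped rating."""
    score = sum(ACTION_SCORES.get(a, 0) for a in actions)
    return min(score, max_rating)

print(rating_from_actions(["play_finish", "like"]))  # -> 4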
Code
1. PHP request wrapper
<?php
/**
 * Created by ZERO開發(fā).
 * User: 北橋蘇
 * Date: 2023/6/26 0026
 * Time: 14:43
 */
namespace app\common\service;

class Recommend
{
    private $condition;
    private $cfRecommends = [];
    private $output = [];

    public function __construct($flag = 1, $lastRecommendIds = [], $userId = "")
    {
        $this->condition['flag'] = $flag;
        $this->condition['last_recommend_ids'] = $lastRecommendIds;
        $this->condition['user_id'] = $userId;
    }

    public function addObserver($cfRecommend)
    {
        $this->cfRecommends[] = $cfRecommend;
    }

    public function startRecommend()
    {
        foreach ($this->cfRecommends as $cfRecommend) {
            $res = $cfRecommend->recommend($this->condition);
            $this->output = array_merge($res, $this->output);
        }
        $this->output = array_values(array_unique($this->output));
        return $this->output;
    }
}

abstract class cfRecommendBase
{
    protected $cfGatewayUrl = "127.0.0.1:6016";
    protected $limit = 15;

    public function __construct($limit = 15)
    {
        $this->limit = $limit;
        $this->cfGatewayUrl = config('api.video_recommend.gateway_url');
    }

    abstract public function recommend($condition);
}

class mcf extends cfRecommendBase
{
    public function recommend($condition)
    {
        $videoIdArr = [];
        $flag = $condition['flag'] ?? 1;
        $userId = $condition['user_id'] ?? '';
        $url = "{$this->cfGatewayUrl}/mcf_recommend";
        if ($flag == 1 && $userId) {
            $param['raw_uid'] = (string)$userId;
            $param['top_k'] = $this->limit;
            $list = httpRequest($url, $param, 'json');
            $videoIdArr = json_decode($list, true) ?? [];
        }
        return $videoIdArr;
    }
}

class icf extends cfRecommendBase
{
    public function recommend($condition)
    {
        $videoIdArr = [];
        $flag = $condition['flag'] ?? 1;
        $userId = $condition['user_id'] ?? '';
        $lastRecommendIds = $condition['last_recommend_ids'] ?? [];
        $url = "{$this->cfGatewayUrl}/icf_recommend";
        if ($flag > 1 && $lastRecommendIds && $userId) {
            $itemId = $lastRecommendIds[0] ?? 0;
            $param['raw_item_id'] = $itemId;
            $param['top_k'] = $this->limit;
            $list = httpRequest($url, $param, 'json');
            $videoIdArr = json_decode($list, true) ?? [];
        }
        return $videoIdArr;
    }
}
2. PHP: requesting recommendations
Because the video inventory is small early on, collaborative filtering is combined with the hotness ranking. When the front end asks for recommendations, the API returns the recommendation list together with a token for the next request (a page number). That page number is used to page through the ranking list whenever the collaborative filtering service is down or returns nothing. To make sure the page number is actually valid, if the page is too large and returns no data it is recursively reset to page one, and the page number is sent back to the front end so data fetching stays smooth.
public static function recommend($flag, $videoIds, $userId)
{
    $nexFlag = $flag + 1;
    $formatterVideoList = [];
    try {
        // Collaborative filtering recommendations
        $isOpen = config('api.video_recommend.is_open');
        $cfVideoIds = [];
        if ($isOpen == 1) {
            $recommend = new Recommend($flag, $videoIds, $userId);
            $recommend->addObserver(new mcf(15));
            $recommend->addObserver(new icf(15));
            $cfVideoIds = $recommend->startRecommend();
        }
        // Videos the user has already seen
        $nowTime = strtotime(date('Ymd'));
        $timeBefore = $nowTime - 60 * 60 * 24 * 100;
        $videoIdsFilter = self::getUserVideoRatingByTime($userId, $timeBefore);
        $cfVideoIds = array_diff($cfVideoIds, $videoIdsFilter);
        // Filter out non-compliant videos
        $videoPool = [];
        $cfVideoIds && $videoPool = ShortVideoModel::listByOrderRaw($cfVideoIds, $flag);
        // Cold-start recommendation (fall back to the hot ranking)
        !$videoPool && $videoPool = self::hotRank($userId, $videoIdsFilter, $flag);
        if ($videoPool) {
            list($nexFlag, $videoList) = $videoPool;
            $formatterVideoList = self::formatterVideoList($videoList, $userId);
        }
    } catch (\Exception $e) {
        $preFileName = str::snake(__FUNCTION__);
        $path = self::getClassName();
        write_log("msg:" . $e->getMessage(), $preFileName . "_error", $path);
    }
    return [$nexFlag, $formatterVideoList];
}
3. Dataset generation
import os
import mysql.connector
import datetime
import pandas as pd

now = datetime.datetime.now()
year = now.year
month = now.month
day = now.day
fullDate = str(year) + str(month) + str(day)

dir_data = './collaborative_filtering/cf_excel'
file_path = '{}/dataset_{}.xlsx'.format(dir_data, fullDate)

db_config = {
    "host": "127.0.0.1",
    "database": "database",
    "user": "user",
    "password": "password"
}

if not os.path.exists(file_path):
    cnx = mysql.connector.connect(user=db_config['user'], password=db_config['password'],
                                  host=db_config['host'], database=db_config['database'])
    df = pd.read_sql_query("SELECT user_id, item_id, rating FROM short_video_rating", cnx)
    print('--------------- exporting dataset ----------------')
    # Write the DataFrame to an Excel file
    df.to_excel(file_path, index=False)

if not os.path.exists(file_path):
    raise IOError("Dataset file does not exist!")
4. Collaborative filtering service
from flask import Flask, request, json, Response, abort
from collaborative_filtering import cf_item
from collaborative_filtering import cf_user
from collaborative_filtering import cf_mix
from werkzeug.middleware.proxy_fix import ProxyFix

app = Flask(__name__)

@app.route('/')
def hello_world():
    return abort(404)

@app.route('/mcf_recommend', methods=["POST", "GET"])
def get_mcf_recommendation():
    json_data = request.get_json()
    raw_uid = json_data.get("raw_uid")
    top_k = json_data.get("top_k")
    recommend_result = cf_mix.collaborative_fitlering(raw_uid, top_k)
    return Response(json.dumps(recommend_result), mimetype='application/json')

@app.route('/ucf_recommend', methods=["POST", "GET"])
def get_ucf_recommendation():
    json_data = request.get_json()
    raw_uid = json_data.get("raw_uid")
    top_k = json_data.get("top_k")
    recommend_result = cf_user.collaborative_fitlering(raw_uid, top_k)
    return Response(json.dumps(recommend_result), mimetype='application/json')

@app.route('/icf_recommend', methods=["POST", "GET"])
def get_icf_recommendation():
    json_data = request.get_json()
    raw_item_id = json_data.get("raw_item_id")
    top_k = json_data.get("top_k")
    recommend_result = cf_item.collaborative_fitlering(raw_item_id, top_k)
    return Response(json.dumps(recommend_result), mimetype='application/json')

if __name__ == '__main__':
    app.run(host="0.0.0.0", debug=True, port=6016)
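Before wiring up PHP, the HTTP contract can be checked with a small Python client. A sketch, assuming the requests package is installed and the service is running locally on port 6016 as configured above; the user id is just an example value:

# Sketch of a client call, mirroring what the PHP httpRequest() wrapper sends.
import requests

resp = requests.post(
    "http://127.0.0.1:6016/mcf_recommend",
    json={"raw_uid": "10001", "top_k": 15},   # example user id; use any id present in the dataset
    timeout=3,
)
print(resp.json())   # list of recommended item ids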
5. User-based recommendation
# -*- coding: utf-8 -*-
# @File : cf_recommendation.py
from __future__ import (absolute_import, division, print_function, unicode_literals)
from collections import defaultdict
import os
from surprise import Dataset
from surprise import Reader
from surprise import BaselineOnly
from surprise import KNNBasic
from surprise import KNNBaseline
from heapq import nlargest
import pandas as pd
import datetime
import time


def get_top_n(predictions, n=10):
    top_n = defaultdict(list)
    for uid, iid, true_r, est, _ in predictions:
        top_n[uid].append((iid, est))
    for uid, user_ratings in top_n.items():
        top_n[uid] = nlargest(n, user_ratings, key=lambda s: s[1])
    return top_n


class PredictionSet():

    def __init__(self, algo, trainset, user_raw_id=None, k=40):
        self.algo = algo
        self.trainset = trainset
        self.k = k
        if user_raw_id is not None:
            self.r_uid = user_raw_id
            self.i_uid = trainset.to_inner_uid(user_raw_id)
            self.knn_userset = self.algo.get_neighbors(self.i_uid, self.k)
            user_items = set([j for (j, _) in self.trainset.ur[self.i_uid]])
            self.neighbor_items = set()
            for nnu in self.knn_userset:
                for (j, _) in trainset.ur[nnu]:
                    if j not in user_items:
                        self.neighbor_items.add(j)

    def user_build_anti_testset(self, fill=None):
        fill = self.trainset.global_mean if fill is None else float(fill)
        anti_testset = []
        user_items = set([j for (j, _) in self.trainset.ur[self.i_uid]])
        anti_testset += [(self.r_uid, self.trainset.to_raw_iid(i), fill)
                         for i in self.neighbor_items if i not in user_items]
        return anti_testset


def user_build_anti_testset(trainset, user_raw_id, fill=None):
    fill = trainset.global_mean if fill is None else float(fill)
    i_uid = trainset.to_inner_uid(user_raw_id)
    anti_testset = []
    user_items = set([j for (j, _) in trainset.ur[i_uid]])
    anti_testset += [(user_raw_id, trainset.to_raw_iid(i), fill)
                     for i in trainset.all_items() if i not in user_items]
    return anti_testset


# ================= Surprise recommendation ====================
def collaborative_fitlering(raw_uid, top_k):
    now = datetime.datetime.now()
    year = now.year
    month = now.month
    day = now.day
    fullDate = str(year) + str(month) + str(day)
    dir_data = './collaborative_filtering/cf_excel'
    file_path = '{}/dataset_{}.xlsx'.format(dir_data, fullDate)
    if not os.path.exists(file_path):
        raise IOError("Dataset file does not exist!")

    # Load the dataset
    alldata = pd.read_excel(file_path)
    reader = Reader(line_format='user item rating')
    dataset = Dataset.load_from_df(alldata, reader=reader)

    # Build the full trainset from all the data
    trainset = dataset.build_full_trainset()

    # ================= BaselineOnly ==================
    bsl_options = {'method': 'sgd', 'learning_rate': 0.0005}
    algo_BaselineOnly = BaselineOnly(bsl_options=bsl_options)
    algo_BaselineOnly.fit(trainset)

    # Get recommendations
    rset = user_build_anti_testset(trainset, raw_uid)
    # time.sleep(5)  # sleep 5 seconds to test client-side timeouts
    predictions = algo_BaselineOnly.test(rset)
    top_n_baselineonly = get_top_n(predictions, n=5)

    # ================= KNNBasic ==================
    sim_options = {'name': 'pearson', 'user_based': True}
    algo_KNNBasic = KNNBasic(sim_options=sim_options)
    algo_KNNBasic.fit(trainset)

    # Get recommendations, restricted to the user's KNN neighbours
    predictor = PredictionSet(algo_KNNBasic, trainset, raw_uid)
    knn_anti_set = predictor.user_build_anti_testset()
    predictions = algo_KNNBasic.test(knn_anti_set)
    top_n_knnbasic = get_top_n(predictions, n=top_k)

    # ================= KNNBaseline ==================
    sim_options = {'name': 'pearson_baseline', 'user_based': True}
    algo_KNNBaseline = KNNBaseline(sim_options=sim_options)
    algo_KNNBaseline.fit(trainset)

    # Get recommendations, restricted to the user's KNN neighbours
    predictor = PredictionSet(algo_KNNBaseline, trainset, raw_uid)
    knn_anti_set = predictor.user_build_anti_testset()
    predictions = algo_KNNBaseline.test(knn_anti_set)
    top_n_knnbaseline = get_top_n(predictions, n=top_k)

    # =============== Merge the three result sets by vote ==================
    recommendset = set()
    for results in [top_n_baselineonly, top_n_knnbasic, top_n_knnbaseline]:
        for key in results.keys():
            for recommendations in results[key]:
                iid, rating = recommendations
                recommendset.add(iid)

    items_baselineonly = set()
    for key in top_n_baselineonly.keys():
        for recommendations in top_n_baselineonly[key]:
            iid, rating = recommendations
            items_baselineonly.add(iid)

    items_knnbasic = set()
    for key in top_n_knnbasic.keys():
        for recommendations in top_n_knnbasic[key]:
            iid, rating = recommendations
            items_knnbasic.add(iid)

    items_knnbaseline = set()
    for key in top_n_knnbaseline.keys():
        for recommendations in top_n_knnbaseline[key]:
            iid, rating = recommendations
            items_knnbaseline.add(iid)

    rank = dict()
    for recommendation in recommendset:
        if recommendation not in rank:
            rank[recommendation] = 0
        if recommendation in items_baselineonly:
            rank[recommendation] += 1
        if recommendation in items_knnbasic:
            rank[recommendation] += 1
        if recommendation in items_knnbaseline:
            rank[recommendation] += 1

    # Highest vote count across the three algorithms
    max_rank = max(rank.values())
    if max_rank == 1:
        # No overlap between the algorithms: fall back to the BaselineOnly items
        return list(items_baselineonly)
    else:
        result = nlargest(top_k, rank, key=lambda s: rank[s])
        return list(result)
        # print("Ranking result: {}".format(result))
6. Item-based recommendation
# -*- coding: utf-8 -*-
from __future__ import (absolute_import, division, print_function, unicode_literals)
from collections import defaultdict
import io
import os
from surprise import SVD, KNNBaseline, Reader, Dataset
import pandas as pd
import datetime
import mysql.connector
import pickle


# ================= Surprise recommendation ====================
def collaborative_fitlering(raw_item_id, top_k):
    now = datetime.datetime.now()
    year = now.year
    month = now.month
    day = now.day
    fullDate = str(year) + str(month) + str(day)
    # dir_data = './collaborative_filtering/cf_excel'
    dir_data = './cf_excel'
    file_path = '{}/dataset_{}.xlsx'.format(dir_data, fullDate)
    if not os.path.exists(file_path):
        raise IOError("Dataset file does not exist!")

    # Load the dataset
    alldata = pd.read_excel(file_path)
    reader = Reader(line_format='user item rating')
    dataset = Dataset.load_from_df(alldata, reader=reader)

    # Required for collaborative filtering: fit on the whole dataset
    # (no cross-validation) and build the rating matrix
    trainset = dataset.build_full_trainset()

    # Similarity measure: pearson_baseline; item-based collaborative filtering
    sim_options = {'name': 'pearson_baseline', 'user_based': False}
    algo = KNNBaseline(sim_options=sim_options)
    algo.fit(trainset)

    # Serialize the trained model to disk
    # with open('./cf_models/cf_item_model.pkl', 'wb') as f:
    #     pickle.dump(algo, f)

    # Load a trained model from disk
    # with open('cf_item_model.pkl', 'rb') as f:
    #     algo = pickle.load(f)

    # Convert the raw item id to an inner id
    toy_story_inner_id = algo.trainset.to_inner_iid(raw_item_id)
    # Find the top_k nearest neighbours of that inner id
    toy_story_neighbors = algo.get_neighbors(toy_story_inner_id, k=top_k)
    # Convert the neighbours' inner ids back to raw item ids
    toy_story_neighbors_rids = (algo.trainset.to_raw_iid(inner_id) for inner_id in toy_story_neighbors)
    result = list(toy_story_neighbors_rids)
    return result


if __name__ == "__main__":
    res = collaborative_fitlering(15, 20)
    print(res)
Other notes
1. Production deployment of the recommendation service. In development you can start it with python recommend_service.py; for deployment you need gunicorn, which is installed and then made available through the environment variables (PATH). In the code, import werkzeug.middleware.proxy_fix and change the startup section to the following, then start the service with gunicorn -w 5 -b 0.0.0.0:6016 app:app.

app.wsgi_app = ProxyFix(app.wsgi_app)
app.run()
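If you would rather not keep the worker count and bind address on the command line, gunicorn can also read them from a config file. A small sketch; the file name and the timeout value are my own choices, not from the original setup:

# gunicorn_conf.py -- hypothetical config file; values mirror the command above.
bind = "0.0.0.0:6016"
workers = 5
timeout = 60        # seconds before an unresponsive worker is recycled

Start it with gunicorn -c gunicorn_conf.py app:app.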
2. Saving the model locally. As business data accumulates, the dataset to train on keeps growing, so later on the training cycle can be shortened: train the model on a schedule, save it to disk, and serve recommendations from the saved model against live data. Saving and loading work as shown below, followed by a sketch of a periodic retraining job.
2.1. Saving the model
sim_options = {'name': 'pearson_baseline', 'user_based': False}
algo = KNNBaseline(sim_options=sim_options)
algo.fit(trainset)

# Serialize the trained model to disk
with open('./cf_models/cf_item_model.pkl', 'wb') as f:
    pickle.dump(algo, f)
2.2. Loading the model
with open('cf_item_model.pkl', 'rb') as f:
    algo = pickle.load(f)

# Convert the raw item id to an inner id
toy_story_inner_id = algo.trainset.to_inner_iid(raw_item_id)
# Find the top_k nearest neighbours of that inner id
toy_story_neighbors = algo.get_neighbors(toy_story_inner_id, k=top_k)
# Convert the neighbours' inner ids back to raw item ids
toy_story_neighbors_rids = (algo.trainset.to_raw_iid(inner_id) for inner_id in toy_story_neighbors)
result = list(toy_story_neighbors_rids)
return result
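Putting the pieces together, a periodic retraining job might look roughly like the sketch below. It assumes the dataset Excel file from step 3 already exists for the current day and that a ./cf_models directory can be created; the 1 to 5 rating scale is an assumption. Run it from cron or any other scheduler.

# Sketch of a periodic retraining job (e.g. run daily from cron).
# Assumes the dataset file from step 3 exists and ./cf_models is writable.
import datetime
import os
import pickle

import pandas as pd
from surprise import Dataset, KNNBaseline, Reader

def retrain_item_model(dir_data='./collaborative_filtering/cf_excel',
                       model_path='./cf_models/cf_item_model.pkl'):
    now = datetime.datetime.now()
    full_date = f"{now.year}{now.month}{now.day}"
    file_path = f"{dir_data}/dataset_{full_date}.xlsx"
    if not os.path.exists(file_path):
        raise IOError("Dataset file does not exist!")

    # Load today's exported ratings and build the full trainset
    alldata = pd.read_excel(file_path)
    data = Dataset.load_from_df(alldata, Reader(rating_scale=(1, 5)))
    trainset = data.build_full_trainset()

    # Item-based KNN with the pearson_baseline similarity, as in step 6
    algo = KNNBaseline(sim_options={'name': 'pearson_baseline', 'user_based': False})
    algo.fit(trainset)

    # Persist the trained model for the online service to load
    os.makedirs(os.path.dirname(model_path), exist_ok=True)
    with open(model_path, 'wb') as f:
        pickle.dump(algo, f)

if __name__ == '__main__':
    retrain_item_model()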
Closing thoughts
What we have here is still only a small part of a recommender system. For recall, besides sampling frames from the videos you can also separate out the audio, then use convolutional neural networks to recognize the audio type and the rough content of the video, and match that against tags built from the user's viewing history, and so on. All of this is something to keep learning and improving over time.
That is the whole worked example of short-video recommendation with Surprise collaborative filtering. For more on Surprise and short-video recommendation, see the other related articles on 脚本之家.