PHP使用SWOOLE擴(kuò)展實現(xiàn)定時同步 MySQL 數(shù)據(jù)
南寧公司和幾個分公司之間都使用了呼叫系統(tǒng),然后現(xiàn)在需要做一個呼叫通話數(shù)據(jù)分析,由于分公司的呼叫服務(wù)器是在內(nèi)網(wǎng),通過技術(shù)手段映射出來,分公司到南寧之間的網(wǎng)絡(luò)不穩(wěn)定,所以需要把分公司的通話數(shù)據(jù)同步到南寧。
本身最簡單的方法就是直接配置MySQL的主從同步就可以同步數(shù)據(jù)到南寧來了。但是銷售呼叫系統(tǒng)那邊的公司不給MySQL權(quán)限我們。 所以這個方法只能放棄了。
于是我們干脆的想,使用PHP來實現(xiàn)定時一個簡易的PHP定時同步工具,然后PHP進(jìn)程常駐后臺運(yùn)行,所以首先就先到了一個PHP組件:SWOOLE,經(jīng)過討論,分公司的每天半天生成的數(shù)據(jù)量最大在5000條左右,所以這個方案是可行,就這樣干。
我們使用PHP SWOOLE 做一個異步的定時任務(wù)系統(tǒng)。
本身MySQL數(shù)據(jù)庫的主從同步是通過解析Master庫中的binary-log來進(jìn)行同步數(shù)據(jù)到從庫的。然而我們使用PHP來同步數(shù)據(jù)的時候,那么只能從master庫分批查詢數(shù)據(jù),然后插入到南寧的slave庫來了。
這里我們使用的框架是 ThinkPHP 3.2
.
首先安裝PHP擴(kuò)展: SWOOLE,因為沒有使用到特別的功能,所以這里我們使用pecl來快速安裝:
pecl install swoole
安裝完成后在 php.ini
里面加入 extension="swoole.so"
安裝完成后,我們使用 phpinfo()
來檢查是否成功了.
安裝成功了,我們就來寫業(yè)務(wù).
服務(wù)端
1、首先啟動一個后臺的服務(wù)端,監(jiān)聽端口9501
public function index() { $serv = new \swoole_server("0.0.0.0", 9501); $serv->set([ 'worker_num' => 1,//一般設(shè)置為服務(wù)器CPU數(shù)的1-4倍 'task_worker_num' => 8,//task進(jìn)程的數(shù)量 'daemonize' => 1,//以守護(hù)進(jìn)程執(zhí)行 'max_request' => 10000,//最大請求數(shù)量 "task_ipc_mode " => 2 //使用消息隊列通信,并設(shè)置為爭搶模式 ]); $serv->on('Receive', [$this, 'onReceive']);//接收任務(wù),并投遞 $serv->on('Task', [$this, 'onTask']);//可以在這個方法里面處理任務(wù) $serv->on('Finish', [$this, 'onFinish']);//任務(wù)完成時候調(diào)用 $serv->start(); }
2、接收和投遞任務(wù)
public function onReceive($serv, $fd, $from_id, $data) { //使用json_decode 解析任務(wù)數(shù)據(jù) $areas = json_decode($data,true); foreach ($areas as $area){ //投遞異步任務(wù) $serv->task($area); } }
3、任務(wù)執(zhí)行,數(shù)據(jù)從master庫查詢和寫入到slave數(shù)據(jù)庫
public function onTask($serv, $task_id, $from_id, $task_data) { $area = $task_data;//參數(shù)是地區(qū)編號 $rows = 50; //每頁多少條 //主庫地址,根據(jù)參數(shù)地區(qū)($area)編號切換master數(shù)據(jù)庫連接 //從庫MySQL實例,根據(jù)參數(shù)地區(qū)($area)編號切換slave數(shù)據(jù)庫連接 //由于程序是常駐內(nèi)存的,所以MySQL連接可以使用長連接,然后重復(fù)利用。要使用設(shè)計模式的,可以使用對象池模式 Code...... //master 庫為分公司的數(shù)據(jù)庫,slave庫為數(shù)據(jù)同步到南寧后的從庫 Code...... //使用$sql獲取從庫中最大的自增: SELECT MAX(id) AS maxid FROM ss_cdr_cdr_info limit 1 $slaveMaxIncrementId = ...; //使用$sql獲取主庫中最大的自增: SELECT MAX(id) AS maxid FROM ss_cdr_cdr_info limit 1 $masterMaxIncrementId = ...; //如果相等的就不同步了 if($slaveMaxIncrementId >= $masterMaxIncrementId){ return false; } //根據(jù)條數(shù)計算頁數(shù) $dataNumber = ceil($masterMaxIncrementId - $slaveMaxIncrementId); $eachNumber = ceil($dataNumber / $rows); $left = 0; //根據(jù)頁數(shù)來進(jìn)行分批循環(huán)進(jìn)行寫入,要記得及時清理內(nèi)存 for ($i = 0; $i < $eachNumber; $i++) { $left = $i == 0 ? $slaveMaxIncrementId : $left + $rows; $right = $left + $rows; //生成分批查詢條件 //$where = "id > $left AND <= $right"; $masterData = ...;//從主庫查詢數(shù)據(jù) $slaveLastInsertId = ...;//插入到從庫 unset($masterData,$slaveLastInsertId); } echo "New AsyncTask[id=$task_id]".PHP_EOL; $serv->finish("$area -> OK"); }
4、任務(wù)完成時候調(diào)用
public function onFinish($serv, $task_id, $task_data) { echo "AsyncTask[$task_id] Finish: $task_data".PHP_EOL; }
客戶端推送任務(wù)
到此基本完成,剩下來我們來寫客戶端任務(wù)推送
public function index() { $client = new \swoole_client(SWOOLE_SOCK_TCP); if (!$client->connect('127.0.0.1', 9501, 1)) { throw new Exception('鏈接SWOOLE服務(wù)錯誤'); } $areas = json_encode(['liuzhou','yulin','beihai','guilin']); //開始遍歷檢查 $client->send($areas); echo "任務(wù)發(fā)送成功".PHP_EOL; }
至此基本完成了,剩下的我們來寫一個shell腳本定時執(zhí)行:/home/wwwroot/sync_db/crontab/send.sh
#!/bin/bash PATH=/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/local/sbin:~/bin export PATH # 定時推送異步的數(shù)據(jù)同步任務(wù) /usr/bin/php /home/wwwroot/sync_db/server.php home/index/index
使用crontab定時任務(wù),我們把腳本加入定時任務(wù)
#設(shè)置每天12:30執(zhí)行數(shù)據(jù)同步任務(wù) 30 12 * * * root /home/wwwroot/sync_db/crontab/send.sh #設(shè)置每天19:00執(zhí)行數(shù)據(jù)同步任務(wù) 0 19 * * * root /home/wwwroot/sync_db/crontab/send.sh
Tips: 最好推薦在里面加入寫日志操作,這樣好知道是任務(wù)推送、執(zhí)行是否成功。
至此基本完成,程序有待優(yōu)化~~~,各位看客有更好的方法歡迎提出。
相關(guān)文章
laravel 查詢數(shù)據(jù)庫獲取結(jié)果實現(xiàn)判斷是否為空
今天小編就為大家分享一篇laravel 查詢數(shù)據(jù)庫獲取結(jié)果實現(xiàn)判斷是否為空,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-10-10PHP Oauth授權(quán)和本地加密實現(xiàn)方法
下面小編就為大家?guī)硪黄狿HP Oauth授權(quán)和本地加密實現(xiàn)方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-08-08解決laravel groupBy 對查詢結(jié)果進(jìn)行分組出現(xiàn)的問題
今天小編就為大家分享一篇解決laravel groupBy 對查詢結(jié)果進(jìn)行分組出現(xiàn)的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-10-10基于ubuntu下nginx+php+mysql安裝配置的具體操作步驟
本篇文章介紹了,基于ubuntu下nginx+php+mysql安裝配置的具體操作步驟。需要的朋友參考下2013-04-04詳解php中的password_verify?和?password_hash密碼驗證
驗證密碼是否和指定的散列值匹配,password_verify()?與?crypt()?兼容,因此,由?crypt()?創(chuàng)建的密碼散列可以用于?password_verify()?一起使用,這篇文章主要介紹了php的password_verify?和?password_hash密碼驗證,需要的朋友可以參考下2023-08-08