PHP中使用協(xié)同程序?qū)崿F(xiàn)合作多任務(wù)第1/2頁(yè)
PHP5.5一個(gè)比較好的新功能是實(shí)現(xiàn)對(duì)生成器和協(xié)同程序的支持。對(duì)于生成器,PHP的文檔和各種其他的博客文章(就像這一個(gè)或這一個(gè))已經(jīng)有了非常詳細(xì)的講解。協(xié)同程序相對(duì)受到的關(guān)注就少了,所以協(xié)同程序雖然有很強(qiáng)大的功能但也很難被知曉,解釋起來(lái)也比較困難。
這篇文章指導(dǎo)你通過(guò)使用協(xié)同程序來(lái)實(shí)施任務(wù)調(diào)度,通過(guò)實(shí)例實(shí)現(xiàn)對(duì)技術(shù)的理解。我將在前三節(jié)做一個(gè)簡(jiǎn)單的背景介紹。如果你已經(jīng)有了比較好的基礎(chǔ),可以直接跳到“協(xié)同多任務(wù)處理”一節(jié)。
生成器
生成器最基本的思想也是一個(gè)函數(shù),這個(gè)函數(shù)的返回值是依次輸出,而不是只返回一個(gè)單獨(dú)的值?;蛘?,換句話(huà)說(shuō),生成器使你更方便的實(shí)現(xiàn)了迭代器接口。下面通過(guò)實(shí)現(xiàn)一個(gè)xrange函數(shù)來(lái)簡(jiǎn)單說(shuō)明:
<?php
function xrange($start, $end, $step = 1) {
for ($i = $start; $i <= $end; $i += $step) {
yield $i;
}
}
foreach (xrange(1, 1000000) as $num) {
echo $num, "\n";
}
上面這個(gè)xrange()函數(shù)提供了和PHP的內(nèi)建函數(shù)range()一樣的功能。但是不同的是range()函數(shù)返回的是一個(gè)包含屬組值從1到100萬(wàn)的數(shù)組(注:請(qǐng)查看手冊(cè))。而xrange()函數(shù)返回的是依次輸出這些值的一個(gè)迭代器,而且并不會(huì)真正以數(shù)組形式計(jì)算。
這種方法的優(yōu)點(diǎn)是顯而易見(jiàn)的。它可以讓你在處理大數(shù)據(jù)集合的時(shí)候不用一次性的加載到內(nèi)存中。甚至你可以處理無(wú)限大的數(shù)據(jù)流。
當(dāng)然,也可以不同通過(guò)生成器來(lái)實(shí)現(xiàn)這個(gè)功能,而是可以通過(guò)繼承Iterator接口實(shí)現(xiàn)。通過(guò)使用生成器實(shí)現(xiàn)起來(lái)會(huì)更方便,而不用再去實(shí)現(xiàn)iterator接口中的5個(gè)方法了。
生成器為可中斷的函數(shù)
要從生成器認(rèn)識(shí)協(xié)同程序,理解它們內(nèi)部是如何工作的非常重要:生成器是可中斷的函數(shù),在它里面,yield構(gòu)成了中斷點(diǎn)。
緊接著上面的例子,如果你調(diào)用xrange(1,1000000)的話(huà),xrange()函數(shù)里代碼沒(méi)有真正地運(yùn)行。相反,PHP只是返回了一個(gè)實(shí)現(xiàn)了迭代器接口的 生成器類(lèi)實(shí)例:
<?php
$range = xrange(1, 1000000);
var_dump($range); // object(Generator)#1
var_dump($range instanceof Iterator); // bool(true)
你對(duì)某個(gè)對(duì)象調(diào)用迭代器方法一次,其中的代碼運(yùn)行一次。例如,如果你調(diào)用$range->rewind(),那么xrange()里的代碼運(yùn)行到控制流 第一次出現(xiàn)yield的地方。在這種情況下,這就意味著當(dāng)$i=$start時(shí)yield $i才運(yùn)行。傳遞給yield語(yǔ)句的值是使用$range->current()獲取的。
為了繼續(xù)執(zhí)行生成器中的代碼,你必須調(diào)用$range->next()方法。這將再次啟動(dòng)生成器,直到y(tǒng)ield語(yǔ)句出現(xiàn)。因此,連續(xù)調(diào)用next()和current()方法 你將能從生成器里獲得所有的值,直到某個(gè)點(diǎn)沒(méi)有再出現(xiàn)yield語(yǔ)句。對(duì)xrange()來(lái)說(shuō),這種情形出現(xiàn)在$i超過(guò)$end時(shí)。在這中情況下, 控制流將到達(dá)函數(shù)的終點(diǎn),因此將不執(zhí)行任何代碼。一旦這種情況發(fā)生,vaild()方法將返回假,這時(shí)迭代結(jié)束。
協(xié)程
協(xié)程給上面功能添加的主要東西是回送數(shù)據(jù)給生成器的能力。這將把生成器到調(diào)用者的單向通信轉(zhuǎn)變?yōu)閮烧咧g的雙向通信。
通過(guò)調(diào)用生成器的send()方法而不是其next()方法傳遞數(shù)據(jù)給協(xié)程。下面的logger()協(xié)程是這種通信如何運(yùn)行的例子:
<?php
function logger($fileName) {
$fileHandle = fopen($fileName, 'a');
while (true) {
fwrite($fileHandle, yield . "\n");
}
}
$logger = logger(__DIR__ . '/log');
$logger->send('Foo');
$logger->send('Bar')
正如你能看到,這兒yield沒(méi)有作為一個(gè)語(yǔ)句來(lái)使用,而是用作一個(gè)表達(dá)式。即它有一個(gè)返回值。yield的返回值是傳遞給send()方法的值。 在這個(gè)例子里,yield將首先返回"Foo",然后返回"Bar"。
上面的例子里yield僅作為接收者?;旌蟽煞N用法是可能的,即既可接收也可發(fā)送。接收和發(fā)送通信如何進(jìn)行的例子如下:
<?php
function gen() {
$ret = (yield 'yield1');
var_dump($ret);
$ret = (yield 'yield2');
var_dump($ret);
}
$gen = gen();
var_dump($gen->current()); // string(6) "yield1"
var_dump($gen->send('ret1')); // string(4) "ret1" (the first var_dump in gen)
// string(6) "yield2" (the var_dump of the ->send() return value)
var_dump($gen->send('ret2')); // string(4) "ret2" (again from within gen)
// NULL (the return value of ->send())
馬上理解輸出的精確順序有點(diǎn)困難,因此確定你知道為什按照這種方式輸出。我愿意特別指出的有兩點(diǎn):第一點(diǎn),yield表達(dá)式兩邊使用 圓括號(hào)不是偶然。由于技術(shù)原因(雖然我已經(jīng)考慮為賦值增加一個(gè)異常,就像Python那樣),圓括號(hào)是必須的。第二點(diǎn),你可能已經(jīng)注意到 調(diào)用current()之前沒(méi)有調(diào)用rewind()。如果是這么做的,那么已經(jīng)隱含地執(zhí)行了rewind操作。
多任務(wù)協(xié)作
如果閱讀了上面的logger()例子,那么你認(rèn)為“為了雙向通信我為什么要使用協(xié)程呢? 為什么我不能只用常見(jiàn)的類(lèi)呢?”,你這么問(wèn)完全正確。上面的例子演示了基本用法,然而上下文中沒(méi)有真正的展示出使用協(xié)程的優(yōu)點(diǎn)。這就是列舉許多協(xié)程例子的理由。正如上面介紹里提到的,協(xié)程是非常強(qiáng)大的概念,不過(guò)這樣的應(yīng)用很稀少而且常常十分復(fù)雜。給出一些簡(jiǎn)單而真實(shí)的例子很難。
在這篇文章里,我決定去做的是使用協(xié)程實(shí)現(xiàn)多任務(wù)協(xié)作。我們盡力解決的問(wèn)題是你想并發(fā)地運(yùn)行多任務(wù)(或者“程序”)。不過(guò)處理器在一個(gè)時(shí)刻只能運(yùn)行一個(gè)任務(wù)(這篇文章的目標(biāo)是不考慮多核的)。因此處理器需要在不同的任務(wù)之間進(jìn)行切換,而且總是讓每個(gè)任務(wù)運(yùn)行 “一小會(huì)兒”。
多任務(wù)協(xié)作這個(gè)術(shù)語(yǔ)中的“協(xié)作”說(shuō)明了如何進(jìn)行這種切換的:它要求當(dāng)前正在運(yùn)行的任務(wù)自動(dòng)把控制傳回給調(diào)度器,這樣它就可以運(yùn)行其他任務(wù)了。這與“搶占”多任務(wù)相反,搶占多任務(wù)是這樣的:調(diào)度器可以中斷運(yùn)行了一段時(shí)間的任務(wù),不管它喜歡還是不喜歡。協(xié)作多任務(wù)在Windows的早期版本(windows95)和Mac OS中有使用,不過(guò)它們后來(lái)都切換到使用搶先多任務(wù)了。理由相當(dāng)明確:如果你依靠程序自動(dòng)傳回 控制的話(huà),那么壞行為的軟件將很容易為自身占用整個(gè)CPU,不與其他任務(wù)共享。
這個(gè)時(shí)候你應(yīng)當(dāng)明白協(xié)程和任務(wù)調(diào)度之間的聯(lián)系:yield指令提供了任務(wù)中斷自身的一種方法,然后把控制傳遞給調(diào)度器。因此協(xié)程可以運(yùn)行多個(gè)其他任務(wù)。更進(jìn)一步來(lái)說(shuō),yield可以用來(lái)在任務(wù)和調(diào)度器之間進(jìn)行通信。
我們的目的是 對(duì) “任務(wù)”用更輕量級(jí)的包裝的協(xié)程函數(shù):
<?php
class Task {
protected $taskId;
protected $coroutine;
protected $sendValue = null;
protected $beforeFirstYield = true;
public function __construct($taskId, Generator $coroutine) {
$this->taskId = $taskId;
$this->coroutine = $coroutine;
}
public function getTaskId() {
return $this->taskId;
}
public function setSendValue($sendValue) {
$this->sendValue = $sendValue;
}
public function run() {
if ($this->beforeFirstYield) {
$this->beforeFirstYield = false;
return $this->coroutine->current();
} else {
$retval = $this->coroutine->send($this->sendValue);
$this->sendValue = null;
return $retval;
}
}
public function isFinished() {
return !$this->coroutine->valid();
}
}
一個(gè)任務(wù)是用 任務(wù)ID標(biāo)記一個(gè)協(xié)程。使用setSendValue()方法,你可以指定哪些值將被發(fā)送到下次的恢復(fù)(在之后你會(huì)了解到我們需要這個(gè))。 run()函數(shù)確實(shí)沒(méi)有做什么,除了調(diào)用send()方法的協(xié)同程序。要理解為什么添加beforeFirstYieldflag,需要考慮下面的代碼片段:
<?php
function gen() {
yield 'foo';
yield 'bar';
}
$gen = gen();
var_dump($gen->send('something'));
// As the send() happens before the first yield there is an implicit rewind() call,
// so what really happens is this:
$gen->rewind();
var_dump($gen->send('something'));
// The rewind() will advance to the first yield (and ignore its value), the send() will
// advance to the second yield (and dump its value). Thus we loose the first yielded value!
通過(guò)添加 beforeFirstYieldcondition 我們可以確定 first yield 的值 被返回。
調(diào)度器現(xiàn)在不得不比多任務(wù)循環(huán)要做稍微多點(diǎn)了,然后才運(yùn)行多任務(wù):
<?php
class Scheduler {
protected $maxTaskId = 0;
protected $taskMap = []; // taskId => task
protected $taskQueue;
public function __construct() {
$this->taskQueue = new SplQueue();
}
public function newTask(Generator $coroutine) {
$tid = ++$this->maxTaskId;
$task = new Task($tid, $coroutine);
$this->taskMap[$tid] = $task;
$this->schedule($task);
return $tid;
}
public function schedule(Task $task) {
$this->taskQueue->enqueue($task);
}
public function run() {
while (!$this->taskQueue->isEmpty()) {
$task = $this->taskQueue->dequeue();
$task->run();
if ($task->isFinished()) {
unset($this->taskMap[$task->getTaskId()]);
} else {
$this->schedule($task);
}
}
}
}
newTask()方法(使用下一個(gè)空閑的任務(wù)id)創(chuàng)建一個(gè)新任務(wù),然后把這個(gè)任務(wù)放入任務(wù)映射數(shù)組里。接著它通過(guò)把任務(wù)放入任務(wù)隊(duì)列里來(lái)實(shí)現(xiàn)對(duì)任務(wù)的調(diào)度。接著run()方法掃描任務(wù)隊(duì)列,運(yùn)行任務(wù)。如果一個(gè)任務(wù)結(jié)束了,那么它將從隊(duì)列里刪除,否則它將在隊(duì)列的末尾再次被調(diào)度。
讓我們看看下面具有兩個(gè)簡(jiǎn)單(并且沒(méi)有什么意義)任務(wù)的調(diào)度器:
<?php
function task1() {
for ($i = 1; $i <= 10; ++$i) {
echo "This is task 1 iteration $i.\n";
yield;
}
}
function task2() {
for ($i = 1; $i <= 5; ++$i) {
echo "This is task 2 iteration $i.\n";
yield;
}
}
$scheduler = new Scheduler;
$scheduler->newTask(task1());
$scheduler->newTask(task2());
$scheduler->run();
兩個(gè)任務(wù)都僅僅回顯一條信息,然后使用yield把控制回傳給調(diào)度器。輸出結(jié)果如下:
This is task 1 iteration 1.
This is task 2 iteration 1.
This is task 1 iteration 2.
This is task 2 iteration 2.
This is task 1 iteration 3.
This is task 2 iteration 3.
This is task 1 iteration 4.
This is task 2 iteration 4.
This is task 1 iteration 5.
This is task 2 iteration 5.
This is task 1 iteration 6.
This is task 1 iteration 7.
This is task 1 iteration 8.
This is task 1 iteration 9.
This is task 1 iteration 10.
輸出確實(shí)如我們所期望的:對(duì)前五個(gè)迭代來(lái)說(shuō),兩個(gè)任務(wù)是交替運(yùn)行的,接著第二個(gè)任務(wù)結(jié)束后,只有第一個(gè)任務(wù)繼續(xù)運(yùn)行。
與調(diào)度器之間通信
我們的任務(wù)調(diào)度系統(tǒng)將反映這種設(shè)計(jì):不是簡(jiǎn)單地把調(diào)度器傳遞給任務(wù)(這樣久允許它做它想做的任何事),我們將通過(guò)給yield表達(dá)式傳遞信息來(lái)與系統(tǒng)調(diào)用通信。這兒yield即是中斷,也是傳遞信息給調(diào)度器(和從調(diào)度器傳遞出信息)的方法。
為了說(shuō)明系統(tǒng)調(diào)用,我將對(duì)可調(diào)用的系統(tǒng)調(diào)用做一個(gè)小小的封裝:
<?php
class SystemCall {
protected $callback;
public function __construct(callable $callback) {
$this->callback = $callback;
}
public function __invoke(Task $task, Scheduler $scheduler) {
$callback = $this->callback; // Can't call it directly in PHP :/
return $callback($task, $scheduler);
}
}
它將像其他任何可調(diào)用那樣(使用_invoke)運(yùn)行,不過(guò)它要求調(diào)度器把正在調(diào)用的任務(wù)和自身傳遞給這個(gè)函數(shù)。為了解決這個(gè)問(wèn)題 我們不得不微微的修改調(diào)度器的run方法:
<?php
public function run() {
while (!$this->taskQueue->isEmpty()) {
$task = $this->taskQueue->dequeue();
$retval = $task->run();
if ($retval instanceof SystemCall) {
$retval($task, $this);
continue;
}
if ($task->isFinished()) {
unset($this->taskMap[$task->getTaskId()]);
} else {
$this->schedule($task);
}
}
}
第一個(gè)系統(tǒng)調(diào)用除了返回任務(wù)ID外什么都沒(méi)有做:
<?php
function getTaskId() {
return new SystemCall(function(Task $task, Scheduler $scheduler) {
$task->setSendValue($task->getTaskId());
$scheduler->schedule($task);
});
}
這個(gè)函數(shù)確實(shí)設(shè)置任務(wù)id為下一次發(fā)送的值,并再次調(diào)度了這個(gè)任務(wù)。由于使用了系統(tǒng)調(diào)用,所以調(diào)度器不能自動(dòng)調(diào)用任務(wù),我們需要手工調(diào)度任務(wù)(稍后你將明白為什么這么做)。要使用這個(gè)新的系統(tǒng)調(diào)用的話(huà),我們要重新編寫(xiě)以前的例子:
<?php
function task($max) {
$tid = (yield getTaskId()); // <-- here's the syscall!
for ($i = 1; $i <= $max; ++$i) {
echo "This is task $tid iteration $i.\n";
yield;
}
}
$scheduler = new Scheduler;
$scheduler->newTask(task(10));
$scheduler->newTask(task(5));
$scheduler->run();
<?php
function newTask(Generator $coroutine) {
return new SystemCall(
function(Task $task, Scheduler $scheduler) use ($coroutine) {
$task->setSendValue($scheduler->newTask($coroutine));
$scheduler->schedule($task);
}
);
}
function killTask($tid) {
return new SystemCall(
function(Task $task, Scheduler $scheduler) use ($tid) {
$task->setSendValue($scheduler->killTask($tid));
$scheduler->schedule($task);
}
);
}
[/code]
killTask函數(shù)需要在調(diào)度器里增加一個(gè)方法:
<?php
public function killTask($tid) {
if (!isset($this->taskMap[$tid])) {
return false;
}
unset($this->taskMap[$tid]);
// This is a bit ugly and could be optimized so it does not have to walk the queue,
// but assuming that killing tasks is rather rare I won't bother with it now
foreach ($this->taskQueue as $i => $task) {
if ($task->getTaskId() === $tid) {
unset($this->taskQueue[$i]);
break;
}
}
return true;
}
相關(guān)文章

PHP獲取訪(fǎng)問(wèn)設(shè)備信息的方法示例

PHP n個(gè)不重復(fù)的隨機(jī)數(shù)生成代碼

php強(qiáng)制用戶(hù)轉(zhuǎn)向www域名的方法

PHP實(shí)現(xiàn)限制IP訪(fǎng)問(wèn)的方法