PHP中使用協(xié)同程序?qū)崿F(xiàn)合作多任務(wù)第2/2頁
用來測試新功能的微腳本:
<?php
function childTask() {
$tid = (yield getTaskId());
while (true) {
echo "Child task $tid still alive!\n";
yield;
}
}
function task() {
$tid = (yield getTaskId());
$childTid = (yield newTask(childTask()));
for ($i = 1; $i <= 6; ++$i) {
echo "Parent task $tid iteration $i.\n";
yield;
if ($i == 3) yield killTask($childTid);
}
}
$scheduler = new Scheduler;
$scheduler->newTask(task());
$scheduler->run();
這段代碼將打印以下信息:
Parent task 1 iteration 1.
Child task 2 still alive!
Parent task 1 iteration 2.
Child task 2 still alive!
Parent task 1 iteration 3.
Child task 2 still alive!
Parent task 1 iteration 4.
Parent task 1 iteration 5.
Parent task 1 iteration 6.
你可以實現(xiàn)許多進程管理調(diào)用。例如 wait(它一直等待到任務(wù)結(jié)束運行時),exec(它替代當(dāng)前任務(wù))和fork(它創(chuàng)建一個 當(dāng)前任務(wù)的克?。?。fork非???,而且你可以使用PHP的協(xié)程實現(xiàn)它,因為它們都支持克隆。
然而讓我們把這些留給有興趣的讀者吧,我們?nèi)タ聪乱粋€議題。
幾點人
翻譯于 4天前
0人頂
頂 翻譯的不錯哦!
非阻塞IO
很明顯,我們的任務(wù)管理系統(tǒng)的真正很酷的應(yīng)用是web服務(wù)器。它有一個任務(wù)是在套接字上偵聽是否有新連接,當(dāng)有新連接要建立的時候 ,它創(chuàng)建一個新任務(wù)來處理新連接。
web服務(wù)器最難的部分通常是像讀數(shù)據(jù)這樣的套接字操作是阻塞的。例如PHP將等待到客戶端完成發(fā)送為止。對一個WEB服務(wù)器來說,這 根本不行;這就意味著服務(wù)器在一個時間點上只能處理一個連接。
解決方案是確保在真正對套接字讀寫之前該套接字已經(jīng)“準(zhǔn)備就緒”。為了查找哪個套接字已經(jīng)準(zhǔn)備好讀或者寫了,可以使用 流選擇函數(shù)。
首先,讓我們添加兩個新的 syscall,它們將等待直到指定 socket 準(zhǔn)備好:
<?php
function waitForRead($socket) {
return new SystemCall(
function(Task $task, Scheduler $scheduler) use ($socket) {
$scheduler->waitForRead($socket, $task);
}
);
}
function waitForWrite($socket) {
return new SystemCall(
function(Task $task, Scheduler $scheduler) use ($socket) {
$scheduler->waitForWrite($socket, $task);
}
);
}
這些 syscall 只是在調(diào)度器中代理其各自的方法:
<?php
// resourceID => [socket, tasks]
protected $waitingForRead = [];
protected $waitingForWrite = [];
public function waitForRead($socket, Task $task) {
if (isset($this->waitingForRead[(int) $socket])) {
$this->waitingForRead[(int) $socket][1][] = $task;
} else {
$this->waitingForRead[(int) $socket] = [$socket, [$task]];
}
}
public function waitForWrite($socket, Task $task) {
if (isset($this->waitingForWrite[(int) $socket])) {
$this->waitingForWrite[(int) $socket][1][] = $task;
} else {
$this->waitingForWrite[(int) $socket] = [$socket, [$task]];
}
}
waitingForRead 及 waitingForWrite 屬性是兩個承載等待的socket 及等待它們的任務(wù)的數(shù)組。有趣的部分在于下面的方法,它將檢查 socket 是否可用,并重新安排各自任務(wù):
<?php
protected function ioPoll($timeout) {
$rSocks = [];
foreach ($this->waitingForRead as list($socket)) {
$rSocks[] = $socket;
}
$wSocks = [];
foreach ($this->waitingForWrite as list($socket)) {
$wSocks[] = $socket;
}
$eSocks = []; // dummy
if (!stream_select($rSocks, $wSocks, $eSocks, $timeout)) {
return;
}
foreach ($rSocks as $socket) {
list(, $tasks) = $this->waitingForRead[(int) $socket];
unset($this->waitingForRead[(int) $socket]);
foreach ($tasks as $task) {
$this->schedule($task);
}
}
foreach ($wSocks as $socket) {
list(, $tasks) = $this->waitingForWrite[(int) $socket];
unset($this->waitingForWrite[(int) $socket]);
foreach ($tasks as $task) {
$this->schedule($task);
}
}
}
stream_select 函數(shù)接受承載讀取、寫入以及待檢查的socket的數(shù)組(我們無需考慮最后一類)。數(shù)組將按引用傳遞,函數(shù)只會保留那些狀態(tài)改變了的數(shù)組元素。我們可以遍歷這些數(shù)組,并重新安排與之相關(guān)的任務(wù)。
為了正常地執(zhí)行上面的輪詢動作,我們將在調(diào)度器里增加一個特殊的任務(wù):
<?php
protected function ioPollTask() {
while (true) {
if ($this->taskQueue->isEmpty()) {
$this->ioPoll(null);
} else {
$this->ioPoll(0);
}
yield;
}
}
需要在某個地方注冊這個任務(wù),例如,你可以在run()方法的開始增加$this->newTask($this->ioPollTask())。然后就像其他 任務(wù)一樣每執(zhí)行完整任務(wù)循環(huán)一次就執(zhí)行輪詢操作一次(這么做一定不是最好的方法)。ioPollTask將使用0秒的超時來調(diào)用ioPoll, 這意味著stream_select將立即返回(而不是等待)。
只有任務(wù)隊列為空時,我們才使用null超時,這意味著它一直等到某個套接口準(zhǔn)備就緒。如果我們沒有這么做,那么輪詢?nèi)蝿?wù)將一而再, 再而三的循環(huán)運行,直到有新的連接建立。這將導(dǎo)致100%的CPU利用率。相反,讓操作系統(tǒng)做這種等待會更有效。
現(xiàn)在編寫服務(wù)器相對容易了:
<?php
function server($port) {
echo "Starting server at port $port...\n";
$socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
if (!$socket) throw new Exception($errStr, $errNo);
stream_set_blocking($socket, 0);
while (true) {
yield waitForRead($socket);
$clientSocket = stream_socket_accept($socket, 0);
yield newTask(handleClient($clientSocket));
}
}
function handleClient($socket) {
yield waitForRead($socket);
$data = fread($socket, 8192);
$msg = "Received following request:\n\n$data";
$msgLength = strlen($msg);
$response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;
yield waitForWrite($socket);
fwrite($socket, $response);
fclose($socket);
}
$scheduler = new Scheduler;
$scheduler->newTask(server(8000));
$scheduler->run();
這段代碼將接收到localhost:8000上的連接,然后僅僅返回發(fā)送來的內(nèi)容作為HTTP響應(yīng)。要做“實際”的事情的話就愛哪個非常復(fù)雜(處理 HTTP請求可能已經(jīng)超出了這篇文章的范圍)。上面的代碼片段只是演示了一般性的概念。
你可以使用類似于ab -n 10000 -c 100 localhost:8000/這樣命令來測試服務(wù)器。這條命令將向服務(wù)器發(fā)送10000個請求,并且其中100個請求將同時到達(dá)。使用這樣的數(shù)目,我得到了處于中間的10毫秒的響應(yīng)時間。不過還有一個問題:有少數(shù)幾個請求真正處理的很慢(如5秒), 這就是為什么總吞吐量只有2000請求/秒(如果是10毫秒的響應(yīng)時間的話,總的吞吐量應(yīng)該更像是10000請求/秒)。調(diào)高并發(fā)數(shù)(比如 -c 500),服務(wù)器大多數(shù)運行良好,不過某些連接將拋出“連接被對方重置”的錯誤。由于我對低級別的socket資料了解的非常少,所以 我不能指出問題出在哪兒。
協(xié)程堆棧
如果你試圖用我們的調(diào)度系統(tǒng)建立更大的系統(tǒng)的話,你將很快遇到問題:我們習(xí)慣了把代碼分解為更小的函數(shù),然后調(diào)用它們。然而, 如果使用了協(xié)程的話,就不能這么做了。例如,看下面代碼:
<?php
function echoTimes($msg, $max) {
for ($i = 1; $i <= $max; ++$i) {
echo "$msg iteration $i\n";
yield;
}
}
function task() {
echoTimes('foo', 10); // print foo ten times
echo "---\n";
echoTimes('bar', 5); // print bar five times
yield; // force it to be a coroutine
}
$scheduler = new Scheduler;
$scheduler->newTask(task());
$scheduler->run();
這段代碼試圖把重復(fù)循環(huán)“輸出n次“的代碼嵌入到一個獨立的協(xié)程里,然后從主任務(wù)里調(diào)用它。然而它無法運行。正如在這篇文章的開始 所提到的,調(diào)用生成器(或者協(xié)程)將沒有真正地做任何事情,它僅僅返回一個對象。這也出現(xiàn)在上面的例子里。echoTimes調(diào)用除了放回一個(無用的)協(xié)程對象外不做任何事情。
為了仍然允許這么做,我們需要在這個裸協(xié)程上寫一個小小的封裝。我們將調(diào)用它:“協(xié)程堆棧”。因為它將管理嵌套的協(xié)程調(diào)用堆棧。 這將是通過生成協(xié)程來調(diào)用子協(xié)程成為可能:
$retval = (yield someCoroutine($foo, $bar));
使用yield,子協(xié)程也能再次返回值:
yield retval("I'm a return value!");
retval函數(shù)除了返回一個值的封裝外沒有做任何其他事情。這個封裝將表示它是一個返回值。
<?php
class CoroutineReturnValue {
protected $value;
public function __construct($value) {
$this->value = $value;
}
public function getValue() {
return $this->value;
}
}
function retval($value) {
return new CoroutineReturnValue($value);
}
為了把協(xié)程轉(zhuǎn)變?yōu)閰f(xié)程堆棧(它支持子調(diào)用),我們將不得不編寫另外一個函數(shù)(很明顯,它是另一個協(xié)程):
<?php
function stackedCoroutine(Generator $gen) {
$stack = new SplStack;
for (;;) {
$value = $gen->current();
if ($value instanceof Generator) {
$stack->push($gen);
$gen = $value;
continue;
}
$isReturnValue = $value instanceof CoroutineReturnValue;
if (!$gen->valid() || $isReturnValue) {
if ($stack->isEmpty()) {
return;
}
$gen = $stack->pop();
$gen->send($isReturnValue ? $value->getValue() : NULL);
continue;
}
$gen->send(yield $gen->key() => $value);
}
}
這個函數(shù)在調(diào)用者和當(dāng)前正在運行的子協(xié)程之間扮演著簡單代理的角色。在$gen->send(yield $gen->key()=>$value);這行完成了代理功能。另外它檢查返回值是否是生成器,萬一是生成器的話,它將開始運行這個生成器,并把前一個協(xié)程壓入堆棧里。一旦它獲得了CoroutineReturnValue的話,它將再次請求堆棧彈出,然后繼續(xù)執(zhí)行前一個協(xié)程。
為了使協(xié)程堆棧在任務(wù)里可用,任務(wù)構(gòu)造器里的$this-coroutine =$coroutine;這行需要替代為$this->coroutine = StackedCoroutine($coroutine);。
現(xiàn)在我們可以稍微改進上面web服務(wù)器例子:把wait+read(和wait+write和warit+accept)這樣的動作分組為函數(shù)。為了分組相關(guān)的 功能,我將使用下面類:
<?php
class CoSocket {
protected $socket;
public function __construct($socket) {
$this->socket = $socket;
}
public function accept() {
yield waitForRead($this->socket);
yield retval(new CoSocket(stream_socket_accept($this->socket, 0)));
}
public function read($size) {
yield waitForRead($this->socket);
yield retval(fread($this->socket, $size));
}
public function write($string) {
yield waitForWrite($this->socket);
fwrite($this->socket, $string);
}
public function close() {
@fclose($this->socket);
}
}
現(xiàn)在服務(wù)器可以編寫的稍微簡潔點了:
<?php
function server($port) {
echo "Starting server at port $port...\n";
$socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
if (!$socket) throw new Exception($errStr, $errNo);
stream_set_blocking($socket, 0);
$socket = new CoSocket($socket);
while (true) {
yield newTask(
handleClient(yield $socket->accept())
);
}
}
function handleClient($socket) {
$data = (yield $socket->read(8192));
$msg = "Received following request:\n\n$data";
$msgLength = strlen($msg);
$response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;
yield $socket->write($response);
yield $socket->close();
}
錯誤處理
作為一個優(yōu)秀的程序員,相信你已經(jīng)察覺到上面的例子缺少錯誤處理。幾乎所有的 socket 都是易出錯的。我這樣做的原因一方面固然是因為錯誤處理的乏味(特別是 socket!),另一方面也在于它很容易使代碼體積膨脹。
不過,我仍然了一講一下常見的協(xié)程錯誤處理:協(xié)程允許使用 throw() 方法在其內(nèi)部拋出一個錯誤。盡管此方法還未在 PHP 中實現(xiàn),但我很快就會提交它,就在今天。
throw() 方法接受一個 Exception,并將其拋出到協(xié)程的當(dāng)前懸掛點,看看下面代碼:
<?php
function gen() {
echo "Foo\n";
try {
yield;
} catch (Exception $e) {
echo "Exception: {$e->getMessage()}\n";
}
echo "Bar\n";
}
$gen = gen();
$gen->rewind(); // echos "Foo"
$gen->throw(new Exception('Test')); // echos "Exception: Test"
// and "Bar"
這非常棒,因為我們可以使用系統(tǒng)調(diào)用以及子協(xié)程調(diào)用異常拋出。對與系統(tǒng)調(diào)用,Scheduler::run() 方法需要一些小調(diào)整:
<?php
if ($retval instanceof SystemCall) {
try {
$retval($task, $this);
} catch (Exception $e) {
$task->setException($e);
$this->schedule($task);
}
continue;
}
Task 類也許要添加 throw 調(diào)用處理:
<?php
class Task {
// ...
protected $exception = null;
public function setException($exception) {
$this->exception = $exception;
}
public function run() {
if ($this->beforeFirstYield) {
$this->beforeFirstYield = false;
return $this->coroutine->current();
} elseif ($this->exception) {
$retval = $this->coroutine->throw($this->exception);
$this->exception = null;
return $retval;
} else {
$retval = $this->coroutine->send($this->sendValue);
$this->sendValue = null;
return $retval;
}
}
// ...
}
現(xiàn)在,我們已經(jīng)可以在系統(tǒng)調(diào)用中使用異常拋出了!例如,要調(diào)用 killTask,讓我們在傳遞 ID 不可用時拋出一個異常:
<?php
function killTask($tid) {
return new SystemCall(
function(Task $task, Scheduler $scheduler) use ($tid) {
if ($scheduler->killTask($tid)) {
$scheduler->schedule($task);
} else {
throw new InvalidArgumentException('Invalid task ID!');
}
}
);
}
試試看:
<?php
function task() {
try {
yield killTask(500);
} catch (Exception $e) {
echo 'Tried to kill task 500 but failed: ', $e->getMessage(), "\n";
}
}
這些代碼現(xiàn)在尚不能正常運作,因為 stackedCoroutine 函數(shù)無法正確處理異常。要修復(fù)需要做些調(diào)整:
<?php
function stackedCoroutine(Generator $gen) {
$stack = new SplStack;
$exception = null;
for (;;) {
try {
if ($exception) {
$gen->throw($exception);
$exception = null;
continue;
}
$value = $gen->current();
if ($value instanceof Generator) {
$stack->push($gen);
$gen = $value;
continue;
}
$isReturnValue = $value instanceof CoroutineReturnValue;
if (!$gen->valid() || $isReturnValue) {
if ($stack->isEmpty()) {
return;
}
$gen = $stack->pop();
$gen->send($isReturnValue ? $value->getValue() : NULL);
continue;
}
try {
$sendValue = (yield $gen->key() => $value);
} catch (Exception $e) {
$gen->throw($e);
continue;
}
$gen->send($sendValue);
} catch (Exception $e) {
if ($stack->isEmpty()) {
throw $e;
}
$gen = $stack->pop();
$exception = $e;
}
}
}
結(jié)束語
在這篇文章里,我使用多任務(wù)協(xié)作構(gòu)建了一個任務(wù)調(diào)度器,其中包括執(zhí)行“系統(tǒng)調(diào)用”,做非阻塞操作和處理錯誤。所有這些里真正很酷的事情是任務(wù)的結(jié)果代碼看起來完全同步,甚至任務(wù)正在執(zhí)行大量的異步操作的時候也是這樣。如果你打算從套接口讀取數(shù)據(jù)的話,你將不需要傳遞某個回調(diào)函數(shù)或者注冊一個事件。相反,你只要書寫yield $socket->read()。這兒大部分都是你常常也要編寫的,只在它的前面增加yield。
當(dāng)我第一次聽到所有這一切的時候,我發(fā)現(xiàn)這個概念完全令人折服,而且正是這個激勵我在PHP中實現(xiàn)了它。同時我發(fā)現(xiàn)令人心慌。在令人敬畏的代碼和很大一堆代碼之間只有單薄的一行,我認(rèn)為協(xié)程正好處在這一行上。講講使用上面所述的方法書寫異步代碼是否真的有益對我來說很難。
無論如何,我認(rèn)為這是一個有趣的話題,而且我希望你也能找到它的樂趣。歡迎評論:)