欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Laravel中schedule調(diào)度的運(yùn)行機(jī)制

 更新時(shí)間:2022年01月14日 11:38:38   作者:YanChen11  
本文主要介紹了Laravel中schedule調(diào)度的運(yùn)行機(jī)制,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下

Laravel 的 console 命令行極大的方便了 PHP 定時(shí)任務(wù)的設(shè)置以及運(yùn)行。以往通過 crontab 配置定時(shí)任務(wù)過程相對(duì)比較繁瑣,并且通過 crontab 設(shè)置的定時(shí)任務(wù)很難防止任務(wù)的交疊運(yùn)行。

所謂任務(wù)的交疊運(yùn)行,是指由于定時(shí)任務(wù)運(yùn)行時(shí)間較長,在 crontab 設(shè)置的運(yùn)行周期不盡合理的情況下,已經(jīng)啟動(dòng)的任務(wù)還沒有結(jié)束運(yùn)行,而系統(tǒng)又啟動(dòng)了新的任務(wù)去執(zhí)行相同的操作。如果程序內(nèi)部沒有處理好數(shù)據(jù)一致性的問題,那么兩個(gè)任務(wù)同時(shí)操作同一份數(shù)據(jù),很可能會(huì)導(dǎo)致嚴(yán)重的后果。

⒈ runInBackground 和 withoutOverlapping

為了防止任務(wù)的交疊運(yùn)行,Laravel 提供了 withoutOverlapping() 方法;為了能讓多任務(wù)在后臺(tái)并行執(zhí)行,Laravel 提供了 runInBackground() 方法。

⑴ runInBackground() 方法

console 命令行中的每一個(gè)命令都代表一個(gè) Event ,\App\Console\Kernel 中的 schedule() 方法的作用只是將這些命令行代表的 Event 注冊(cè)到 Illuminate\Console\Scheduling\Schedule 的屬性 $events 中。

// namespace \Illuminate\Console\Scheduling\Schedule

public function command($command, array $parameters = [])
{
? ? if (class_exists($command)) {
? ? ? ? $command = Container::getInstance()->make($command)->getName();
? ? }

? ? return $this->exec(
? ? ? ? Application::formatCommandString($command), $parameters
? ? );
}

public function exec($command, array $parameters = [])
{
? ? if (count($parameters)) {
? ? ? ? $command .= ' '.$this->compileParameters($parameters);
? ? }

? ? $this->events[] = $event = new Event($this->eventMutex, $command, $this->timezone);

? ? return $event;
}

Event 的運(yùn)行方式有兩種:Foreground 和 Background 。二者的區(qū)別就在于多個(gè) Event 是否可以并行執(zhí)行。Event 默認(rèn)以 Foreground 的方式運(yùn)行,在這種運(yùn)行方式下,多個(gè) Event 順序執(zhí)行,后面的 Event 需要等到前面的 Event 運(yùn)行完成之后才能開始執(zhí)行。

但在實(shí)際應(yīng)用中,我們往往是希望多個(gè) Event 可以并行執(zhí)行,此時(shí)就需要調(diào)用 Event 的 runInBackground() 方法將其運(yùn)行方式設(shè)置為 Background 。
Laravel 框架對(duì)這兩種運(yùn)行方式的處理區(qū)別在于命令行的組裝方式和回調(diào)方法的調(diào)用方式。

// namespace \Illuminate\Console\Scheduling\Event
protected function runCommandInForeground(Container $container)
{
? ? $this->callBeforeCallbacks($container);

? ? $this->exitCode = Process::fromShellCommandline($this->buildCommand(), base_path(), null, null, null)->run();

? ? $this->callAfterCallbacks($container);
}

protected function runCommandInBackground(Container $container)
{
? ? $this->callBeforeCallbacks($container);

? ? Process::fromShellCommandline($this->buildCommand(), base_path(), null, null, null)->run();
}

public function buildCommand()
{
? ? return (new CommandBuilder)->buildCommand($this);
}

// namespace Illuminate\Console\Scheduling\CommandBuilder
public function buildCommand(Event $event)
{
? ? if ($event->runInBackground) {
? ? ? ? return $this->buildBackgroundCommand($event);
? ? }

? ? return $this->buildForegroundCommand($event);
}

protected function buildForegroundCommand(Event $event)
{
? ? $output = ProcessUtils::escapeArgument($event->output);

? ? return $this->ensureCorrectUser(
? ? ? ? $event, $event->command.($event->shouldAppendOutput ? ' >> ' : ' > ').$output.' 2>&1'
? ? );
}

protected function buildBackgroundCommand(Event $event)
{
? ? $output = ProcessUtils::escapeArgument($event->output);

? ? $redirect = $event->shouldAppendOutput ? ' >> ' : ' > ';

? ? $finished = Application::formatCommandString('schedule:finish').' "'.$event->mutexName().'"';

? ? if (windows_os()) {
? ? ? ? return 'start /b cmd /c "('.$event->command.' & '.$finished.' "%errorlevel%")'.$redirect.$output.' 2>&1"';
? ? }

? ? return $this->ensureCorrectUser($event,
? ? ? ? '('.$event->command.$redirect.$output.' 2>&1 ; '.$finished.' "$?") > '
? ? ? ? .ProcessUtils::escapeArgument($event->getDefaultOutput()).' 2>&1 &'
? ? );
}

從代碼中可以看出,采用 Background 方式運(yùn)行的 Event ,其命令行在組裝的時(shí)候結(jié)尾會(huì)增加一個(gè) & 符號(hào),其作用是使命令行程序進(jìn)入后臺(tái)運(yùn)行;另外,采用 Foreground 方式運(yùn)行的 Event ,其回調(diào)方法是同步調(diào)用的,而采用 Background 方式運(yùn)行的 Event ,其 after 回調(diào)則是通過 schedule:finish 命令行來執(zhí)行的。

⑵ withoutOverlapping() 方法

在設(shè)置 Event 的運(yùn)行周期時(shí),由于應(yīng)用場景的不斷變化,很難避免某個(gè)特定的 Event 在某個(gè)時(shí)間段內(nèi)需要運(yùn)行較長的時(shí)間才能完成,甚至在下一個(gè)運(yùn)行周期開始時(shí)還沒有執(zhí)行完成。如果不對(duì)這種情況進(jìn)行處理,就會(huì)導(dǎo)致多個(gè)相同的 Event 同時(shí)運(yùn)行,而如果這些 Event 當(dāng)中涉及到對(duì)數(shù)據(jù)的操作并且程序中沒有處理好冪等問題,很可能會(huì)造成嚴(yán)重后果。
為了避免出現(xiàn)上述的問題,Event 中提供了 withoutOverlapping() 方法,該方法通過將 Event 的 withoutOverlapping 屬性設(shè)置為 TRUE ,在每次要執(zhí)行 Event 時(shí)會(huì)檢查當(dāng)前是否存在正在執(zhí)行的相同的 Event ,如果存在,則不執(zhí)行新的 Event 任務(wù)。

// namespace Illuminate\Console\Scheduling\Event
public function withoutOverlapping($expiresAt = 1440)
{
? ? $this->withoutOverlapping = true;

? ? $this->expiresAt = $expiresAt;

? ? return $this->then(function () {
? ? ? ? $this->mutex->forget($this);
? ? })->skip(function () {
? ? ? ? return $this->mutex->exists($this);
? ? });
}

public function run(Container $container)
{
? ? if ($this->withoutOverlapping &&
? ? ? ? ! $this->mutex->create($this)) {
? ? ? ? return;
? ? }

? ? $this->runInBackground
? ? ? ? ? ? ? ? ? $this->runCommandInBackground($container)
? ? ? ? ? ? ? ? : $this->runCommandInForeground($container);
}

⒉ mutex 互斥鎖

在調(diào)用 withoutOverlapping() 方法時(shí),該方法還實(shí)現(xiàn)了另外兩個(gè)功能:一個(gè)是設(shè)置超時(shí)時(shí)間,默認(rèn)為 24 小時(shí);另一個(gè)是設(shè)置 Event 的回調(diào)。

⑴ 超時(shí)時(shí)間

首先說超時(shí)時(shí)間,這個(gè)超時(shí)時(shí)間并不是 Event 的超時(shí)時(shí)間,而是 Event 的屬性 mutex 的超時(shí)時(shí)間。在向 Illuminate\Console\Scheduling\Schedule 的屬性 $events 中注冊(cè) Event 時(shí),會(huì)調(diào)用 Schedule 中的 exec() 方法,在該方法中會(huì)新建 Event 對(duì)象,此時(shí)會(huì)向 Event 的構(gòu)造方法中傳入一個(gè) eventMutex ,這就是 Event 對(duì)象中的屬性 mutex ,超時(shí)時(shí)間就是為這個(gè) mutex 設(shè)置的。而 Schedule 中的 eventMutex 則是通過實(shí)例化 CacheEventMutex 來創(chuàng)建的。

// namespace \Illuminate\Console\Scheduling\Schedule
$this->eventMutex = $container->bound(EventMutex::class)
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $container->make(EventMutex::class)
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? : $container->make(CacheEventMutex::class);

設(shè)置了 withoutOverlapping 的 Event 在執(zhí)行之前,首先會(huì)嘗試獲取 mutex 互斥鎖,如果無法成功獲取到鎖,那么 Event 就不會(huì)執(zhí)行。獲取互斥鎖的操作通過調(diào)用 mutex 的 create() 方法完成。
CacheEventMutex 在實(shí)例化時(shí)需要傳入一個(gè) \Illuminate\Contracts\Cache\Factory 類型的實(shí)例,其最終傳入的是一個(gè) \Illuminate\Cache\CacheManager 實(shí)例。在調(diào)用 create() 方法獲取互斥鎖時(shí),還需要通過調(diào)用 store() 方法設(shè)置存儲(chǔ)引擎。

// namespace \Illuminate\Foundation\Console\Kernel
protected function defineConsoleSchedule()
{
? ? $this->app->singleton(Schedule::class, function ($app) {
? ? ? ? return tap(new Schedule($this->scheduleTimezone()), function ($schedule) {
? ? ? ? ? ? $this->schedule($schedule->useCache($this->scheduleCache()));
? ? ? ? });
? ? });
}

protected function scheduleCache()
{
? ? return Env::get('SCHEDULE_CACHE_DRIVER');
}

// namespace \Illuminate\Console\Scheduling\Schedule
public function useCache($store)
{
? ? if ($this->eventMutex instanceof CacheEventMutex) {
? ? ? ? $this->eventMutex->useStore($store);
? ? }

? ? /* ... ... */
? ? return $this;
}

// namespace \Illuminate\Console\Scheduling\CacheEventMutex
public function create(Event $event)
{
? ? return $this->cache->store($this->store)->add(
? ? ? ? $event->mutexName(), true, $event->expiresAt * 60
? ? );
}

// namespace \Illuminate\Cache\CacheManager
public function store($name = null)
{
? ? $name = $name ?: $this->getDefaultDriver();

? ? return $this->stores[$name] = $this->get($name);
}

public function getDefaultDriver()
{
? ? return $this->app['config']['cache.default'];
}

protected function get($name)
{
? ? return $this->stores[$name] ?? $this->resolve($name);
}

protected function resolve($name)
{
? ? $config = $this->getConfig($name);

? ? if (is_null($config)) {
? ? ? ? throw new InvalidArgumentException("Cache store [{$name}] is not defined.");
? ? }

? ? if (isset($this->customCreators[$config['driver']])) {
? ? ? ? return $this->callCustomCreator($config);
? ? } else {
? ? ? ? $driverMethod = 'create'.ucfirst($config['driver']).'Driver';

? ? ? ? if (method_exists($this, $driverMethod)) {
? ? ? ? ? ? return $this->{$driverMethod}($config);
? ? ? ? } else {
? ? ? ? ? ? throw new InvalidArgumentException("Driver [{$config['driver']}] is not supported.");
? ? ? ? }
? ? }
}

protected function getConfig($name)
{
? ? return $this->app['config']["cache.stores.{$name}"];
}

protected function createFileDriver(array $config)
{
? ? return $this->repository(new FileStore($this->app['files'], $config['path'], $config['permission'] ?? null));
}

在初始化 Schedule 時(shí)會(huì)指定 eventMutex 的存儲(chǔ)引擎,默認(rèn)為環(huán)境變量中的配置項(xiàng) SCHEDULE_CACHE_DRIVER 的值。但通常這一項(xiàng)配置在環(huán)境變量中并不存在,所以 useCache() 的參數(shù)值為空,進(jìn)而 eventMutex 的 store 屬性值也為空。這樣,在 eventMutex 的 create() 方法中調(diào)用 store() 方法為其設(shè)置存儲(chǔ)引擎時(shí),store() 方法的參數(shù)值也為空。

當(dāng) store() 方法的傳參為空時(shí),會(huì)使用應(yīng)用的默認(rèn)存儲(chǔ)引擎(如果不做任何修改,默認(rèn) cache 的存儲(chǔ)引擎為 file)。之后會(huì)取得默認(rèn)存儲(chǔ)引擎的配置信息(引擎、存儲(chǔ)路徑、連接信息等),然后實(shí)例化存儲(chǔ)引擎。最終,file 存儲(chǔ)引擎實(shí)例化的是 \Illuminate\Cache\FileStore 。

在設(shè)置完存儲(chǔ)引擎之后,緊接著會(huì)調(diào)用 add() 方法獲取互斥鎖。由于 store() 方法返回的是 \Illuminate\Contracts\Cache\Repository 類型的實(shí)例,所以最終調(diào)用的是 Illuminate\Cache\Repository 中的 add() 方法。

// namespace \Illuminate\Cache\Repository
public function add($key, $value, $ttl = null)
{
? ? if ($ttl !== null) {
? ? ? ? if ($this->getSeconds($ttl) <= 0) {
? ? ? ? ? ? return false;
? ? ? ? }

? ? ? ? if (method_exists($this->store, 'add')) {
? ? ? ? ? ? $seconds = $this->getSeconds($ttl);

? ? ? ? ? ? return $this->store->add(
? ? ? ? ? ? ? ? $this->itemKey($key), $value, $seconds
? ? ? ? ? ? );
? ? ? ? }
? ? }

? ? if (is_null($this->get($key))) {
? ? ? ? return $this->put($key, $value, $ttl);
? ? }

? ? return false;
}

public function get($key, $default = null)
{
? ? if (is_array($key)) {
? ? ? ? return $this->many($key);
? ? }

? ? $value = $this->store->get($this->itemKey($key));

? ? if (is_null($value)) {
? ? ? ? $this->event(new CacheMissed($key));

? ? ? ? $value = value($default);
? ? } else {
? ? ? ? $this->event(new CacheHit($key, $value));
? ? }

? ? return $value;
}

// namespace \Illuminate\Cache\FileStore
public function get($key)
{
? ? return $this->getPayload($key)['data'] ?? null;
}

protected function getPayload($key)
{
? ? $path = $this->path($key);

? ? try {
? ? ? ? $expire = substr(
? ? ? ? ? ? $contents = $this->files->get($path, true), 0, 10
? ? ? ? );
? ? } catch (Exception $e) {
? ? ? ? return $this->emptyPayload();
? ? }

? ? if ($this->currentTime() >= $expire) {
? ? ? ? $this->forget($key);

? ? ? ? return $this->emptyPayload();
? ? }

? ? try {
? ? ? ? $data = unserialize(substr($contents, 10));
? ? } catch (Exception $e) {
? ? ? ? $this->forget($key);

? ? ? ? return $this->emptyPayload();
? ? }

? ? $time = $expire - $this->currentTime();

? ? return compact('data', 'time');
}

這里需要說明,所謂互斥鎖,其本質(zhì)是寫文件。如果文件不存在或文件內(nèi)容為空或文件中存儲(chǔ)的過期時(shí)間小于當(dāng)前時(shí)間,則互斥鎖可以順利獲得;否則無法獲取到互斥鎖。文件內(nèi)容為固定格式:timestampb:1 。

所謂超時(shí)時(shí)間,與此處的 timestamp 的值有密切的聯(lián)系。獲取互斥鎖時(shí)的時(shí)間戳,再加上超時(shí)時(shí)間的秒數(shù),即是此處的 timestamp 的值。

由于 FileStore 中不存在 add() 方法,所以程序會(huì)直接嘗試調(diào)用 get() 方法獲取文件中的內(nèi)容。如果 get() 返回的結(jié)果為 NULL,說明獲取互斥鎖成功,之后會(huì)調(diào)用 FileStore 的 put() 方法寫文件;否則,說明當(dāng)前有相同的 Event 在運(yùn)行,不會(huì)再運(yùn)行新的 Event 。
在調(diào)用 put() 方法寫文件時(shí),首先需要根據(jù)傳參計(jì)算 eventMutex 的超時(shí)時(shí)間的秒數(shù),之后再調(diào)用 FileStore 中的 put() 方法,將數(shù)據(jù)寫入文件中。

// namespace \Illuminate\Cache\Repository
public function put($key, $value, $ttl = null)
{
? ? /* ... ... */

? ? $seconds = $this->getSeconds($ttl);

? ? if ($seconds <= 0) {
? ? ? ? return $this->forget($key);
? ? }

? ? $result = $this->store->put($this->itemKey($key), $value, $seconds);

? ? if ($result) {
? ? ? ? $this->event(new KeyWritten($key, $value, $seconds));
? ? }

? ? return $result;
}

// namespace \Illuminate\Cache\FileStore
public function put($key, $value, $seconds)
{
? ? $this->ensureCacheDirectoryExists($path = $this->path($key));

? ? $result = $this->files->put(
? ? ? ? $path, $this->expiration($seconds).serialize($value), true
? ? );

? ? if ($result !== false && $result > 0) {
? ? ? ? $this->ensureFileHasCorrectPermissions($path);

? ? ? ? return true;
? ? }

? ? return false;
}

protected function path($key)
{
? ? $parts = array_slice(str_split($hash = sha1($key), 2), 0, 2);

? ? return $this->directory.'/'.implode('/', $parts).'/'.$hash;
}

// namespace \Illuminate\Console\Scheduling\Schedule
public function mutexName()
{
? ? return 'framework'.DIRECTORY_SEPARATOR.'schedule-'.sha1($this->expression.$this->command);
}

這里需要重點(diǎn)說明的是 $key 的生成方法以及文件路徑的生成方法。$key 通過調(diào)用 Event 的 mutexName() 方法生成,其中需要用到 Event 的 $expression 和 $command 屬性。其中 $command 為我們定義的命令行,在調(diào)用 $schedule->comand() 方法時(shí)傳入,然后進(jìn)行格式化,$expression 則為 Event 的運(yùn)行周期。

以命令行 schedule:test 為例,格式化之后的命令行為  `/usr/local/php/bin/php` `artisan` schedule:test,如果該命令行設(shè)置的運(yùn)行周期為每分鐘一次,即 * * * * * ,則最終計(jì)算得到的 $key 的值為 framework/schedule-768a42da74f005b3ac29ca0a88eb72d0ca2b84be 。文件路徑則是將 $key 的值再次進(jìn)行 sha1 計(jì)算之后,以兩個(gè)字符為一組切分成數(shù)組,然后取數(shù)組的前兩項(xiàng)組成一個(gè)二級(jí)目錄,而配置文件中 file 引擎的默認(rèn)存儲(chǔ)路徑為 storage/framework/cache/data ,所以最終的文件路徑為 storage/framework/cache/data/eb/60/eb608bf555895f742e5bd57e186cbd97f9a6f432 。而文件中存儲(chǔ)的內(nèi)容則為 1642122685b:1 。

⑵ 回調(diào)方法

再來說設(shè)置的 Event 回調(diào),調(diào)用 withoutOverlapping() 方法會(huì)為 Event 設(shè)置兩個(gè)回調(diào):一個(gè)是 Event 運(yùn)行完成之后的回調(diào),用于釋放互斥鎖,即清理緩存文件;另一個(gè)是在運(yùn)行 Event 之前判斷互斥鎖是否被占用,即緩存文件是否已經(jīng)存在。

無論 Event 是以 Foreground 的方式運(yùn)行,還是以 Background 的方式運(yùn)行,在運(yùn)行完成之后都會(huì)調(diào)用 callAfterCallbacks() 方法執(zhí)行 afterCallbacks 中的回調(diào),其中就有一項(xiàng)回調(diào)用于釋放互斥鎖,刪除緩存文件 $this->mutex->forget($this) 。區(qū)別就在于,以 Foreground 方式運(yùn)行的 Event 是在運(yùn)行完成之后顯式的調(diào)用這些回調(diào)方法,而以 Background 方式運(yùn)行的 Event 則需要借助 schedule:finish 來調(diào)用這些回調(diào)方法。
所有在 \App\Console\Kernel 中注冊(cè) Event,都是通過命令行 schedule:run 來調(diào)度的。在調(diào)度之前,首先會(huì)判斷當(dāng)前時(shí)間點(diǎn)是否滿足各個(gè) Event 所配置的運(yùn)行周期的要求。如果滿足的話,接下來就是一些過濾條件的判斷,這其中就包括判斷互斥鎖是否被占用。只有在互斥鎖沒有被占用的情況下,Event 才可以運(yùn)行。

// namespace \Illuminate\Console\Scheduling\ScheduleRunCommand
public function handle(Schedule $schedule, Dispatcher $dispatcher)
{
? ? $this->schedule = $schedule;
? ? $this->dispatcher = $dispatcher;

? ? foreach ($this->schedule->dueEvents($this->laravel) as $event) {
? ? ? ? if (! $event->filtersPass($this->laravel)) {
? ? ? ? ? ? $this->dispatcher->dispatch(new ScheduledTaskSkipped($event));

? ? ? ? ? ? continue;
? ? ? ? }

? ? ? ? if ($event->onOneServer) {
? ? ? ? ? ? $this->runSingleServerEvent($event);
? ? ? ? } else {
? ? ? ? ? ? $this->runEvent($event);
? ? ? ? }

? ? ? ? $this->eventsRan = true;
? ? }

? ? if (! $this->eventsRan) {
? ? ? ? $this->info('No scheduled commands are ready to run.');
? ? }
}

// namespace \Illuminate\Console\Scheduling\Schedule
public function dueEvents($app)
{
? ? return collect($this->events)->filter->isDue($app);
}

// namespace \Illuminate\Console\Scheduling\Event
public function isDue($app)
{
? ? /* ... ... */
? ? return $this->expressionPasses() &&
? ? ? ? ? ?$this->runsInEnvironment($app->environment());
}

protected function expressionPasses()
{
? ? $date = Carbon::now();
? ? /* ... ... */
? ? return CronExpression::factory($this->expression)->isDue($date->toDateTimeString());
}

// namespace \Cron\CronExpression
public function isDue($currentTime = 'now', $timeZone = null)
{
? ?/* ... ... */
? ?
? ? try {
? ? ? ? return $this->getNextRunDate($currentTime, 0, true)->getTimestamp() === $currentTime->getTimestamp();
? ? } catch (Exception $e) {
? ? ? ? return false;
? ? }
}

public function getNextRunDate($currentTime = 'now', $nth = 0, $allowCurrentDate = false, $timeZone = null)
{
? ? return $this->getRunDate($currentTime, $nth, false, $allowCurrentDate, $timeZone);
}

有時(shí)候,我們可能需要 kill 掉一些在后臺(tái)運(yùn)行的命令行,但緊接著我們會(huì)發(fā)現(xiàn)這些被 kill 掉的命令行在一段時(shí)間內(nèi)無法按照設(shè)置的運(yùn)行周期自動(dòng)調(diào)度,其原因就在于手動(dòng) kill 掉的命令行沒有調(diào)用 schedule:finish 清理緩存文件,釋放互斥鎖。這就導(dǎo)致在設(shè)置的過期時(shí)間到達(dá)之前,互斥鎖會(huì)一直被占用,新的 Event 不會(huì)再次運(yùn)行。

到此這篇關(guān)于Laravel中schedule調(diào)度的運(yùn)行機(jī)制的文章就介紹到這了,更多相關(guān)Laravel schedule調(diào)度內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論