From 0fa281247d9707bc448ebb853f9051eba3e4304e Mon Sep 17 00:00:00 2001 From: Michael Vogel Date: Wed, 21 Sep 2022 21:03:07 +0200 Subject: [PATCH 1/5] More cooldown calls for worker processes --- bin/daemon.php | 5 ++- src/Core/Worker.php | 62 +++++++++++++++++++-------- src/Protocol/ActivityPub/Delivery.php | 2 + src/Worker/Notifier.php | 4 ++ src/Worker/PollContacts.php | 1 + src/Worker/UpdateContacts.php | 1 + src/Worker/UpdateGServers.php | 1 + src/Worker/UpdateServerPeers.php | 1 + 8 files changed, 58 insertions(+), 19 deletions(-) diff --git a/bin/daemon.php b/bin/daemon.php index 880115d0d..f1eb077ad 100755 --- a/bin/daemon.php +++ b/bin/daemon.php @@ -76,8 +76,8 @@ DI::config()->load(); if (empty(DI::config()->get('system', 'pidfile'))) { die(<< [ + + 'system' => [ 'pidfile' => '/path/to/daemon.pid', ], TXT @@ -199,6 +199,7 @@ while (true) { } if ($do_cron || (!DI::system()->isMaxLoadReached() && Worker::entriesExists() && Worker::isReady())) { + Worker::coolDown(); Worker::spawnWorker($do_cron); } else { Logger::info('Cool down for 5 seconds', ['pid' => $pid]); diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 830e02983..4496caf92 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -444,6 +444,47 @@ class Worker return true; } + /** + * Slow the execution down if the system load is too high + * + * @return void + */ + public static function coolDown() + { + $load_cooldown = DI::config()->get('system', 'worker_load_cooldown'); + $processes_cooldown = DI::config()->get('system', 'worker_processes_cooldown'); + + if (($load_cooldown == 0) && ($processes_cooldown == 0)) { + return; + } + + $sleeping = false; + + while ($load = System::getLoadAvg()) { + if (($load_cooldown > 0) && ($load['average1'] > $load_cooldown)) { + if (!$sleeping) { + Logger::notice('Load induced pre execution cooldown.', ['max' => $load_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]); + $sleeping = true; + } + sleep(1); + continue; + } + if (($processes_cooldown > 0) && ($load['scheduled'] > $processes_cooldown)) { + if (!$sleeping) { + Logger::notice('Process induced pre execution cooldown.', ['max' => $processes_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]); + $sleeping = true; + } + sleep(1); + continue; + } + break; + } + + if ($sleeping) { + Logger::notice('Cooldown ended.', ['max-load' => $load_cooldown, 'max-processes' => $processes_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]); + } + } + /** * Execute a function from the queue * @@ -460,26 +501,11 @@ class Worker $cooldown = DI::config()->get('system', 'worker_cooldown', 0); if ($cooldown > 0) { - Logger::debug('Pre execution cooldown.', ['cooldown' => $cooldown, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]); + Logger::notice('Pre execution cooldown.', ['cooldown' => $cooldown, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]); sleep($cooldown); } - $load_cooldown = DI::config()->get('system', 'worker_load_cooldown'); - $processes_cooldown = DI::config()->get('system', 'worker_processes_cooldown'); - - while ((($load_cooldown > 0) || ($processes_cooldown > 0)) && ($load = System::getLoadAvg())) { - if (($load_cooldown > 0) && ($load['average1'] > $load_cooldown)) { - Logger::debug('Load induced pre execution cooldown.', ['max' => $load_cooldown, 'load' => $load, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]); - sleep(1); - continue; - } - if (($processes_cooldown > 0) && ($load['scheduled'] > $processes_cooldown)) { - Logger::debug('Process induced pre execution cooldown.', ['max' => $processes_cooldown, 'load' => $load, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]); - sleep(1); - continue; - } - break; - } + self::coolDown(); Logger::enableWorker($funcname); @@ -527,6 +553,8 @@ class Worker Logger::info('Performance:', ['state' => self::$state, 'count' => $dbcount, 'stat' => $dbstat, 'write' => $dbwrite, 'lock' => $dblock, 'total' => $dbtotal, 'rest' => $rest, 'exec' => $exec]); + self::coolDown(); + self::$up_start = microtime(true); self::$db_duration = 0; self::$db_duration_count = 0; diff --git a/src/Protocol/ActivityPub/Delivery.php b/src/Protocol/ActivityPub/Delivery.php index 9aa45d041..49a2b5aa4 100644 --- a/src/Protocol/ActivityPub/Delivery.php +++ b/src/Protocol/ActivityPub/Delivery.php @@ -22,6 +22,7 @@ namespace Friendica\Protocol\ActivityPub; use Friendica\Core\Logger; +use Friendica\Core\Worker; use Friendica\Database\DBA; use Friendica\DI; use Friendica\Model\Contact; @@ -55,6 +56,7 @@ class Delivery Logger::notice('Inbox delivery has a server failure', ['inbox' => $inbox]); $serverfail = true; } + Worker::coolDown(); } if ($serverfail || (!$result['success'] && !$result['drop'])) { diff --git a/src/Worker/Notifier.php b/src/Worker/Notifier.php index 7921097d4..b236abc19 100644 --- a/src/Worker/Notifier.php +++ b/src/Worker/Notifier.php @@ -573,6 +573,7 @@ class Notifier if (Worker::add($deliver_options, 'Delivery', $cmd, $post_uriid, (int)$contact['id'], $sender_uid)) { $delivery_queue_count++; } + Worker::coolDown(); } return $delivery_queue_count; } @@ -695,6 +696,7 @@ class Notifier Logger::info('Account removal via ActivityPub', ['uid' => $self_user_id, 'inbox' => $inbox]); Worker::add(['priority' => PRIORITY_NEGLIGIBLE, 'created' => $created, 'dont_fork' => true], 'APDelivery', Delivery::REMOVAL, 0, $inbox, $self_user_id, $receivers); + Worker::coolDown(); } return true; @@ -818,6 +820,7 @@ class Notifier $delivery_queue_count++; } } + Worker::coolDown(); } // We deliver posts to relay servers slightly delayed to priorize the direct delivery @@ -833,6 +836,7 @@ class Notifier $delivery_queue_count++; } } + Worker::coolDown(); } return ['count' => $delivery_queue_count, 'contacts' => $contacts]; diff --git a/src/Worker/PollContacts.php b/src/Worker/PollContacts.php index b61f5aebb..b4312ef0c 100644 --- a/src/Worker/PollContacts.php +++ b/src/Worker/PollContacts.php @@ -81,6 +81,7 @@ class PollContacts Logger::notice("Polling " . $contact["network"] . " " . $contact["id"] . " " . $contact['priority'] . " " . $contact["nick"] . " " . $contact["name"]); Worker::add(['priority' => $priority, 'dont_fork' => true, 'force_priority' => true], 'OnePoll', (int)$contact['id']); + Worker::coolDown(); } DBA::close($contacts); } diff --git a/src/Worker/UpdateContacts.php b/src/Worker/UpdateContacts.php index 31426e79f..7e352a27c 100644 --- a/src/Worker/UpdateContacts.php +++ b/src/Worker/UpdateContacts.php @@ -62,6 +62,7 @@ class UpdateContacts if (Worker::add(['priority' => PRIORITY_LOW, 'dont_fork' => true], "UpdateContact", $contact['id'])) { ++$count; } + Worker::coolDown(); } DBA::close($contacts); diff --git a/src/Worker/UpdateGServers.php b/src/Worker/UpdateGServers.php index e53754584..108482eae 100644 --- a/src/Worker/UpdateGServers.php +++ b/src/Worker/UpdateGServers.php @@ -72,6 +72,7 @@ class UpdateGServers $count++; } } + Worker::coolDown(); } DBA::close($gservers); Logger::info('Updated servers', ['count' => $count]); diff --git a/src/Worker/UpdateServerPeers.php b/src/Worker/UpdateServerPeers.php index 09c88499e..8c916504e 100644 --- a/src/Worker/UpdateServerPeers.php +++ b/src/Worker/UpdateServerPeers.php @@ -63,6 +63,7 @@ class UpdateServerPeers // This endpoint doesn't offer the schema. So we assume that it is HTTPS. GServer::add('https://' . $peer); ++$added; + Worker::coolDown(); } Logger::info('Server peer update ended', ['total' => $total, 'added' => $added, 'url' => $url]); } From f7d054674d66dabaf980469b48f0215b6a4d9e10 Mon Sep 17 00:00:00 2001 From: Michael Vogel Date: Wed, 21 Sep 2022 22:44:48 +0200 Subject: [PATCH 2/5] Don't perform a cooldown in the daemon by now --- bin/daemon.php | 1 - 1 file changed, 1 deletion(-) diff --git a/bin/daemon.php b/bin/daemon.php index f1eb077ad..ef5886c0b 100755 --- a/bin/daemon.php +++ b/bin/daemon.php @@ -199,7 +199,6 @@ while (true) { } if ($do_cron || (!DI::system()->isMaxLoadReached() && Worker::entriesExists() && Worker::isReady())) { - Worker::coolDown(); Worker::spawnWorker($do_cron); } else { Logger::info('Cool down for 5 seconds', ['pid' => $pid]); From 30eb511da3098785f6a2bbe188b3be26d907467e Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 22 Sep 2022 04:33:00 +0000 Subject: [PATCH 3/5] Reverting changes --- bin/daemon.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/daemon.php b/bin/daemon.php index ef5886c0b..880115d0d 100755 --- a/bin/daemon.php +++ b/bin/daemon.php @@ -76,8 +76,8 @@ DI::config()->load(); if (empty(DI::config()->get('system', 'pidfile'))) { die(<< [ + + 'system' => [ 'pidfile' => '/path/to/daemon.pid', ], TXT From e1341b6ad3b51ee849217cba8c1326e4aaec39b8 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 22 Sep 2022 05:45:42 +0000 Subject: [PATCH 4/5] Fallback for load detection, additional checks --- src/Core/System.php | 19 +++++++++++++------ src/Core/Worker.php | 36 +++++++++++++++++++++++++++++++++--- 2 files changed, 46 insertions(+), 9 deletions(-) diff --git a/src/Core/System.php b/src/Core/System.php index 55b4bb8cf..0c1e499d9 100644 --- a/src/Core/System.php +++ b/src/Core/System.php @@ -442,17 +442,24 @@ class System */ public static function getLoadAvg(): array { - $content = file_get_contents('/proc/loadavg'); + $content = @file_get_contents('/proc/loadavg'); if (empty($content)) { $content = shell_exec('cat /proc/loadavg'); } - if (empty($content)) { - return []; + if (empty($content) || !preg_match("#([.\d]+)\s([.\d]+)\s([.\d]+)\s(\d+)/(\d+)#", $content, $matches)) { + $load_arr = sys_getloadavg(); + if (empty($load_arr)) { + return []; + } + return [ + 'average1' => $load_arr[0], + 'average5' => $load_arr[1], + 'average15' => $load_arr[2], + 'runnable' => 0, + 'scheduled' => 0 + ]; } - if (!preg_match("#([.\d]+)\s([.\d]+)\s([.\d]+)\s(\d+)/(\d+)#", $content, $matches)) { - return []; - } return [ 'average1' => (float)$matches[1], 'average5' => (float)$matches[2], diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 4496caf92..6bf6168b2 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -143,7 +143,7 @@ class Worker } // Quit the worker once every cron interval - if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) { + if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60)) && !self::systemLimitReached()) { Logger::info('Process lifetime reached, respawning.'); self::unclaimProcess($process); if (Worker\Daemon::isMode()) { @@ -444,6 +444,36 @@ class Worker return true; } + /** + * Checks if system limits are reached. + * + * @return boolean + */ + private static function systemLimitReached(): bool + { + $load_cooldown = DI::config()->get('system', 'worker_load_cooldown'); + $processes_cooldown = DI::config()->get('system', 'worker_processes_cooldown'); + + if (($load_cooldown == 0) && ($processes_cooldown == 0)) { + return false; + } + + $load = System::getLoadAvg(); + if (empty($load)) { + return false; + } + + if (($load_cooldown > 0) && ($load['average1'] > $load_cooldown)) { + return true; + } + + if (($processes_cooldown > 0) && ($load['scheduled'] > $processes_cooldown)) { + return true; + } + + return false; + } + /** * Slow the execution down if the system load is too high * @@ -772,7 +802,7 @@ class Worker Logger::notice('Load: ' . $load . '/' . $maxsysload . ' - processes: ' . $deferred . '/' . $active . '/' . $waiting_processes . $processlist . ' - maximum: ' . $queues . '/' . $maxqueues); // Are there fewer workers running as possible? Then fork a new one. - if (!DI::config()->get('system', 'worker_dont_fork', false) && ($queues > ($active + 1)) && self::entriesExists()) { + if (!DI::config()->get('system', 'worker_dont_fork', false) && ($queues > ($active + 1)) && self::entriesExists() && !self::systemLimitReached()) { Logger::info('There are fewer workers as possible, fork a new worker.', ['active' => $active, 'queues' => $queues]); if (Worker\Daemon::isMode()) { Worker\IPC::SetJobState(true); @@ -1250,7 +1280,7 @@ class Worker Worker\Daemon::checkState(); // Should we quit and wait for the worker to be called as a cronjob? - if ($dont_fork) { + if ($dont_fork || self::systemLimitReached()) { return $added; } From 97e5477bde1b4d88cca3eec6f35c4be855f576f1 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 22 Sep 2022 22:46:59 +0000 Subject: [PATCH 5/5] static cooldown moved to the cooldown function --- src/Core/Worker.php | 21 ++++++++++----------- static/defaults.config.php | 2 +- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 6bf6168b2..036f065d4 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -481,6 +481,16 @@ class Worker */ public static function coolDown() { + $cooldown = DI::config()->get('system', 'worker_cooldown', 0); + if ($cooldown > 0) { + Logger::debug('Wait for cooldown.', ['cooldown' => $cooldown]); + if ($cooldown < 1) { + usleep($cooldown * 1000000); + } else { + sleep($cooldown); + } + } + $load_cooldown = DI::config()->get('system', 'worker_load_cooldown'); $processes_cooldown = DI::config()->get('system', 'worker_processes_cooldown'); @@ -529,12 +539,6 @@ class Worker { $a = DI::app(); - $cooldown = DI::config()->get('system', 'worker_cooldown', 0); - if ($cooldown > 0) { - Logger::notice('Pre execution cooldown.', ['cooldown' => $cooldown, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]); - sleep($cooldown); - } - self::coolDown(); Logger::enableWorker($funcname); @@ -605,11 +609,6 @@ class Worker Logger::info('Process done.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration, 3)]); DI::profiler()->saveLog(DI::logger(), 'ID ' . $queue['id'] . ': ' . $funcname); - - if ($cooldown > 0) { - Logger::info('Post execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'cooldown' => $cooldown]); - sleep($cooldown); - } } /** diff --git a/static/defaults.config.php b/static/defaults.config.php index fb6727491..b02f53774 100644 --- a/static/defaults.config.php +++ b/static/defaults.config.php @@ -615,7 +615,7 @@ return [ // Minimum for this config value is 1. Maximum is 64 as the resulting profile URL mustn't be longer than 255 chars. 'username_max_length' => 48, - // worker_cooldown (Integer) + // worker_cooldown (Float) // Cooldown period in seconds before each worker function call. 'worker_cooldown' => 0,