diff --git a/config/defaults.config.php b/config/defaults.config.php index 18859ee10..24a40fae2 100644 --- a/config/defaults.config.php +++ b/config/defaults.config.php @@ -397,6 +397,10 @@ return [ // If enabled, it prints out the jobs per minute. 'worker_jpm' => false, + // worker_jpm_limit (String) + // List of minutes for the JPM calculation + 'worker_jpm_limit' => '1, 10, 60', + // worker_load_exponent (Integer) // Default 3, which allows only 25% of the maximum worker queues when server load reaches around 37% of maximum load. // For a linear response where 25% of worker queues are allowed at 75% of maximum load, set this to 1. diff --git a/mod/worker.php b/mod/worker.php index 2995775db..af778f8d6 100644 --- a/mod/worker.php +++ b/mod/worker.php @@ -39,8 +39,10 @@ function worker_init() Worker::callWorker(); $passing_slow = false; + $entries = 0; + $deferred = 0; - if ($r = Worker::workerProcess($passing_slow)) { + if ($r = Worker::workerProcess($passing_slow, $entries, $deferred)) { // On most configurations this parameter wouldn't have any effect. // But since it doesn't destroy anything, we just try to get more execution time in any way. set_time_limit(0); diff --git a/src/Core/Worker.php b/src/Core/Worker.php index affcf8442..c8a4c038b 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -22,9 +22,12 @@ use Friendica\Util\Network; class Worker { private static $up_start; - private static $db_duration; + private static $db_duration = 0; + private static $db_duration_count = 0; + private static $db_duration_write = 0; + private static $db_duration_stat = 0; + private static $lock_duration = 0; private static $last_update; - private static $lock_duration; /** * @brief Processes the tasks that are in the workerqueue table @@ -86,8 +89,11 @@ class Worker $starttime = time(); + $entries = 0; + $deferred = 0; + // We fetch the next queue entry that is about to be executed - while ($r = self::workerProcess($passing_slow)) { + while ($r = self::workerProcess($passing_slow, $entries, $deferred)) { // When we are processing jobs with a lower priority, we don't refetch new jobs // Otherwise fast jobs could wait behind slow ones and could be blocked. $refetched = $passing_slow; @@ -107,9 +113,7 @@ class Worker $entries = self::totalEntries(); $deferred = self::deferredEntries(); if (Lock::acquire('worker_process', 0)) { - $stamp = (float)microtime(true); $refetched = self::findWorkerProcesses($passing_slow, $entries, $deferred); - self::$db_duration += (microtime(true) - $stamp); Lock::release('worker_process'); } } @@ -117,9 +121,8 @@ class Worker // To avoid the quitting of multiple workers only one worker at a time will execute the check if (Lock::acquire('worker', 0)) { - $stamp = (float)microtime(true); // Count active workers and compare them with a maximum value that depends on the load - if (self::tooMuchWorkers()) { + if (self::tooMuchWorkers($entries, $deferred)) { Logger::log('Active worker limit reached, quitting.', Logger::DEBUG); Lock::release('worker'); return; @@ -132,7 +135,6 @@ class Worker return; } Lock::release('worker'); - self::$db_duration += (microtime(true) - $stamp); } // Quit the worker once every 5 minutes @@ -149,6 +151,20 @@ class Worker Logger::log("Couldn't select a workerqueue entry, quitting process " . getmypid() . ".", Logger::DEBUG); } + /** + * @brief Check if non executed tasks do exist in the worker queue + * + * @return boolean Returns "true" if tasks are existing + * @throws \Exception + */ + private static function entriesExists() + { + $stamp = (float)microtime(true); + $exists = DBA::exists('workerqueue', ["NOT `done` AND `pid` = 0 AND `next_try` < ?", DateTimeFormat::utcNow()]); + self::$db_duration += (microtime(true) - $stamp); + return $exists; + } + /** * @brief Returns the number of deferred entries in the worker queue * @@ -157,7 +173,11 @@ class Worker */ private static function deferredEntries() { - return DBA::count('workerqueue', ["NOT `done` AND `pid` = 0 AND `next_try` > ?", DateTimeFormat::utcNow()]); + $stamp = (float)microtime(true); + $count = DBA::count('workerqueue', ["NOT `done` AND `pid` = 0 AND `next_try` > ?", DateTimeFormat::utcNow()]); + self::$db_duration += (microtime(true) - $stamp); + self::$db_duration_count += (microtime(true) - $stamp); + return $count; } /** @@ -168,7 +188,11 @@ class Worker */ private static function totalEntries() { - return DBA::count('workerqueue', ['done' => false, 'pid' => 0]); + $stamp = (float)microtime(true); + $count = DBA::count('workerqueue', ['done' => false, 'pid' => 0]); + self::$db_duration += (microtime(true) - $stamp); + self::$db_duration_count += (microtime(true) - $stamp); + return $count; } /** @@ -179,8 +203,10 @@ class Worker */ private static function highestPriority() { + $stamp = (float)microtime(true); $condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()]; $workerqueue = DBA::selectFirst('workerqueue', ['priority'], $condition, ['order' => ['priority']]); + self::$db_duration += (microtime(true) - $stamp); if (DBA::isResult($workerqueue)) { return $workerqueue["priority"]; } else { @@ -252,6 +278,7 @@ class Worker $stamp = (float)microtime(true); DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow()], ['pid' => $mypid, 'done' => false]); self::$db_duration += (microtime(true) - $stamp); + self::$db_duration_write += (microtime(true) - $stamp); } array_shift($argv); @@ -259,12 +286,12 @@ class Worker self::execFunction($queue, $include, $argv, true); $stamp = (float)microtime(true); - $condition = ["`id` = ? AND `next_try` < ?", $queue['id'], DateTimeFormat::utcNow()]; if (DBA::update('workerqueue', ['done' => true], $condition)) { Config::set('system', 'last_worker_execution', DateTimeFormat::utcNow()); } self::$db_duration = (microtime(true) - $stamp); + self::$db_duration_write += (microtime(true) - $stamp); return true; } @@ -276,7 +303,10 @@ class Worker if (!validate_include($include)) { Logger::log("Include file ".$argv[0]." is not valid!"); + $stamp = (float)microtime(true); DBA::delete('workerqueue', ['id' => $queue["id"]]); + self::$db_duration = (microtime(true) - $stamp); + self::$db_duration_write += (microtime(true) - $stamp); return true; } @@ -297,6 +327,7 @@ class Worker $stamp = (float)microtime(true); DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow()], ['pid' => $mypid, 'done' => false]); self::$db_duration += (microtime(true) - $stamp); + self::$db_duration_write += (microtime(true) - $stamp); } self::execFunction($queue, $funcname, $argv, false); @@ -306,9 +337,13 @@ class Worker Config::set('system', 'last_worker_execution', DateTimeFormat::utcNow()); } self::$db_duration = (microtime(true) - $stamp); + self::$db_duration_write += (microtime(true) - $stamp); } else { Logger::log("Function ".$funcname." does not exist"); + $stamp = (float)microtime(true); DBA::delete('workerqueue', ['id' => $queue["id"]]); + self::$db_duration = (microtime(true) - $stamp); + self::$db_duration_write += (microtime(true) - $stamp); } return true; @@ -361,7 +396,7 @@ class Worker $a->process_id = $new_process_id; $a->queue = $queue; - $up_duration = number_format(microtime(true) - self::$up_start, 3); + $up_duration = microtime(true) - self::$up_start; // Reset global data to avoid interferences unset($_SESSION); @@ -377,21 +412,27 @@ class Worker $duration = (microtime(true) - $stamp); - self::$up_start = microtime(true); - /* With these values we can analyze how effective the worker is. * The database and rest time should be low since this is the unproductive time. * The execution time is the productive time. * By changing parameters like the maximum number of workers we can check the effectivness. */ Logger::log( - 'DB: '.number_format(self::$db_duration, 2). - ' - Lock: '.number_format(self::$lock_duration, 2). - ' - Rest: '.number_format($up_duration - self::$db_duration - self::$lock_duration, 2). - ' - Execution: '.number_format($duration, 2), + 'DB: '.number_format(self::$db_duration - (self::$db_duration_count + self::$db_duration_write + self::$db_duration_stat), 4). + ' - DB-Count: '.number_format(self::$db_duration_count, 4). + ' - DB-Stat: '.number_format(self::$db_duration_stat, 4). + ' - DB-Write: '.number_format(self::$db_duration_write, 4). + ' - Lock: '.number_format(self::$lock_duration, 4). + ' - Rest: '.number_format(max(0, $up_duration - (self::$db_duration + self::$lock_duration)), 4). + ' - Execution: '.number_format($duration, 4), Logger::DEBUG ); + self::$up_start = microtime(true); + self::$db_duration = 0; + self::$db_duration_count = 0; + self::$db_duration_stat = 0; + self::$db_duration_write = 0; self::$lock_duration = 0; if ($duration > 3600) { @@ -506,7 +547,9 @@ class Worker $max = $r["Value"]; } // Or it can be granted. This overrides the system variable + $stamp = (float)microtime(true); $r = DBA::p('SHOW GRANTS'); + self::$db_duration += (microtime(true) - $stamp); while ($grants = DBA::fetch($r)) { $grant = array_pop($grants); if (stristr($grant, "GRANT USAGE ON")) { @@ -521,7 +564,9 @@ class Worker // If $max is set we will use the processlist to determine the current number of connections // The processlist only shows entries of the current user if ($max != 0) { + $stamp = (float)microtime(true); $r = DBA::p('SHOW PROCESSLIST'); + self::$db_duration += (microtime(true) - $stamp); $used = DBA::numRows($r); DBA::close($r); @@ -571,20 +616,25 @@ class Worker */ private static function killStaleWorkers() { + $stamp = (float)microtime(true); $entries = DBA::select( 'workerqueue', ['id', 'pid', 'executed', 'priority', 'parameter'], ['NOT `done` AND `pid` != 0'], ['order' => ['priority', 'created']] ); + self::$db_duration += (microtime(true) - $stamp); while ($entry = DBA::fetch($entries)) { if (!posix_kill($entry["pid"], 0)) { + $stamp = (float)microtime(true); DBA::update( 'workerqueue', ['executed' => DBA::NULL_DATETIME, 'pid' => 0], ['id' => $entry["id"]] ); + self::$db_duration += (microtime(true) - $stamp); + self::$db_duration_write += (microtime(true) - $stamp); } else { // Kill long running processes // Check if the priority is in a valid range @@ -616,11 +666,14 @@ class Worker } elseif ($entry["priority"] != PRIORITY_CRITICAL) { $new_priority = PRIORITY_NEGLIGIBLE; } + $stamp = (float)microtime(true); DBA::update( 'workerqueue', ['executed' => DBA::NULL_DATETIME, 'created' => DateTimeFormat::utcNow(), 'priority' => $new_priority, 'pid' => 0], ['id' => $entry["id"]] ); + self::$db_duration += (microtime(true) - $stamp); + self::$db_duration_write += (microtime(true) - $stamp); } else { Logger::log("Worker process ".$entry["pid"]." (".substr(json_encode($argv), 0, 50).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", Logger::DEBUG); } @@ -631,10 +684,13 @@ class Worker /** * @brief Checks if the number of active workers exceeds the given limits * + * @param integer $entries Total number of queue entries + * @param integer $deferred Number of deferred queue entries + * * @return bool Are there too much workers running? * @throws \Friendica\Network\HTTPException\InternalServerErrorException */ - public static function tooMuchWorkers() + public static function tooMuchWorkers($entries = 0, $deferred = 0) { $queues = Config::get("system", "worker_queues", 4); @@ -659,10 +715,13 @@ class Worker $processlist = ''; if (Config::get('system', 'worker_jpm')) { - $intervals = [1, 10, 60]; + $intervals = explode(',', Config::get('system', 'worker_jpm_range')); $jobs_per_minute = []; foreach ($intervals as $interval) { + $stamp = (float)microtime(true); $jobs = DBA::p("SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL ? MINUTE", $interval); + self::$db_duration += (microtime(true) - $stamp); + self::$db_duration_stat += (microtime(true) - $stamp); if ($job = DBA::fetch($jobs)) { $jobs_per_minute[$interval] = number_format($job['jobs'] / $interval, 0); } @@ -671,33 +730,51 @@ class Worker $processlist = ' - jpm: '.implode('/', $jobs_per_minute); } + // Create a list of queue entries grouped by their priority + $listitem = [0 => '']; + + $idle_workers = $active; + if (Config::get('system', 'worker_debug')) { - // Create a list of queue entries grouped by their priority - $listitem = [0 => '']; - - $idle_workers = $active; - // Now adding all processes with workerqueue entries - $entries = DBA::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` WHERE NOT `done` GROUP BY `priority`"); - while ($entry = DBA::fetch($entries)) { + $stamp = (float)microtime(true); + $jobs = DBA::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` WHERE NOT `done` AND `next_try` < ? GROUP BY `priority`", DateTimeFormat::utcNow()); + self::$db_duration += (microtime(true) - $stamp); + self::$db_duration_stat += (microtime(true) - $stamp); + while ($entry = DBA::fetch($jobs)) { + $stamp = (float)microtime(true); $processes = DBA::p("SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` WHERE NOT `done` AND `priority` = ?", $entry["priority"]); + self::$db_duration += (microtime(true) - $stamp); + self::$db_duration_stat += (microtime(true) - $stamp); if ($process = DBA::fetch($processes)) { $idle_workers -= $process["running"]; $listitem[$entry["priority"]] = $entry["priority"].":".$process["running"]."/".$entry["entries"]; } DBA::close($processes); } - DBA::close($entries); + DBA::close($jobs); + } else { + $stamp = (float)microtime(true); + $jobs = DBA::p("SELECT COUNT(*) AS `running`, `priority` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `done` GROUP BY `priority` ORDER BY `priority`"); + self::$db_duration += (microtime(true) - $stamp); - $listitem[0] = "0:" . max(0, $idle_workers); - - $processlist .= ' ('.implode(', ', $listitem).')'; + while ($entry = DBA::fetch($jobs)) { + $idle_workers -= $entry["running"]; + $listitem[$entry["priority"]] = $entry["priority"].":".$entry["running"]; + } + DBA::close($jobs); } - $deferred = self::deferredEntries(); - $entries = max(self::totalEntries() - $deferred, 0); + $listitem[0] = "0:" . max(0, $idle_workers); - if (Config::get("system", "worker_fastlane", false) && ($queues > 0) && ($entries > 0) && ($active >= $queues)) { + $processlist .= ' ('.implode(', ', $listitem).')'; + + if (empty($deferred) && empty($entries)) { + $deferred = self::deferredEntries(); + $entries = max(self::totalEntries() - $deferred, 0); + } + + if (Config::get("system", "worker_fastlane", false) && ($queues > 0) && self::entriesExists() && ($active >= $queues)) { $top_priority = self::highestPriority(); $high_running = self::processWithPriorityActive($top_priority); @@ -707,7 +784,7 @@ class Worker } } - Logger::log("Load: " . $load ."/" . $maxsysload . " - processes: " . $deferred . "/" . $active . "/" . $entries . $processlist . " - maximum: " . $queues . "/" . $maxqueues, Logger::DEBUG); + Logger::log("Load: " . $load ."/" . $maxsysload . " - processes: " . $deferred . "/" . $active . "/" . ($entries - $deferred) . $processlist . " - maximum: " . $queues . "/" . $maxqueues, Logger::DEBUG); // Are there fewer workers running as possible? Then fork a new one. if (!Config::get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && ($entries > 1)) { @@ -736,7 +813,10 @@ class Worker */ private static function activeWorkers() { - return DBA::count('process', ['command' => 'Worker.php']); + $stamp = (float)microtime(true); + $count = DBA::count('process', ['command' => 'Worker.php']); + self::$db_duration += (microtime(true) - $stamp); + return $count; } /** @@ -753,11 +833,13 @@ class Worker { $highest_priority = 0; + $stamp = (float)microtime(true); $r = DBA::p( "SELECT `priority` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `done`" ); + self::$db_duration += (microtime(true) - $stamp); // No active processes at all? Fine if (!DBA::isResult($r)) { @@ -829,6 +911,7 @@ class Worker $ids = []; if (self::passingSlow($highest_priority)) { // Are there waiting processes with a higher priority than the currently highest? + $stamp = (float)microtime(true); $result = DBA::select( 'workerqueue', ['id'], @@ -836,6 +919,7 @@ class Worker $highest_priority, DateTimeFormat::utcNow()], ['limit' => $limit, 'order' => ['priority', 'created']] ); + self::$db_duration += (microtime(true) - $stamp); while ($id = DBA::fetch($result)) { $ids[] = $id["id"]; @@ -846,6 +930,7 @@ class Worker if (!$found) { // Give slower processes some processing time + $stamp = (float)microtime(true); $result = DBA::select( 'workerqueue', ['id'], @@ -853,6 +938,7 @@ class Worker $highest_priority, DateTimeFormat::utcNow()], ['limit' => $limit, 'order' => ['priority', 'created']] ); + self::$db_duration += (microtime(true) - $stamp); while ($id = DBA::fetch($result)) { $ids[] = $id["id"]; @@ -866,6 +952,7 @@ class Worker // If there is no result (or we shouldn't pass lower processes) we check without priority limit if (!$found) { + $stamp = (float)microtime(true); $result = DBA::select( 'workerqueue', ['id'], @@ -873,6 +960,7 @@ class Worker DateTimeFormat::utcNow()], ['limit' => $limit, 'order' => ['priority', 'created']] ); + self::$db_duration += (microtime(true) - $stamp); while ($id = DBA::fetch($result)) { $ids[] = $id["id"]; @@ -883,9 +971,12 @@ class Worker } if ($found) { + $stamp = (float)microtime(true); $condition = "`id` IN (".substr(str_repeat("?, ", count($ids)), 0, -2).") AND `pid` = 0 AND NOT `done`"; array_unshift($ids, $condition); DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $mypid], $ids); + self::$db_duration += (microtime(true) - $stamp); + self::$db_duration_write += (microtime(true) - $stamp); } return $found; @@ -895,40 +986,41 @@ class Worker * @brief Returns the next worker process * * @param boolean $passing_slow Returns if we had passed low priority processes + * @param integer $entries Returns total number of queue entries + * @param integer $deferred Returns number of deferred queue entries + * * @return string SQL statement * @throws \Friendica\Network\HTTPException\InternalServerErrorException */ - public static function workerProcess(&$passing_slow) + public static function workerProcess(&$passing_slow, &$entries, &$deferred) { - $stamp = (float)microtime(true); - // There can already be jobs for us in the queue. + $stamp = (float)microtime(true); $r = DBA::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]); + self::$db_duration += (microtime(true) - $stamp); if (DBA::isResult($r)) { - self::$db_duration += (microtime(true) - $stamp); return DBA::toArray($r); } DBA::close($r); - $stamp = (float)microtime(true); - // Counting the rows outside the lock reduces the lock time $entries = self::totalEntries(); $deferred = self::deferredEntries(); + $stamp = (float)microtime(true); if (!Lock::acquire('worker_process')) { return false; } - self::$lock_duration = (microtime(true) - $stamp); + self::$lock_duration += (microtime(true) - $stamp); - $stamp = (float)microtime(true); $found = self::findWorkerProcesses($passing_slow, $entries, $deferred); - self::$db_duration += (microtime(true) - $stamp); Lock::release('worker_process'); if ($found) { + $stamp = (float)microtime(true); $r = DBA::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]); + self::$db_duration += (microtime(true) - $stamp); return DBA::toArray($r); } return false; @@ -943,7 +1035,10 @@ class Worker { $mypid = getmypid(); + $stamp = (float)microtime(true); DBA::update('workerqueue', ['executed' => DBA::NULL_DATETIME, 'pid' => 0], ['pid' => $mypid, 'done' => false]); + self::$db_duration += (microtime(true) - $stamp); + self::$db_duration_write += (microtime(true) - $stamp); } /** @@ -1022,9 +1117,12 @@ class Worker $timeout = Config::get("system", "frontend_worker_timeout", 10); /// @todo We should clean up the corresponding workerqueue entries as well + $stamp = (float)microtime(true); $condition = ["`created` < ? AND `command` = 'worker.php'", DateTimeFormat::utc("now - ".$timeout." minutes")]; DBA::delete('process', $condition); + self::$db_duration = (microtime(true) - $stamp); + self::$db_duration_write += (microtime(true) - $stamp); } /** @@ -1200,8 +1298,11 @@ class Worker Logger::log('Defer execution ' . $retrial . ' of id ' . $id . ' to ' . $next . ' - priority old/new: ' . $queue['priority'] . '/' . $priority, Logger::DEBUG); + $stamp = (float)microtime(true); $fields = ['retrial' => $retrial + 1, 'next_try' => $next, 'executed' => DBA::NULL_DATETIME, 'pid' => 0, 'priority' => $priority]; DBA::update('workerqueue', $fields, ['id' => $id]); + self::$db_duration += (microtime(true) - $stamp); + self::$db_duration_write += (microtime(true) - $stamp); } /** @@ -1241,7 +1342,10 @@ class Worker */ public static function IPCSetJobState($jobs) { + $stamp = (float)microtime(true); DBA::update('worker-ipc', ['jobs' => $jobs], ['key' => 1], true); + self::$db_duration += (microtime(true) - $stamp); + self::$db_duration_write += (microtime(true) - $stamp); } /** @@ -1253,7 +1357,9 @@ class Worker */ public static function IPCJobsExists() { + $stamp = (float)microtime(true); $row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => 1]); + self::$db_duration += (microtime(true) - $stamp); // When we don't have a row, no job is running if (!DBA::isResult($row)) { diff --git a/src/Worker/Cron.php b/src/Worker/Cron.php index b9a8c0f2e..4e49dede8 100644 --- a/src/Worker/Cron.php +++ b/src/Worker/Cron.php @@ -285,7 +285,7 @@ class Cron Logger::log("Polling " . $contact["network"] . " " . $contact["id"] . " " . $contact['priority'] . " " . $contact["nick"] . " " . $contact["name"]); - Worker::add(['priority' => $priority, 'dont_fork' => true], 'OnePoll', (int)$contact['id']); + Worker::add(['priority' => $priority, 'dont_fork' => true, 'force_priority' => true], 'OnePoll', (int)$contact['id']); } } }