From 18d6eba8d00c2338036a780db2bf6df139077184 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 29 Jun 2017 05:40:02 +0000 Subject: [PATCH 1/7] Some more performance stuff --- include/poller.php | 41 +++++++++++++++++++++-------------------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/include/poller.php b/include/poller.php index d9394b080..017c4bc5d 100644 --- a/include/poller.php +++ b/include/poller.php @@ -85,12 +85,13 @@ function poller_run($argv, $argc){ poller_run_cron(); } - $refetched = false; - $starttime = time(); // We fetch the next queue entry that is about to be executed while ($r = poller_worker_process()) { + + $refetched = false; + foreach ($r AS $entry) { // Assure that the priority is an integer value $entry['priority'] = (int)$entry['priority']; @@ -100,6 +101,13 @@ function poller_run($argv, $argc){ logger('Process execution failed, quitting.', LOGGER_DEBUG); return; } + + // If possible we will fetch new jobs for this worker + if (!$refetched && Lock::set('poller_worker_process', 0)) { + logger('Blubb: a'); + $refetched = find_worker_processes(); + Lock::remove('poller_worker_process'); + } } // To avoid the quitting of multiple pollers only one poller at a time will execute the check @@ -123,13 +131,6 @@ function poller_run($argv, $argc){ logger('Process lifetime reached, quitting.', LOGGER_DEBUG); return; } - - // If possible we will fetch new jobs for this worker - if (!$refetched && Lock::set('poller_worker_process', 0)) { - $refetched = find_worker_processes(); - Lock::remove('poller_worker_process'); - } - } logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG); } @@ -573,8 +574,7 @@ function poller_too_much_workers() { if (!Config::get("system", "worker_dont_fork") && ($queues > ($active + 1)) && ($entries > 1)) { logger("Active workers: ".$active."/".$queues." Fork a new worker.", LOGGER_DEBUG); $args = array("include/poller.php", "no_cron"); - $a = get_app(); - $a->proc_run($args); + get_app()->proc_run($args); } } @@ -648,7 +648,12 @@ function poller_passing_slow(&$highest_priority) { * * @return boolean Have we found something? */ -function find_worker_processes() { +function find_worker_processes($mypid = 0) { + + if ($mypid == 0) { + $mypid = getmypid(); + } + // Check if we should pass some low priority process $highest_priority = 0; $found = false; @@ -662,7 +667,7 @@ function find_worker_processes() { $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? AND `priority` < ? AND NOT `done` ORDER BY `priority`, `created` LIMIT ".intval($limit), - datetime_convert(), getmypid(), NULL_DATE, $highest_priority); + datetime_convert(), $mypid, NULL_DATE, $highest_priority); if ($result) { $found = (dba::affected_rows() > 0); } @@ -672,7 +677,7 @@ function find_worker_processes() { $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? AND `priority` > ? AND NOT `done` ORDER BY `priority`, `created` LIMIT ".intval($limit), - datetime_convert(), getmypid(), NULL_DATE, $highest_priority); + datetime_convert(), $mypid, NULL_DATE, $highest_priority); if ($result) { $found = (dba::affected_rows() > 0); } @@ -682,7 +687,7 @@ function find_worker_processes() { // If there is no result (or we shouldn't pass lower processes) we check without priority limit if (!$found) { $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? AND NOT `done` ORDER BY `priority`, `created` LIMIT ".intval($limit), - datetime_convert(), getmypid(), NULL_DATE); + datetime_convert(), $mypid, NULL_DATE); if ($result) { $found = (dba::affected_rows() > 0); } @@ -778,8 +783,7 @@ function call_worker_if_idle() { logger('Call poller', LOGGER_DEBUG); $args = array("include/poller.php", "no_cron"); - $a = get_app(); - $a->proc_run($args); + get_app()->proc_run($args); return; } @@ -834,8 +838,5 @@ if (array_search(__file__,get_included_files())===0){ get_app()->end_process(); - Lock::remove('poller_worker'); - Lock::remove('poller_worker_process'); - killme(); } From 99b86c9fd9a14e90c0048b623db952d21624d239 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 29 Jun 2017 21:19:31 +0000 Subject: [PATCH 2/7] Just found the handbrake ... --- include/dba.php | 10 ++++++-- include/poller.php | 64 ++++++++++++++++++++++++++++++---------------- 2 files changed, 50 insertions(+), 24 deletions(-) diff --git a/include/dba.php b/include/dba.php index 1c63fa505..e39e6f136 100644 --- a/include/dba.php +++ b/include/dba.php @@ -627,6 +627,12 @@ class dba { self::$dbo->errorno = mysql_errno(self::$dbo->db); } else { self::$dbo->affected_rows = mysql_affected_rows($retval); + + // Due to missing mysql_* support this here wasn't tested at all + // See here: http://php.net/manual/en/function.mysql-num-rows.php + if (self::$dbo->affected_rows <= 0) { + self::$dbo->affected_rows = mysql_num_rows($retval); + } } break; } @@ -1038,7 +1044,7 @@ class dba { $sql = "DELETE FROM `".$command['table']."` WHERE `". implode("` = ? AND `", array_keys($command['param']))."` = ?"; - logger(dba::replace_parameters($sql, $command['param']), LOGGER_DATA); + logger(self::replace_parameters($sql, $command['param']), LOGGER_DATA); if (!self::e($sql, $command['param'])) { if ($do_transaction) { @@ -1068,7 +1074,7 @@ class dba { $sql = "DELETE FROM `".$table."` WHERE `".$field."` IN (". substr(str_repeat("?, ", count($field_values)), 0, -2).");"; - logger(dba::replace_parameters($sql, $field_values), LOGGER_DATA); + logger(self::replace_parameters($sql, $field_values), LOGGER_DATA); if (!self::e($sql, $field_values)) { if ($do_transaction) { diff --git a/include/poller.php b/include/poller.php index 017c4bc5d..4ad955388 100644 --- a/include/poller.php +++ b/include/poller.php @@ -18,7 +18,7 @@ if (!file_exists("boot.php") && (sizeof($_SERVER["argv"]) != 0)) { require_once("boot.php"); function poller_run($argv, $argc){ - global $a, $db, $poller_up_start; + global $a, $db, $poller_up_start, $poller_db_duration; $poller_up_start = microtime(true); @@ -104,8 +104,9 @@ function poller_run($argv, $argc){ // If possible we will fetch new jobs for this worker if (!$refetched && Lock::set('poller_worker_process', 0)) { - logger('Blubb: a'); + $stamp = (float)microtime(true); $refetched = find_worker_processes(); + $poller_db_duration += (microtime(true) - $stamp); Lock::remove('poller_worker_process'); } } @@ -545,15 +546,16 @@ function poller_too_much_workers() { } dba::close($entries); - $jobs_per_minute = 0; - - $jobs = dba::p("SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL 10 MINUTE"); - if ($job = dba::fetch($jobs)) { - $jobs_per_minute = number_format($job['jobs'] / 10, 0); + $intervals = array(1, 10, 60); + $jobs_per_minute = array(); + foreach ($intervals AS $interval) { + $jobs = dba::p("SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL ".intval($interval)." MINUTE"); + if ($job = dba::fetch($jobs)) { + $jobs_per_minute[$interval] = number_format($job['jobs'] / $interval, 0); + } + dba::close($jobs); } - dba::close($jobs); - - $processlist = ' - jpm: '.$jobs_per_minute.' ('.implode(', ', $listitem).')'; + $processlist = ' - jpm: '.implode('/', $jobs_per_minute).' ('.implode(', ', $listitem).')'; } $entries = poller_total_entries(); @@ -664,34 +666,52 @@ function find_worker_processes($mypid = 0) { if (poller_passing_slow($highest_priority)) { // Are there waiting processes with a higher priority than the currently highest? - $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? + $result = dba::p("SELECT `id` FROM `workerqueue` WHERE `executed` <= ? AND `priority` < ? AND NOT `done` ORDER BY `priority`, `created` LIMIT ".intval($limit), - datetime_convert(), $mypid, NULL_DATE, $highest_priority); - if ($result) { - $found = (dba::affected_rows() > 0); + NULL_DATE, $highest_priority); + + while ($id = dba::fetch($result)) { + $ids[] = $id["id"]; } + dba::close($result); + + $found = (count($ids) > 0); if (!$found) { // Give slower processes some processing time - $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? + $result = dba::p("SELECT `id` FROM `workerqueue` WHERE `executed` <= ? AND `priority` > ? AND NOT `done` ORDER BY `priority`, `created` LIMIT ".intval($limit), - datetime_convert(), $mypid, NULL_DATE, $highest_priority); - if ($result) { - $found = (dba::affected_rows() > 0); + NULL_DATE, $highest_priority); + + while ($id = dba::fetch($result)) { + $ids[] = $id["id"]; } + dba::close($result); + + $found = (count($ids) > 0); } } // If there is no result (or we shouldn't pass lower processes) we check without priority limit if (!$found) { - $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? AND NOT `done` ORDER BY `priority`, `created` LIMIT ".intval($limit), - datetime_convert(), $mypid, NULL_DATE); - if ($result) { - $found = (dba::affected_rows() > 0); + $result = dba::p("SELECT `id` FROM `workerqueue` WHERE `executed` <= ? AND NOT `done` ORDER BY `priority`, `created` LIMIT ".intval($limit), NULL_DATE); + + while ($id = dba::fetch($result)) { + $ids[] = $id["id"]; } + dba::close($result); + + $found = (count($ids) > 0); } + + if ($found) { + $sql = "UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `id` IN (".substr(str_repeat("?, ", count($ids)), 0, -2).") AND `pid` = 0 AND NOT `done`;"; + array_unshift($ids, datetime_convert(), $mypid); + dba::e($sql, $ids); + } + return $found; } From 7726353601547fd829f3f86dc8598cfe0378f367 Mon Sep 17 00:00:00 2001 From: Michael Date: Fri, 30 Jun 2017 23:08:08 +0000 Subject: [PATCH 3/7] Prevent soon killings --- include/poller.php | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/include/poller.php b/include/poller.php index 4ad955388..bcdad586e 100644 --- a/include/poller.php +++ b/include/poller.php @@ -113,6 +113,7 @@ function poller_run($argv, $argc){ // To avoid the quitting of multiple pollers only one poller at a time will execute the check if (Lock::set('poller_worker', 0)) { + $stamp = (float)microtime(true); // Count active workers and compare them with a maximum value that depends on the load if (poller_too_much_workers()) { logger('Active worker limit reached, quitting.', LOGGER_DEBUG); @@ -125,6 +126,7 @@ function poller_run($argv, $argc){ return; } Lock::remove('poller_worker'); + $poller_db_duration += (microtime(true) - $stamp); } // Quit the poller once every 5 minutes @@ -185,7 +187,7 @@ function poller_process_with_priority_active($priority) { * @return boolean "true" if further processing should be stopped */ function poller_execute($queue) { - global $poller_db_duration; + global $poller_db_duration, $poller_last_update; $a = get_app(); @@ -225,6 +227,19 @@ function poller_execute($queue) { $funcname = str_replace(".php", "", basename($argv[0]))."_run"; if (function_exists($funcname)) { + + // We constantly update the "executed" date every minute to avoid being killed to soon + if (!isset($poller_last_update)) { + $poller_last_update = strtotime($queue["executed"]); + } + + $age = (time() - $poller_last_update) / 60; + $poller_last_update = time(); + + if ($age > 1) { + dba::update('workerqueue', array('executed' => datetime_convert()), array('pid' => $mypid, 'done' => false)); + } + poller_exec_function($queue, $funcname, $argv); $stamp = (float)microtime(true); @@ -650,18 +665,16 @@ function poller_passing_slow(&$highest_priority) { * * @return boolean Have we found something? */ -function find_worker_processes($mypid = 0) { +function find_worker_processes() { - if ($mypid == 0) { - $mypid = getmypid(); - } + $mypid = getmypid(); // Check if we should pass some low priority process $highest_priority = 0; $found = false; // The higher the number of parallel workers, the more we prefetch to prevent concurring access - $limit = Config::get("system", "worker_queues", 4) * 2; + $limit = Config::get("system", "worker_queues", 4); $limit = Config::get('system', 'worker_fetch_limit', $limit); if (poller_passing_slow($highest_priority)) { From 0cd9db9cb7f4c96f597e37590a536eaae123238d Mon Sep 17 00:00:00 2001 From: Michael Date: Sat, 1 Jul 2017 18:55:12 +0000 Subject: [PATCH 4/7] Degrade priority step by step --- include/poller.php | 47 ++++++++++++++++++++++++---------------------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/include/poller.php b/include/poller.php index bcdad586e..934641c13 100644 --- a/include/poller.php +++ b/include/poller.php @@ -237,7 +237,9 @@ function poller_execute($queue) { $poller_last_update = time(); if ($age > 1) { + $stamp = (float)microtime(true); dba::update('workerqueue', array('executed' => datetime_convert()), array('pid' => $mypid, 'done' => false)); + $poller_db_duration += (microtime(true) - $stamp); } poller_exec_function($queue, $funcname, $argv); @@ -468,46 +470,47 @@ function poller_max_connections_reached() { * */ function poller_kill_stale_workers() { - $r = q("SELECT `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` > '%s' AND NOT `done`", dbesc(NULL_DATE)); + $entries = dba::p("SELECT `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` > ? AND NOT `done` AND `pid` != 0", NULL_DATE); - if (!dbm::is_result($r)) { - // No processing here needed - return; - } - - foreach ($r AS $pid) { - if (!posix_kill($pid["pid"], 0)) { + while ($entry = dba::fetch($entries)) { + if (!posix_kill($entry["pid"], 0)) { dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0), - array('pid' => $pid["pid"], 'done' => false)); + array('pid' => $entry["pid"], 'done' => false)); } else { // Kill long running processes - // Check if the priority is in a valid range - if (!in_array($pid["priority"], array(PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE))) { - $pid["priority"] = PRIORITY_MEDIUM; + if (!in_array($entry["priority"], array(PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE))) { + $entry["priority"] = PRIORITY_MEDIUM; } // Define the maximum durations - $max_duration_defaults = array(PRIORITY_CRITICAL => 360, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 360); - $max_duration = $max_duration_defaults[$pid["priority"]]; + $max_duration_defaults = array(PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720); + $max_duration = $max_duration_defaults[$entry["priority"]]; - $argv = json_decode($pid["parameter"]); + $argv = json_decode($entry["parameter"]); $argv[0] = basename($argv[0]); // How long is the process already running? - $duration = (time() - strtotime($pid["executed"])) / 60; + $duration = (time() - strtotime($entry["executed"])) / 60; if ($duration > $max_duration) { - logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") took more than ".$max_duration." minutes. It will be killed now."); - posix_kill($pid["pid"], SIGTERM); + logger("Worker process ".$entry["pid"]." (".implode(" ", $argv).") took more than ".$max_duration." minutes. It will be killed now."); + posix_kill($entry["pid"], SIGTERM); // We killed the stale process. // To avoid a blocking situation we reschedule the process at the beginning of the queue. - // Additionally we are lowering the priority. + // Additionally we are lowering the priority. (But not PRIORITY_CRITICAL) + if ($entry["priority"] == PRIORITY_HIGH) { + $new_priority = PRIORITY_MEDIUM; + } elseif ($entry["priority"] == PRIORITY_MEDIUM) { + $new_priority = PRIORITY_LOW; + } elseif ($entry["priority"] != PRIORITY_CRITICAL) { + $new_priority = PRIORITY_NEGLIGIBLE; + } dba::update('workerqueue', - array('executed' => NULL_DATE, 'created' => datetime_convert(), 'priority' => PRIORITY_NEGLIGIBLE, 'pid' => 0), - array('pid' => $pid["pid"], 'done' => false)); + array('executed' => NULL_DATE, 'created' => datetime_convert(), 'priority' => $new_priority, 'pid' => 0), + array('pid' => $entry["pid"], 'done' => false)); } else { - logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", LOGGER_DEBUG); + logger("Worker process ".$entry["pid"]." (".implode(" ", $argv).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", LOGGER_DEBUG); } } } From bae4569a0b5c59c26e25f25d71734fb27e31e64b Mon Sep 17 00:00:00 2001 From: Michael Date: Sun, 2 Jul 2017 19:43:36 +0000 Subject: [PATCH 5/7] Avoid the killing of the wrong process --- include/poller.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/poller.php b/include/poller.php index 934641c13..50d883644 100644 --- a/include/poller.php +++ b/include/poller.php @@ -228,7 +228,7 @@ function poller_execute($queue) { if (function_exists($funcname)) { - // We constantly update the "executed" date every minute to avoid being killed to soon + // We constantly update the "executed" date every minute to avoid being killed too soon if (!isset($poller_last_update)) { $poller_last_update = strtotime($queue["executed"]); } @@ -470,7 +470,7 @@ function poller_max_connections_reached() { * */ function poller_kill_stale_workers() { - $entries = dba::p("SELECT `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` > ? AND NOT `done` AND `pid` != 0", NULL_DATE); + $entries = dba::p("SELECT `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` > ? AND NOT `done` AND `pid` != 0 ORDER BY `priority`, `created`", NULL_DATE); while ($entry = dba::fetch($entries)) { if (!posix_kill($entry["pid"], 0)) { From 0ab791581014cde0b1c887a663de82f8f7c3410a Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 4 Jul 2017 05:01:06 +0000 Subject: [PATCH 6/7] Only kill the first stale process --- include/poller.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/include/poller.php b/include/poller.php index 50d883644..4c0f66596 100644 --- a/include/poller.php +++ b/include/poller.php @@ -470,12 +470,12 @@ function poller_max_connections_reached() { * */ function poller_kill_stale_workers() { - $entries = dba::p("SELECT `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` > ? AND NOT `done` AND `pid` != 0 ORDER BY `priority`, `created`", NULL_DATE); + $entries = dba::p("SELECT `id`, `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` > ? AND NOT `done` AND `pid` != 0 ORDER BY `priority`, `created`", NULL_DATE); while ($entry = dba::fetch($entries)) { if (!posix_kill($entry["pid"], 0)) { dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0), - array('pid' => $entry["pid"], 'done' => false)); + array('id' => $entry["id"])); } else { // Kill long running processes // Check if the priority is in a valid range @@ -508,7 +508,7 @@ function poller_kill_stale_workers() { } dba::update('workerqueue', array('executed' => NULL_DATE, 'created' => datetime_convert(), 'priority' => $new_priority, 'pid' => 0), - array('pid' => $entry["pid"], 'done' => false)); + array('id' => $entry["id"])); } else { logger("Worker process ".$entry["pid"]." (".implode(" ", $argv).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", LOGGER_DEBUG); } From c8b1b5180995abd0d913b8b087d1a7fad001e6ab Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 4 Jul 2017 05:31:42 +0000 Subject: [PATCH 7/7] Improved logging --- include/items.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/items.php b/include/items.php index 31a709fcb..5eb254b1e 100644 --- a/include/items.php +++ b/include/items.php @@ -2051,7 +2051,7 @@ function item_expire($uid, $days, $network = "", $force = false) { $expire_photos = get_pconfig($uid, 'expire', 'photos'); $expire_photos = (($expire_photos === false) ? 0 : intval($expire_photos)); // default if not set: 0 - logger('expire: # items=' . count($r). "; expire items: $expire_items, expire notes: $expire_notes, expire starred: $expire_starred, expire photos: $expire_photos"); + logger('User '.$uid.': expire: # items=' . count($r). "; expire items: $expire_items, expire notes: $expire_notes, expire starred: $expire_starred, expire photos: $expire_photos"); foreach ($r as $item) {