2010-07-19 03:49:54 +00:00
< ? php
2017-04-30 04:01:26 +00:00
2017-04-30 04:07:00 +00:00
use Friendica\App ;
2017-04-30 04:01:26 +00:00
use Friendica\Core\Config ;
2017-06-04 15:59:20 +00:00
use Friendica\Util\Lock ;
2017-04-30 04:01:26 +00:00
2017-06-08 02:00:59 +00:00
if ( ! file_exists ( " boot.php " ) && ( sizeof ( $_SERVER [ " argv " ]) != 0 )) {
2015-02-19 09:45:46 +00:00
$directory = dirname ( $_SERVER [ " argv " ][ 0 ]);
2015-02-19 09:26:49 +00:00
2017-06-04 20:03:37 +00:00
if ( substr ( $directory , 0 , 1 ) != " / " ) {
2015-02-19 09:45:46 +00:00
$directory = $_SERVER [ " PWD " ] . " / " . $directory ;
2017-06-04 20:03:37 +00:00
}
2015-02-19 09:45:46 +00:00
$directory = realpath ( $directory . " /.. " );
2015-02-19 09:26:49 +00:00
2015-02-19 09:45:46 +00:00
chdir ( $directory );
}
2011-04-16 06:40:43 +00:00
2011-01-28 13:04:18 +00:00
require_once ( " boot.php " );
2016-11-27 00:55:05 +00:00
function poller_run ( $argv , $argc ){
2017-06-17 13:58:18 +00:00
global $a , $db , $poller_up_start ;
$poller_up_start = microtime ( true );
2011-01-28 13:04:18 +00:00
2017-06-04 15:59:20 +00:00
$a = new App ( dirname ( __DIR__ ));
2013-01-20 13:08:28 +00:00
2017-06-04 15:59:20 +00:00
@ include ( " .htconfig.php " );
require_once ( " include/dba.php " );
$db = new dba ( $db_host , $db_user , $db_pass , $db_data );
unset ( $db_host , $db_user , $db_pass , $db_data );
2013-01-20 13:08:28 +00:00
2017-02-25 13:59:24 +00:00
Config :: load ();
2016-10-02 13:52:52 +00:00
// Quit when in maintenance
2017-01-21 06:06:29 +00:00
if ( Config :: get ( 'system' , 'maintenance' , true )) {
2016-10-02 13:52:52 +00:00
return ;
2017-01-21 06:06:29 +00:00
}
2016-10-02 13:52:52 +00:00
2017-02-26 23:16:49 +00:00
$a -> set_baseurl ( Config :: get ( 'system' , 'url' ));
load_hooks ();
2017-06-04 20:03:37 +00:00
// At first check the maximum load. We shouldn't continue with a high load
if ( $a -> maxload_reached ()) {
logger ( 'Pre check: maximum load reached, quitting.' , LOGGER_DEBUG );
return ;
2017-06-04 15:59:20 +00:00
}
2017-06-04 20:03:37 +00:00
// We now start the process. This is done after the load check since this could increase the load.
2016-09-09 20:33:54 +00:00
$a -> start_process ();
2017-06-12 09:44:46 +00:00
// Kill stale processes every 5 minutes
$last_cleanup = Config :: get ( 'system' , 'poller_last_cleaned' , 0 );
if ( time () > ( $last_cleanup + 300 )) {
Config :: set ( 'system' , 'poller_last_cleaned' , time ());
2017-06-04 20:03:37 +00:00
poller_kill_stale_workers ();
}
2017-06-12 21:39:20 +00:00
// Count active workers and compare them with a maximum value that depends on the load
if ( poller_too_much_workers ()) {
logger ( 'Pre check: Active worker limit reached, quitting.' , LOGGER_DEBUG );
return ;
}
2017-06-04 20:03:37 +00:00
// Do we have too few memory?
2017-04-29 17:36:46 +00:00
if ( $a -> min_memory_reached ()) {
2017-06-04 18:59:50 +00:00
logger ( 'Pre check: Memory limit reached, quitting.' , LOGGER_DEBUG );
2017-04-29 17:36:46 +00:00
return ;
}
2017-06-04 20:03:37 +00:00
// Possibly there are too much database connections
2016-11-30 19:24:58 +00:00
if ( poller_max_connections_reached ()) {
2017-06-04 18:59:50 +00:00
logger ( 'Pre check: maximum connections reached, quitting.' , LOGGER_DEBUG );
2016-07-02 07:31:28 +00:00
return ;
2016-11-30 19:24:58 +00:00
}
2016-06-01 19:54:02 +00:00
2017-06-04 20:03:37 +00:00
// Possibly there are too much database processes that block the system
2017-06-04 15:59:20 +00:00
if ( $a -> max_processes_reached ()) {
2017-06-04 18:59:50 +00:00
logger ( 'Pre check: maximum processes reached, quitting.' , LOGGER_DEBUG );
2016-03-08 19:28:09 +00:00
return ;
2016-11-30 19:24:58 +00:00
}
2015-09-23 06:56:48 +00:00
2017-06-04 20:03:37 +00:00
// Now we start additional cron processes if we should do so
2017-06-08 02:00:59 +00:00
if (( $argc <= 1 ) || ( $argv [ 1 ] != " no_cron " )) {
2017-06-04 20:03:37 +00:00
poller_run_cron ();
2016-01-24 01:53:21 +00:00
}
2015-12-06 15:40:31 +00:00
2017-06-28 04:53:11 +00:00
$refetched = false ;
2015-09-25 15:38:56 +00:00
$starttime = time ();
2017-06-04 20:03:37 +00:00
// We fetch the next queue entry that is about to be executed
2016-10-10 21:01:36 +00:00
while ( $r = poller_worker_process ()) {
2017-06-12 21:39:20 +00:00
foreach ( $r AS $entry ) {
// Assure that the priority is an integer value
$entry [ 'priority' ] = ( int ) $entry [ 'priority' ];
2015-09-23 06:56:48 +00:00
2017-06-12 21:39:20 +00:00
// The work will be done
if ( ! poller_execute ( $entry )) {
logger ( 'Process execution failed, quitting.' , LOGGER_DEBUG );
2017-06-05 22:41:33 +00:00
return ;
}
2017-06-13 13:51:25 +00:00
}
2017-06-04 15:59:20 +00:00
2017-06-13 13:51:25 +00:00
// To avoid the quitting of multiple pollers only one poller at a time will execute the check
if ( Lock :: set ( 'poller_worker' , 0 )) {
// Count active workers and compare them with a maximum value that depends on the load
if ( poller_too_much_workers ()) {
logger ( 'Active worker limit reached, quitting.' , LOGGER_DEBUG );
return ;
}
2017-06-04 20:03:37 +00:00
2017-06-13 13:51:25 +00:00
// Check free memory
if ( $a -> min_memory_reached ()) {
logger ( 'Memory limit reached, quitting.' , LOGGER_DEBUG );
return ;
2017-06-12 21:39:20 +00:00
}
2017-06-13 13:51:25 +00:00
Lock :: remove ( 'poller_worker' );
2016-11-27 00:55:05 +00:00
}
2016-06-01 19:54:02 +00:00
2017-06-13 13:51:25 +00:00
// Quit the poller once every 5 minutes
if ( time () > ( $starttime + 300 )) {
2017-06-04 19:05:15 +00:00
logger ( 'Process lifetime reached, quitting.' , LOGGER_DEBUG );
2016-02-11 20:39:34 +00:00
return ;
2017-06-03 07:25:01 +00:00
}
2017-06-28 04:53:11 +00:00
// If possible we will fetch new jobs for this worker
if ( ! $refetched && Lock :: set ( 'poller_worker_process' , 0 )) {
$refetched = find_worker_processes ();
Lock :: remove ( 'poller_worker_process' );
}
2016-11-27 00:55:05 +00:00
}
2017-06-03 07:25:01 +00:00
logger ( " Couldn't select a workerqueue entry, quitting. " , LOGGER_DEBUG );
2016-11-27 00:55:05 +00:00
}
2017-06-08 20:43:30 +00:00
/**
* @ brief Returns the number of non executed entries in the worker queue
*
* @ return integer Number of non executed entries in the worker queue
*/
function poller_total_entries () {
2017-06-17 13:58:18 +00:00
$s = q ( " SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s' AND NOT `done` " , dbesc ( NULL_DATE ));
2017-06-13 20:51:24 +00:00
if ( dbm :: is_result ( $s )) {
return $s [ 0 ][ " total " ];
} else {
return 0 ;
}
2017-06-08 20:43:30 +00:00
}
/**
* @ brief Returns the highest priority in the worker queue that isn ' t executed
*
* @ return integer Number of active poller processes
*/
function poller_highest_priority () {
2017-06-17 13:58:18 +00:00
$s = q ( " SELECT `priority` FROM `workerqueue` WHERE `executed` <= '%s' AND NOT `done` ORDER BY `priority` LIMIT 1 " , dbesc ( NULL_DATE ));
2017-06-13 20:51:24 +00:00
if ( dbm :: is_result ( $s )) {
return $s [ 0 ][ " priority " ];
} else {
return 0 ;
}
2017-06-08 20:43:30 +00:00
}
/**
* @ brief Returns if a process with the given priority is running
*
* @ param integer $priority The priority that should be checked
*
* @ return integer Is there a process running with that priority ?
*/
function poller_process_with_priority_active ( $priority ) {
2017-06-17 13:58:18 +00:00
$s = q ( " SELECT `id` FROM `workerqueue` WHERE `priority` <= %d AND `executed` > '%s' AND NOT `done` LIMIT 1 " ,
2017-06-08 20:43:30 +00:00
intval ( $priority ), dbesc ( NULL_DATE ));
return dbm :: is_result ( $s );
}
2016-11-27 00:55:05 +00:00
/**
* @ brief Execute a worker entry
*
* @ param array $queue Workerqueue entry
*
* @ return boolean " true " if further processing should be stopped
*/
function poller_execute ( $queue ) {
2017-06-17 13:58:18 +00:00
global $poller_db_duration ;
2015-12-15 23:14:53 +00:00
2016-11-27 00:55:05 +00:00
$a = get_app ();
2011-04-16 06:40:43 +00:00
2016-11-27 00:55:05 +00:00
$mypid = getmypid ();
2016-10-17 18:38:51 +00:00
2016-11-27 00:55:05 +00:00
// Quit when in maintenance
2017-01-21 06:06:29 +00:00
if ( Config :: get ( 'system' , 'maintenance' , true )) {
2017-05-30 13:20:29 +00:00
logger ( " Maintenance mode - quit process " . $mypid , LOGGER_DEBUG );
2016-11-27 00:55:05 +00:00
return false ;
}
// Constantly check the number of parallel database processes
if ( $a -> max_processes_reached ()) {
2017-05-30 13:20:29 +00:00
logger ( " Max processes reached for process " . $mypid , LOGGER_DEBUG );
2016-11-27 00:55:05 +00:00
return false ;
}
// Constantly check the number of available database connections to let the frontend be accessible at any time
if ( poller_max_connections_reached ()) {
2017-05-30 13:20:29 +00:00
logger ( " Max connection reached for process " . $mypid , LOGGER_DEBUG );
2016-11-27 00:55:05 +00:00
return false ;
}
$argv = json_decode ( $queue [ " parameter " ]);
2016-07-24 18:25:11 +00:00
2016-11-27 00:55:05 +00:00
// Check for existance and validity of the include file
$include = $argv [ 0 ];
2015-09-13 16:47:10 +00:00
2016-11-27 00:55:05 +00:00
if ( ! validate_include ( $include )) {
logger ( " Include file " . $argv [ 0 ] . " is not valid! " );
2017-06-13 05:52:59 +00:00
dba :: delete ( 'workerqueue' , array ( 'id' => $queue [ " id " ]));
2016-11-27 00:55:05 +00:00
return true ;
}
2010-12-21 22:51:26 +00:00
2016-11-27 00:55:05 +00:00
require_once ( $include );
2016-07-24 18:25:11 +00:00
2016-11-27 00:55:05 +00:00
$funcname = str_replace ( " .php " , " " , basename ( $argv [ 0 ])) . " _run " ;
2016-10-07 06:05:43 +00:00
2016-11-27 00:55:05 +00:00
if ( function_exists ( $funcname )) {
2017-01-21 19:50:56 +00:00
poller_exec_function ( $queue , $funcname , $argv );
2017-06-17 13:58:18 +00:00
$stamp = ( float ) microtime ( true );
dba :: update ( 'workerqueue' , array ( 'done' => true ), array ( 'id' => $queue [ " id " ]));
$poller_db_duration = ( microtime ( true ) - $stamp );
2017-01-21 19:50:56 +00:00
} else {
logger ( " Function " . $funcname . " does not exist " );
2017-06-17 13:58:18 +00:00
dba :: delete ( 'workerqueue' , array ( 'id' => $queue [ " id " ]));
2017-01-21 19:50:56 +00:00
}
2017-01-20 21:58:53 +00:00
2017-01-21 19:50:56 +00:00
return true ;
}
2015-09-11 19:35:58 +00:00
2017-01-21 19:50:56 +00:00
/**
* @ brief Execute a function from the queue
*
* @ param array $queue Workerqueue entry
* @ param string $funcname name of the function
2017-01-27 15:13:37 +00:00
* @ param array $argv Array of values to be passed to the function
2017-01-21 19:50:56 +00:00
*/
function poller_exec_function ( $queue , $funcname , $argv ) {
2017-06-28 04:53:11 +00:00
global $poller_up_start , $poller_db_duration , $poller_lock_duration ;
2016-10-07 06:05:43 +00:00
2017-01-21 19:50:56 +00:00
$a = get_app ();
2016-07-13 17:43:16 +00:00
2017-01-21 19:50:56 +00:00
$mypid = getmypid ();
2017-01-19 17:06:23 +00:00
2017-01-21 19:50:56 +00:00
$argc = count ( $argv );
2015-09-29 04:15:26 +00:00
2017-01-21 19:50:56 +00:00
logger ( " Process " . $mypid . " - Prio " . $queue [ " priority " ] . " - ID " . $queue [ " id " ] . " : " . $funcname . " " . $queue [ " parameter " ]);
$stamp = ( float ) microtime ( true );
2017-01-21 20:15:49 +00:00
// We use the callstack here to analyze the performance of executed worker entries.
// For this reason the variables have to be initialized.
2017-01-21 19:50:56 +00:00
if ( Config :: get ( " system " , " profiler " )) {
$a -> performance [ " start " ] = microtime ( true );
$a -> performance [ " database " ] = 0 ;
$a -> performance [ " database_write " ] = 0 ;
$a -> performance [ " network " ] = 0 ;
$a -> performance [ " file " ] = 0 ;
$a -> performance [ " rendering " ] = 0 ;
$a -> performance [ " parser " ] = 0 ;
$a -> performance [ " marktime " ] = 0 ;
$a -> performance [ " markstart " ] = microtime ( true );
$a -> callstack = array ();
}
2017-01-20 21:58:53 +00:00
2017-01-21 19:50:56 +00:00
// For better logging create a new process id for every worker call
// But preserve the old one for the worker
$old_process_id = $a -> process_id ;
$a -> process_id = uniqid ( " wrk " , true );
2017-06-11 19:51:18 +00:00
$a -> queue = $queue ;
2017-01-21 19:50:56 +00:00
2017-06-17 13:58:18 +00:00
$up_duration = number_format ( microtime ( true ) - $poller_up_start , 3 );
2017-01-21 19:50:56 +00:00
$funcname ( $argv , $argc );
$a -> process_id = $old_process_id ;
2017-06-11 19:51:18 +00:00
unset ( $a -> queue );
2017-01-21 19:50:56 +00:00
$duration = number_format ( microtime ( true ) - $stamp , 3 );
2017-06-17 13:58:18 +00:00
$poller_up_start = microtime ( true );
2017-06-17 19:13:40 +00:00
/* With these values we can analyze how effective the worker is .
2017-06-17 13:58:18 +00:00
* The database and rest time should be low since this is the unproductive time .
* The execution time is the productive time .
* By changing parameters like the maximum number of workers we can check the effectivness .
*/
2017-06-28 04:53:11 +00:00
logger ( 'DB: ' . number_format ( $poller_db_duration , 2 ) .
' - Lock: ' . number_format ( $poller_lock_duration , 2 ) .
' - Rest: ' . number_format ( $up_duration - $poller_db_duration - $poller_lock_duration , 2 ) .
' - Execution: ' . number_format ( $duration , 2 ), LOGGER_DEBUG );
$poller_lock_duration = 0 ;
2017-06-17 13:58:18 +00:00
2017-01-31 19:39:09 +00:00
if ( $duration > 3600 ) {
logger ( " Prio " . $queue [ " priority " ] . " : " . $queue [ " parameter " ] . " - longer than 1 hour ( " . round ( $duration / 60 , 3 ) . " ) " , LOGGER_DEBUG );
} elseif ( $duration > 600 ) {
logger ( " Prio " . $queue [ " priority " ] . " : " . $queue [ " parameter " ] . " - longer than 10 minutes ( " . round ( $duration / 60 , 3 ) . " ) " , LOGGER_DEBUG );
} elseif ( $duration > 300 ) {
logger ( " Prio " . $queue [ " priority " ] . " : " . $queue [ " parameter " ] . " - longer than 5 minutes ( " . round ( $duration / 60 , 3 ) . " ) " , LOGGER_DEBUG );
} elseif ( $duration > 120 ) {
logger ( " Prio " . $queue [ " priority " ] . " : " . $queue [ " parameter " ] . " - longer than 2 minutes ( " . round ( $duration / 60 , 3 ) . " ) " , LOGGER_DEBUG );
}
2017-01-21 19:50:56 +00:00
logger ( " Process " . $mypid . " - Prio " . $queue [ " priority " ] . " - ID " . $queue [ " id " ] . " : " . $funcname . " - done in " . $duration . " seconds. " );
2017-01-21 20:15:49 +00:00
// Write down the performance values into the log
2017-01-21 19:50:56 +00:00
if ( Config :: get ( " system " , " profiler " )) {
$duration = microtime ( true ) - $a -> performance [ " start " ];
if ( Config :: get ( " rendertime " , " callstack " )) {
if ( isset ( $a -> callstack [ " database " ])) {
2017-01-21 06:16:15 +00:00
$o = " \n Database Read: \n " ;
2017-01-21 05:05:41 +00:00
foreach ( $a -> callstack [ " database " ] AS $func => $time ) {
$time = round ( $time , 3 );
2017-06-04 20:03:37 +00:00
if ( $time > 0 ) {
2017-01-21 05:05:41 +00:00
$o .= $func . " : " . $time . " \n " ;
2017-06-04 20:03:37 +00:00
}
2017-01-21 05:05:41 +00:00
}
2017-01-21 19:50:56 +00:00
}
if ( isset ( $a -> callstack [ " database_write " ])) {
2017-01-21 05:05:41 +00:00
$o .= " \n Database Write: \n " ;
foreach ( $a -> callstack [ " database_write " ] AS $func => $time ) {
$time = round ( $time , 3 );
2017-06-04 20:03:37 +00:00
if ( $time > 0 ) {
2017-01-21 05:05:41 +00:00
$o .= $func . " : " . $time . " \n " ;
2017-06-04 20:03:37 +00:00
}
2017-01-21 05:05:41 +00:00
}
2017-01-21 19:50:56 +00:00
}
if ( isset ( $a -> callstack [ " network " ])) {
2017-01-21 05:05:41 +00:00
$o .= " \n Network: \n " ;
foreach ( $a -> callstack [ " network " ] AS $func => $time ) {
$time = round ( $time , 3 );
2017-06-04 20:03:37 +00:00
if ( $time > 0 ) {
2017-01-21 05:05:41 +00:00
$o .= $func . " : " . $time . " \n " ;
2017-06-04 20:03:37 +00:00
}
2017-01-21 05:05:41 +00:00
}
}
2017-01-21 19:50:56 +00:00
} else {
$o = '' ;
2017-01-20 21:58:53 +00:00
}
2017-01-21 19:50:56 +00:00
logger ( " ID " . $queue [ " id " ] . " : " . $funcname . " : " . sprintf ( " DB: %s/%s, Net: %s, I/O: %s, Other: %s, Total: %s " . $o ,
number_format ( $a -> performance [ " database " ] - $a -> performance [ " database_write " ], 2 ),
number_format ( $a -> performance [ " database_write " ], 2 ),
number_format ( $a -> performance [ " network " ], 2 ),
number_format ( $a -> performance [ " file " ], 2 ),
number_format ( $duration - ( $a -> performance [ " database " ] + $a -> performance [ " network " ] + $a -> performance [ " file " ]), 2 ),
number_format ( $duration , 2 )),
LOGGER_DEBUG );
2011-03-05 04:55:32 +00:00
}
2011-04-16 15:45:08 +00:00
2017-01-21 19:50:56 +00:00
$cooldown = Config :: get ( " system " , " worker_cooldown " , 0 );
if ( $cooldown > 0 ) {
logger ( " Process " . $mypid . " - Prio " . $queue [ " priority " ] . " - ID " . $queue [ " id " ] . " : " . $funcname . " - in cooldown for " . $cooldown . " seconds " );
sleep ( $cooldown );
}
2011-01-28 13:04:18 +00:00
}
2010-07-19 03:49:54 +00:00
2016-02-09 05:42:00 +00:00
/**
* @ brief Checks if the number of database connections has reached a critical limit .
*
* @ return bool Are more than 3 / 4 of the maximum connections used ?
*/
function poller_max_connections_reached () {
2016-02-09 22:28:33 +00:00
// Fetch the max value from the config. This is needed when the system cannot detect the correct value by itself.
2017-01-21 06:06:29 +00:00
$max = Config :: get ( " system " , " max_connections " );
2016-02-09 22:28:33 +00:00
2016-04-23 08:11:09 +00:00
// Fetch the percentage level where the poller will get active
2016-11-30 19:24:58 +00:00
$maxlevel = Config :: get ( " system " , " max_connections_level " , 75 );
2016-04-23 08:11:09 +00:00
2016-02-09 22:28:33 +00:00
if ( $max == 0 ) {
2016-02-11 20:39:34 +00:00
// the maximum number of possible user connections can be a system variable
$r = q ( " SHOW VARIABLES WHERE `variable_name` = 'max_user_connections' " );
2017-06-04 20:03:37 +00:00
if ( dbm :: is_result ( $r )) {
2016-02-11 20:39:34 +00:00
$max = $r [ 0 ][ " Value " ];
2017-06-04 20:03:37 +00:00
}
2016-02-11 20:39:34 +00:00
// Or it can be granted. This overrides the system variable
$r = q ( " SHOW GRANTS " );
2017-06-04 20:03:37 +00:00
if ( dbm :: is_result ( $r )) {
2016-02-11 20:39:34 +00:00
foreach ( $r AS $grants ) {
$grant = array_pop ( $grants );
2017-06-04 20:03:37 +00:00
if ( stristr ( $grant , " GRANT USAGE ON " )) {
if ( preg_match ( " /WITH MAX_USER_CONNECTIONS ( \ d*)/ " , $grant , $match )) {
2016-02-11 20:39:34 +00:00
$max = $match [ 1 ];
2017-06-04 20:03:37 +00:00
}
}
2016-02-11 20:39:34 +00:00
}
2017-06-04 20:03:37 +00:00
}
2016-02-11 20:39:34 +00:00
}
// If $max is set we will use the processlist to determine the current number of connections
// The processlist only shows entries of the current user
if ( $max != 0 ) {
$r = q ( " SHOW PROCESSLIST " );
2017-06-04 20:03:37 +00:00
if ( ! dbm :: is_result ( $r )) {
2016-02-11 20:39:34 +00:00
return false ;
2017-06-04 20:03:37 +00:00
}
2016-02-11 20:39:34 +00:00
$used = count ( $r );
logger ( " Connection usage (user values): " . $used . " / " . $max , LOGGER_DEBUG );
2016-04-23 08:11:09 +00:00
$level = ( $used / $max ) * 100 ;
2016-02-09 05:42:00 +00:00
2016-04-23 08:11:09 +00:00
if ( $level >= $maxlevel ) {
logger ( " Maximum level ( " . $maxlevel . " %) of user connections reached: " . $used . " / " . $max );
2016-02-12 10:04:25 +00:00
return true ;
}
}
2016-02-09 05:42:00 +00:00
2016-02-12 10:04:25 +00:00
// We will now check for the system values.
// This limit could be reached although the user limits are fine.
$r = q ( " SHOW VARIABLES WHERE `variable_name` = 'max_connections' " );
2017-06-04 20:03:37 +00:00
if ( ! dbm :: is_result ( $r )) {
2016-02-12 10:04:25 +00:00
return false ;
2017-06-04 20:03:37 +00:00
}
2016-02-12 10:04:25 +00:00
$max = intval ( $r [ 0 ][ " Value " ]);
2017-06-04 20:03:37 +00:00
if ( $max == 0 ) {
2016-02-12 10:04:25 +00:00
return false ;
2017-06-04 20:03:37 +00:00
}
2016-02-12 10:04:25 +00:00
$r = q ( " SHOW STATUS WHERE `variable_name` = 'Threads_connected' " );
2017-06-04 20:03:37 +00:00
if ( ! dbm :: is_result ( $r )) {
2016-02-12 10:04:25 +00:00
return false ;
2017-06-04 20:03:37 +00:00
}
2016-02-12 10:04:25 +00:00
$used = intval ( $r [ 0 ][ " Value " ]);
2017-06-04 20:03:37 +00:00
if ( $used == 0 ) {
2016-02-12 10:04:25 +00:00
return false ;
2017-06-04 20:03:37 +00:00
}
2016-02-12 10:04:25 +00:00
logger ( " Connection usage (system values): " . $used . " / " . $max , LOGGER_DEBUG );
2016-02-09 05:42:00 +00:00
2016-04-23 08:11:09 +00:00
$level = $used / $max * 100 ;
2016-02-09 05:42:00 +00:00
2017-06-04 20:03:37 +00:00
if ( $level < $maxlevel ) {
2016-02-09 05:42:00 +00:00
return false ;
2017-06-04 20:03:37 +00:00
}
2016-04-23 08:11:09 +00:00
logger ( " Maximum level ( " . $level . " %) of system connections reached: " . $used . " / " . $max );
2016-02-09 05:42:00 +00:00
return true ;
}
2016-01-24 01:53:21 +00:00
/**
* @ brief fix the queue entry if the worker process died
*
*/
function poller_kill_stale_workers () {
2017-06-17 13:58:18 +00:00
$r = q ( " SELECT `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` > '%s' AND NOT `done` " , dbesc ( NULL_DATE ));
2016-03-04 21:38:18 +00:00
2016-07-02 12:00:42 +00:00
if ( ! dbm :: is_result ( $r )) {
2016-03-04 21:38:18 +00:00
// No processing here needed
return ;
}
2017-03-21 16:02:59 +00:00
foreach ( $r AS $pid ) {
2017-01-26 15:07:30 +00:00
if ( ! posix_kill ( $pid [ " pid " ], 0 )) {
2017-05-12 16:09:25 +00:00
dba :: update ( 'workerqueue' , array ( 'executed' => NULL_DATE , 'pid' => 0 ),
2017-06-28 04:53:11 +00:00
array ( 'pid' => $pid [ " pid " ], 'done' => false ));
2017-01-26 15:07:30 +00:00
} else {
2016-01-24 01:53:21 +00:00
// Kill long running processes
2016-08-08 17:20:40 +00:00
// Check if the priority is in a valid range
2017-06-04 20:03:37 +00:00
if ( ! in_array ( $pid [ " priority " ], array ( PRIORITY_CRITICAL , PRIORITY_HIGH , PRIORITY_MEDIUM , PRIORITY_LOW , PRIORITY_NEGLIGIBLE ))) {
2016-08-08 17:20:40 +00:00
$pid [ " priority " ] = PRIORITY_MEDIUM ;
2017-06-04 20:03:37 +00:00
}
2016-08-08 17:20:40 +00:00
2017-06-06 20:10:47 +00:00
// Define the maximum durations
2016-08-18 10:33:17 +00:00
$max_duration_defaults = array ( PRIORITY_CRITICAL => 360 , PRIORITY_HIGH => 10 , PRIORITY_MEDIUM => 60 , PRIORITY_LOW => 180 , PRIORITY_NEGLIGIBLE => 360 );
2016-08-08 17:20:40 +00:00
$max_duration = $max_duration_defaults [ $pid [ " priority " ]];
2016-08-14 19:02:29 +00:00
$argv = json_decode ( $pid [ " parameter " ]);
$argv [ 0 ] = basename ( $argv [ 0 ]);
2016-08-08 17:20:40 +00:00
// How long is the process already running?
2016-01-24 01:53:21 +00:00
$duration = ( time () - strtotime ( $pid [ " executed " ])) / 60 ;
2016-08-08 17:20:40 +00:00
if ( $duration > $max_duration ) {
2016-08-14 19:02:29 +00:00
logger ( " Worker process " . $pid [ " pid " ] . " ( " . implode ( " " , $argv ) . " ) took more than " . $max_duration . " minutes. It will be killed now. " );
2016-01-24 01:53:21 +00:00
posix_kill ( $pid [ " pid " ], SIGTERM );
2016-08-04 13:33:15 +00:00
// We killed the stale process.
// To avoid a blocking situation we reschedule the process at the beginning of the queue.
2016-08-04 13:41:32 +00:00
// Additionally we are lowering the priority.
2017-05-12 16:09:25 +00:00
dba :: update ( 'workerqueue' ,
array ( 'executed' => NULL_DATE , 'created' => datetime_convert (), 'priority' => PRIORITY_NEGLIGIBLE , 'pid' => 0 ),
2017-06-28 04:53:11 +00:00
array ( 'pid' => $pid [ " pid " ], 'done' => false ));
2017-01-26 15:07:30 +00:00
} else {
2016-08-14 19:02:29 +00:00
logger ( " Worker process " . $pid [ " pid " ] . " ( " . implode ( " " , $argv ) . " ) now runs for " . round ( $duration ) . " of " . $max_duration . " allowed minutes. That's okay. " , LOGGER_DEBUG );
2017-01-26 15:07:30 +00:00
}
2016-01-24 01:53:21 +00:00
}
2017-03-19 08:04:04 +00:00
}
2016-01-24 01:53:21 +00:00
}
2016-11-27 01:11:22 +00:00
/**
* @ brief Checks if the number of active workers exceeds the given limits
*
* @ return bool Are there too much workers running ?
*/
2017-06-12 21:39:20 +00:00
function poller_too_much_workers () {
2016-11-30 19:24:58 +00:00
$queues = Config :: get ( " system " , " worker_queues " , 4 );
2015-09-28 05:54:28 +00:00
2016-07-23 20:57:22 +00:00
$maxqueues = $queues ;
2017-06-03 07:25:01 +00:00
$active = poller_active_workers ();
2015-09-28 05:54:28 +00:00
// Decrease the number of workers at higher load
2015-12-15 22:26:58 +00:00
$load = current_load ();
2017-05-29 19:14:44 +00:00
if ( $load ) {
2016-11-30 19:24:58 +00:00
$maxsysload = intval ( Config :: get ( " system " , " maxloadavg " , 50 ));
2015-09-28 05:54:28 +00:00
2015-09-28 17:14:07 +00:00
$maxworkers = $queues ;
// Some magical mathemathics to reduce the workers
$exponent = 3 ;
$slope = $maxworkers / pow ( $maxsysload , $exponent );
$queues = ceil ( $slope * pow ( max ( 0 , $maxsysload - $load ), $exponent ));
2015-09-28 05:54:28 +00:00
2017-06-08 20:43:30 +00:00
if ( Config :: get ( 'system' , 'worker_debug' )) {
// Create a list of queue entries grouped by their priority
$listitem = array ();
2017-05-29 19:14:44 +00:00
2017-06-08 20:43:30 +00:00
// Adding all processes with no workerqueue entry
2017-06-28 04:53:11 +00:00
$processes = dba :: p ( " SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS
( SELECT id FROM `workerqueue`
WHERE `workerqueue` . `pid` = `process` . `pid` AND NOT `done` AND `pid` != ? ) " , getmypid());
2017-05-29 19:14:44 +00:00
if ( $process = dba :: fetch ( $processes )) {
2017-06-08 20:43:30 +00:00
$listitem [ 0 ] = " 0: " . $process [ " running " ];
2017-05-29 19:14:44 +00:00
}
dba :: close ( $processes );
2017-06-08 20:43:30 +00:00
// Now adding all processes with workerqueue entries
2017-06-17 14:15:17 +00:00
$entries = dba :: p ( " SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` WHERE NOT `done` GROUP BY `priority` " );
2017-06-08 20:43:30 +00:00
while ( $entry = dba :: fetch ( $entries )) {
2017-06-17 13:58:18 +00:00
$processes = dba :: p ( " SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `done` WHERE `priority` = ? " , $entry [ " priority " ]);
2017-06-08 20:43:30 +00:00
if ( $process = dba :: fetch ( $processes )) {
$listitem [ $entry [ " priority " ]] = $entry [ " priority " ] . " : " . $process [ " running " ] . " / " . $entry [ " entries " ];
}
dba :: close ( $processes );
}
dba :: close ( $entries );
2017-06-28 04:53:11 +00:00
$jobs_per_minute = 0 ;
$jobs = dba :: p ( " SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL 10 MINUTE " );
if ( $job = dba :: fetch ( $jobs )) {
$jobs_per_minute = number_format ( $job [ 'jobs' ] / 10 , 0 );
}
dba :: close ( $jobs );
$processlist = ' - jpm: ' . $jobs_per_minute . ' (' . implode ( ', ' , $listitem ) . ')' ;
2017-06-08 20:43:30 +00:00
}
2016-08-03 13:59:25 +00:00
2017-06-12 21:39:20 +00:00
$entries = poller_total_entries ();
2017-06-08 02:00:59 +00:00
if ( Config :: get ( " system " , " worker_fastlane " , false ) && ( $queues > 0 ) && ( $entries > 0 ) && ( $active >= $queues )) {
2017-06-12 21:39:20 +00:00
$top_priority = poller_highest_priority ();
$high_running = poller_process_with_priority_active ( $top_priority );
2017-06-08 02:00:59 +00:00
if ( ! $high_running && ( $top_priority > PRIORITY_UNDEFINED ) && ( $top_priority < PRIORITY_NEGLIGIBLE )) {
2016-08-03 13:59:25 +00:00
logger ( " There are jobs with priority " . $top_priority . " waiting but none is executed. Open a fastlane. " , LOGGER_DEBUG );
2016-08-03 08:19:46 +00:00
$queues = $active + 1 ;
2016-08-03 08:03:05 +00:00
}
}
2017-06-06 12:07:27 +00:00
logger ( " Load: " . $load . " / " . $maxsysload . " - processes: " . $active . " / " . $entries . $processlist . " - maximum: " . $queues . " / " . $maxqueues , LOGGER_DEBUG );
2015-09-27 11:56:20 +00:00
2016-08-01 05:48:43 +00:00
// Are there fewer workers running as possible? Then fork a new one.
2017-06-08 02:00:59 +00:00
if ( ! Config :: get ( " system " , " worker_dont_fork " ) && ( $queues > ( $active + 1 )) && ( $entries > 1 )) {
2016-08-01 05:48:43 +00:00
logger ( " Active workers: " . $active . " / " . $queues . " Fork a new worker. " , LOGGER_DEBUG );
2017-02-27 06:27:22 +00:00
$args = array ( " include/poller.php " , " no_cron " );
2016-08-01 05:48:43 +00:00
$a = get_app ();
$a -> proc_run ( $args );
}
2015-09-27 11:56:20 +00:00
}
2017-06-04 20:03:37 +00:00
return $active >= $queues ;
2015-09-27 11:56:20 +00:00
}
2016-11-27 01:11:22 +00:00
/**
* @ brief Returns the number of active poller processes
*
* @ return integer Number of active poller processes
*/
2015-09-27 11:56:20 +00:00
function poller_active_workers () {
2016-09-09 20:33:54 +00:00
$workers = q ( " SELECT COUNT(*) AS `processes` FROM `process` WHERE `command` = 'poller.php' " );
2015-09-27 11:56:20 +00:00
2017-06-04 20:03:37 +00:00
return $workers [ 0 ][ " processes " ];
2015-09-27 11:56:20 +00:00
}
2016-10-10 21:01:36 +00:00
/**
* @ brief Check if we should pass some slow processes
*
* When the active processes of the highest priority are using more than 2 / 3
* of all processes , we let pass slower processes .
*
* @ param string $highest_priority Returns the currently highest priority
* @ return bool We let pass a slower process than $highest_priority
*/
function poller_passing_slow ( & $highest_priority ) {
$highest_priority = 0 ;
$r = q ( " SELECT `priority`
FROM `process`
2017-06-17 13:58:18 +00:00
INNER JOIN `workerqueue` ON `workerqueue` . `pid` = `process` . `pid` AND NOT `done` " );
2016-10-10 21:01:36 +00:00
// No active processes at all? Fine
2017-06-04 20:03:37 +00:00
if ( ! dbm :: is_result ( $r )) {
return false ;
}
2016-10-10 21:01:36 +00:00
$priorities = array ();
2017-06-04 20:03:37 +00:00
foreach ( $r AS $line ) {
2016-10-10 21:01:36 +00:00
$priorities [] = $line [ " priority " ];
2017-06-04 20:03:37 +00:00
}
2016-10-10 21:01:36 +00:00
// Should not happen
2017-06-04 20:03:37 +00:00
if ( count ( $priorities ) == 0 ) {
return false ;
}
2016-10-10 21:01:36 +00:00
$highest_priority = min ( $priorities );
// The highest process is already the slowest one?
// Then we quit
2017-06-04 20:03:37 +00:00
if ( $highest_priority == PRIORITY_NEGLIGIBLE ) {
return false ;
}
2016-10-10 21:01:36 +00:00
$high = 0 ;
2017-06-04 20:03:37 +00:00
foreach ( $priorities AS $priority ) {
if ( $priority == $highest_priority ) {
2016-10-10 21:01:36 +00:00
++ $high ;
2017-06-04 20:03:37 +00:00
}
}
2016-10-10 21:01:36 +00:00
logger ( " Highest priority: " . $highest_priority . " Total processes: " . count ( $priorities ) . " Count high priority processes: " . $high , LOGGER_DEBUG );
$passing_slow = (( $high / count ( $priorities )) > ( 2 / 3 ));
2017-06-04 20:03:37 +00:00
if ( $passing_slow ) {
2016-10-10 21:01:36 +00:00
logger ( " Passing slower processes than priority " . $highest_priority , LOGGER_DEBUG );
2017-06-04 20:03:37 +00:00
}
return $passing_slow ;
2016-10-10 21:01:36 +00:00
}
/**
2017-06-12 21:39:20 +00:00
* @ brief Find and claim the next worker process for us
2016-10-10 21:01:36 +00:00
*
2017-06-12 21:39:20 +00:00
* @ return boolean Have we found something ?
2016-10-10 21:01:36 +00:00
*/
2017-06-12 21:39:20 +00:00
function find_worker_processes () {
2016-10-10 21:01:36 +00:00
// Check if we should pass some low priority process
$highest_priority = 0 ;
2017-06-12 21:39:20 +00:00
$found = false ;
2017-06-28 04:53:11 +00:00
// The higher the number of parallel workers, the more we prefetch to prevent concurring access
$limit = Config :: get ( " system " , " worker_queues " , 4 ) * 2 ;
$limit = Config :: get ( 'system' , 'worker_fetch_limit' , $limit );
2016-10-10 21:01:36 +00:00
2017-06-12 14:19:47 +00:00
if ( poller_passing_slow ( $highest_priority )) {
2016-10-10 21:01:36 +00:00
// Are there waiting processes with a higher priority than the currently highest?
2017-06-13 21:56:50 +00:00
$result = dba :: e ( " UPDATE `workerqueue` SET `executed` = ?, `pid` = ?
2017-06-17 13:58:18 +00:00
WHERE `executed` <= ? AND `priority` < ? AND NOT `done`
ORDER BY `priority` , `created` LIMIT " .intval( $limit ),
2017-06-12 21:39:20 +00:00
datetime_convert (), getmypid (), NULL_DATE , $highest_priority );
2017-06-13 21:56:50 +00:00
if ( $result ) {
$found = ( dba :: affected_rows () > 0 );
2017-03-19 08:04:04 +00:00
}
2017-06-12 21:39:20 +00:00
if ( ! $found ) {
// Give slower processes some processing time
2017-06-13 21:56:50 +00:00
$result = dba :: e ( " UPDATE `workerqueue` SET `executed` = ?, `pid` = ?
2017-06-17 13:58:18 +00:00
WHERE `executed` <= ? AND `priority` > ? AND NOT `done`
2017-06-28 04:53:11 +00:00
ORDER BY `priority` , `created` LIMIT " .intval( $limit ),
2017-06-12 21:39:20 +00:00
datetime_convert (), getmypid (), NULL_DATE , $highest_priority );
2017-06-13 21:56:50 +00:00
if ( $result ) {
$found = ( dba :: affected_rows () > 0 );
2017-06-12 21:39:20 +00:00
}
2017-05-30 13:20:29 +00:00
}
2016-10-10 21:01:36 +00:00
}
// If there is no result (or we shouldn't pass lower processes) we check without priority limit
2017-06-12 21:39:20 +00:00
if ( ! $found ) {
2017-06-17 13:58:18 +00:00
$result = dba :: e ( " UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? AND NOT `done` ORDER BY `priority`, `created` LIMIT " . intval ( $limit ),
2017-06-12 21:39:20 +00:00
datetime_convert (), getmypid (), NULL_DATE );
2017-06-13 21:56:50 +00:00
if ( $result ) {
$found = ( dba :: affected_rows () > 0 );
2017-06-12 21:39:20 +00:00
}
2017-06-03 07:25:01 +00:00
}
2017-06-12 21:39:20 +00:00
return $found ;
2016-10-10 21:01:36 +00:00
}
2017-05-30 13:20:29 +00:00
/**
2017-06-12 21:39:20 +00:00
* @ brief Returns the next worker process
2017-05-30 13:20:29 +00:00
*
2017-06-12 21:39:20 +00:00
* @ return string SQL statement
2017-05-30 13:20:29 +00:00
*/
2017-06-12 21:39:20 +00:00
function poller_worker_process () {
2017-06-28 04:53:11 +00:00
global $poller_db_duration , $poller_lock_duration ;
2017-05-30 13:20:29 +00:00
2017-06-13 09:03:19 +00:00
$stamp = ( float ) microtime ( true );
2017-06-28 04:53:11 +00:00
// There can already be jobs for us in the queue.
$r = q ( " SELECT * FROM `workerqueue` WHERE `pid` = %d AND NOT `done` " , intval ( getmypid ()));
if ( dbm :: is_result ( $r )) {
$poller_db_duration += ( microtime ( true ) - $stamp );
return $r ;
}
2017-06-03 07:25:01 +00:00
2017-06-28 04:53:11 +00:00
$stamp = ( float ) microtime ( true );
if ( ! Lock :: set ( 'poller_worker_process' )) {
return false ;
}
$poller_lock_duration = ( microtime ( true ) - $stamp );
$stamp = ( float ) microtime ( true );
$found = find_worker_processes ();
2017-06-17 13:58:18 +00:00
$poller_db_duration += ( microtime ( true ) - $stamp );
2017-06-13 09:03:19 +00:00
2017-06-28 04:53:11 +00:00
Lock :: remove ( 'poller_worker_process' );
2017-06-12 21:39:20 +00:00
if ( $found ) {
2017-06-21 03:49:05 +00:00
$r = q ( " SELECT * FROM `workerqueue` WHERE `pid` = %d AND NOT `done` " , intval ( getmypid ()));
2017-05-30 13:20:29 +00:00
}
2017-06-12 21:39:20 +00:00
return $r ;
2017-05-30 13:20:29 +00:00
}
/**
* @ brief Removes a workerqueue entry from the current process
*/
function poller_unclaim_process () {
$mypid = getmypid ();
2017-06-28 04:53:11 +00:00
dba :: update ( 'workerqueue' , array ( 'executed' => NULL_DATE , 'pid' => 0 ), array ( 'pid' => $mypid , 'done' => false ));
2017-05-30 13:20:29 +00:00
}
2016-11-27 01:11:22 +00:00
/**
* @ brief Call the front end worker
*/
2016-11-27 00:55:05 +00:00
function call_worker () {
2017-02-26 23:16:49 +00:00
if ( ! Config :: get ( " system " , " frontend_worker " )) {
2016-11-27 00:55:05 +00:00
return ;
}
2016-12-20 20:51:25 +00:00
$url = App :: get_baseurl () . " /worker " ;
2016-11-27 00:55:05 +00:00
fetch_url ( $url , false , $redirects , 1 );
}
2016-11-27 01:11:22 +00:00
/**
* @ brief Call the front end worker if there aren ' t any active
*/
2016-11-27 00:55:05 +00:00
function call_worker_if_idle () {
2017-02-26 23:16:49 +00:00
if ( ! Config :: get ( " system " , " frontend_worker " )) {
2016-11-27 00:55:05 +00:00
return ;
}
2016-11-29 22:40:19 +00:00
// Do we have "proc_open"? Then we can fork the poller
if ( function_exists ( " proc_open " )) {
2016-11-30 19:24:58 +00:00
// When was the last time that we called the worker?
// Less than one minute? Then we quit
2017-01-21 06:06:29 +00:00
if (( time () - Config :: get ( " system " , " worker_started " )) < 60 ) {
2016-11-29 22:40:19 +00:00
return ;
}
2016-11-30 19:24:58 +00:00
set_config ( " system " , " worker_started " , time ());
// Do we have enough running workers? Then we quit here.
if ( poller_too_much_workers ()) {
// Cleaning dead processes
poller_kill_stale_workers ();
get_app () -> remove_inactive_processes ();
2016-11-29 22:40:19 +00:00
return ;
}
2016-11-30 19:24:58 +00:00
poller_run_cron ();
logger ( 'Call poller' , LOGGER_DEBUG );
2017-02-27 06:27:22 +00:00
$args = array ( " include/poller.php " , " no_cron " );
2016-11-30 19:24:58 +00:00
$a = get_app ();
$a -> proc_run ( $args );
return ;
2016-11-29 22:40:19 +00:00
}
// We cannot execute background processes.
// We now run the processes from the frontend.
// This won't work with long running processes.
2016-11-27 00:55:05 +00:00
poller_run_cron ();
clear_worker_processes ();
$workers = q ( " SELECT COUNT(*) AS `processes` FROM `process` WHERE `command` = 'worker.php' " );
if ( $workers [ 0 ][ " processes " ] == 0 ) {
call_worker ();
}
}
2016-11-27 01:11:22 +00:00
/**
* @ brief Removes long running worker processes
*/
2016-11-27 00:55:05 +00:00
function clear_worker_processes () {
2016-11-27 09:02:08 +00:00
$timeout = Config :: get ( " system " , " frontend_worker_timeout " , 10 );
/// @todo We should clean up the corresponding workerqueue entries as well
2016-11-27 00:55:05 +00:00
q ( " DELETE FROM `process` WHERE `created` < '%s' AND `command` = 'worker.php' " ,
2016-11-27 09:02:08 +00:00
dbesc ( datetime_convert ( 'UTC' , 'UTC' , " now - " . $timeout . " minutes " )));
2016-11-27 00:55:05 +00:00
}
2016-11-27 01:11:22 +00:00
/**
* @ brief Runs the cron processes
*/
2016-11-27 00:55:05 +00:00
function poller_run_cron () {
2016-11-30 19:24:58 +00:00
logger ( 'Add cron entries' , LOGGER_DEBUG );
2016-12-01 20:53:18 +00:00
// Check for spooled items
proc_run ( PRIORITY_HIGH , " include/spool_post.php " );
2016-11-27 00:55:05 +00:00
// Run the cron job that calls all other jobs
proc_run ( PRIORITY_MEDIUM , " include/cron.php " );
// Run the cronhooks job separately from cron for being able to use a different timing
proc_run ( PRIORITY_MEDIUM , " include/cronhooks.php " );
// Cleaning dead processes
poller_kill_stale_workers ();
}
2011-01-28 13:04:18 +00:00
if ( array_search ( __file__ , get_included_files ()) === 0 ){
2016-09-09 20:55:49 +00:00
poller_run ( $_SERVER [ " argv " ], $_SERVER [ " argc " ]);
2017-05-30 13:20:29 +00:00
poller_unclaim_process ();
2016-09-09 20:55:49 +00:00
get_app () -> end_process ();
killme ();
2011-01-28 13:04:18 +00:00
}