Delete IPC entries

This commit is contained in:
Michael 2021-01-05 16:01:05 +00:00
parent 7e89bf5af8
commit 17fbe5c299
1 changed files with 23 additions and 9 deletions

View File

@ -98,8 +98,9 @@ class Worker
// We fetch the next queue entry that is about to be executed // We fetch the next queue entry that is about to be executed
while ($r = self::workerProcess()) { while ($r = self::workerProcess()) {
if (self::IPCJobsExists(getmypid())) { if (self::IPCJobsExists(getmypid())) {
self::IPCSetJobState(false, getmypid()); self::IPCDeleteJobState(getmypid());
} }
// Don't refetch when a worker fetches tasks for multiple workers // Don't refetch when a worker fetches tasks for multiple workers
$refetched = DI::config()->get('system', 'worker_multiple_fetch'); $refetched = DI::config()->get('system', 'worker_multiple_fetch');
foreach ($r as $entry) { foreach ($r as $entry) {
@ -1217,8 +1218,9 @@ class Worker
} elseif ($pid) { } elseif ($pid) {
// The parent process continues here // The parent process continues here
DBA::connect(); DBA::connect();
Logger::info('Spawned new worker', ['pid' => $pid]);
self::IPCSetJobState(true, $pid); self::IPCSetJobState(true, $pid);
Logger::info('Spawned new worker', ['pid' => $pid]);
$cycles = 0; $cycles = 0;
while (self::IPCJobsExists($pid) && (++$cycles < 100)) { while (self::IPCJobsExists($pid) && (++$cycles < 100)) {
@ -1231,15 +1233,11 @@ class Worker
// We now are in the new worker // We now are in the new worker
DBA::connect(); DBA::connect();
/// @todo Reinitialize the logger to set a new process_id and uid
self::IPCSetJobState(true, getmypid());
Logger::info('Worker spawned', ['pid' => getmypid()]); Logger::info('Worker spawned', ['pid' => getmypid()]);
$cycles = 0;
while (!self::IPCJobsExists($pid) && (++$cycles < 100)) {
usleep(10000);
}
Logger::info('Parent is ready', ['pid' => getmypid(), 'wait_cycles' => $cycles]);
self::processQueue($do_cron); self::processQueue($do_cron);
self::unclaimProcess(); self::unclaimProcess();
@ -1477,6 +1475,7 @@ class Worker
* Set the flag if some job is waiting * Set the flag if some job is waiting
* *
* @param boolean $jobs Is there a waiting job? * @param boolean $jobs Is there a waiting job?
* @param int $key Key number
* @throws \Exception * @throws \Exception
*/ */
public static function IPCSetJobState(bool $jobs, int $key = 0) public static function IPCSetJobState(bool $jobs, int $key = 0)
@ -1487,9 +1486,24 @@ class Worker
self::$db_duration_write += (microtime(true) - $stamp); self::$db_duration_write += (microtime(true) - $stamp);
} }
/**
* Delete a key entry
*
* @param int $key Key number
* @throws \Exception
*/
public static function IPCDeleteJobState(int $key)
{
$stamp = (float)microtime(true);
DBA::delete('worker-ipc', ['key' => $key]);
self::$db_duration += (microtime(true) - $stamp);
self::$db_duration_write += (microtime(true) - $stamp);
}
/** /**
* Checks if some worker job waits to be executed * Checks if some worker job waits to be executed
* *
* @param int $key Key number
* @return bool * @return bool
* @throws \Exception * @throws \Exception
*/ */