Hourly process pending queue entries
This commit is contained in:
parent
1d13574225
commit
4a1fb1da12
2 changed files with 40 additions and 4 deletions
|
@ -31,6 +31,16 @@ use Friendica\Util\DateTimeFormat;
|
||||||
*/
|
*/
|
||||||
class Queue
|
class Queue
|
||||||
{
|
{
|
||||||
|
/**
|
||||||
|
* Add activity to the queue
|
||||||
|
*
|
||||||
|
* @param array $activity
|
||||||
|
* @param string $type
|
||||||
|
* @param integer $uid
|
||||||
|
* @param string $http_signer
|
||||||
|
* @param boolean $push
|
||||||
|
* @return array
|
||||||
|
*/
|
||||||
public static function add(array $activity, string $type, int $uid, string $http_signer, bool $push): array
|
public static function add(array $activity, string $type, int $uid, string $http_signer, bool $push): array
|
||||||
{
|
{
|
||||||
$fields = [
|
$fields = [
|
||||||
|
@ -65,23 +75,33 @@ class Queue
|
||||||
return $activity;
|
return $activity;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove activity from the queue
|
||||||
|
*
|
||||||
|
* @param array $activity
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
public static function remove(array $activity = [])
|
public static function remove(array $activity = [])
|
||||||
{
|
{
|
||||||
if (empty($activity['entry-id'])) {
|
if (empty($activity['entry-id'])) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
DBA::delete('inbox-entry', ['id' => $activity['entry-id']]);
|
DBA::delete('inbox-entry', ['id' => $activity['entry-id']]);
|
||||||
//echo "Delete ".$activity['entry-id']."\n";
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process the activity with the given id
|
||||||
|
*
|
||||||
|
* @param integer $id
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
public static function process(int $id)
|
public static function process(int $id)
|
||||||
{
|
{
|
||||||
$entry = DBA::selectFirst('inbox-entry', [], ['id' => $id]);
|
$entry = DBA::selectFirst('inbox-entry', [], ['id' => $id]);
|
||||||
if (empty($entry)) {
|
if (empty($entry)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Logger::debug('Processing queue entry', ['id' => $entry['id'], 'type' => $entry['type'], 'object-type' => $entry['object-type'], 'uri' => $entry['object-id'], 'in-reply-to' => $entry['in-reply-to-id']]);
|
Logger::debug('Processing queue entry', ['id' => $entry['id'], 'type' => $entry['type'], 'object-type' => $entry['object-type'], 'uri' => $entry['object-id'], 'in-reply-to' => $entry['in-reply-to-id']]);
|
||||||
|
|
||||||
$activity = json_decode($entry['activity'], true);
|
$activity = json_decode($entry['activity'], true);
|
||||||
|
@ -95,15 +115,27 @@ class Queue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process all activities
|
||||||
|
*
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
public static function processAll()
|
public static function processAll()
|
||||||
{
|
{
|
||||||
$entries = DBA::select('inbox-entry', ['id', 'type', 'object-type'], [], ['order' => ['id' => true]]);
|
$entries = DBA::select('inbox-entry', ['id', 'type', 'object-type'], [], ['order' => ['id' => true]]);
|
||||||
while ($entry = DBA::fetch($entries)) {
|
while ($entry = DBA::fetch($entries)) {
|
||||||
echo $entry['id'] . "\t" . $entry['type'] . "\t" . $entry['object-type'] . "\n";
|
|
||||||
self::process($entry['id']);
|
self::process($entry['id']);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
DBA::delete('inbox-entry', ["`received` < ?", DateTimeFormat::utc('now - 1 days')]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process all activities that are children of a given post url
|
||||||
|
*
|
||||||
|
* @param string $uri
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
public static function processReplyByUri(string $uri)
|
public static function processReplyByUri(string $uri)
|
||||||
{
|
{
|
||||||
$entries = DBA::select('inbox-entry', ['id'], ['in-reply-to-id' => $uri], ['order' => ['id' => true]]);
|
$entries = DBA::select('inbox-entry', ['id'], ['in-reply-to-id' => $uri], ['order' => ['id' => true]]);
|
||||||
|
|
|
@ -27,6 +27,7 @@ use Friendica\Core\Worker;
|
||||||
use Friendica\Database\DBA;
|
use Friendica\Database\DBA;
|
||||||
use Friendica\DI;
|
use Friendica\DI;
|
||||||
use Friendica\Model\Tag;
|
use Friendica\Model\Tag;
|
||||||
|
use Friendica\Protocol\ActivityPub\Queue;
|
||||||
use Friendica\Protocol\Relay;
|
use Friendica\Protocol\Relay;
|
||||||
|
|
||||||
class Cron
|
class Cron
|
||||||
|
@ -88,6 +89,9 @@ class Cron
|
||||||
Tag::setLocalTrendingHashtags(24, 20);
|
Tag::setLocalTrendingHashtags(24, 20);
|
||||||
Tag::setGlobalTrendingHashtags(24, 20);
|
Tag::setGlobalTrendingHashtags(24, 20);
|
||||||
|
|
||||||
|
// Process pending posts in the queue
|
||||||
|
Queue::processAll();
|
||||||
|
|
||||||
// Search for new contacts in the directory
|
// Search for new contacts in the directory
|
||||||
if (DI::config()->get('system', 'synchronize_directory')) {
|
if (DI::config()->get('system', 'synchronize_directory')) {
|
||||||
Worker::add(PRIORITY_LOW, 'PullDirectory');
|
Worker::add(PRIORITY_LOW, 'PullDirectory');
|
||||||
|
|
Loading…
Reference in a new issue