Improved asynchronous message procession
This commit is contained in:
parent
c88b1c0197
commit
a1947d2bb1
8 changed files with 46 additions and 38 deletions
|
@ -635,7 +635,7 @@ class Item
|
||||||
public function addSharedPost(array $item, string $body = ''): string
|
public function addSharedPost(array $item, string $body = ''): string
|
||||||
{
|
{
|
||||||
if (empty($body)) {
|
if (empty($body)) {
|
||||||
$body = $item['body'];
|
$body = $item['body'] ?? '';
|
||||||
}
|
}
|
||||||
|
|
||||||
if (empty($item['quote-uri-id']) || ($item['quote-uri-id'] == $item['uri-id'])) {
|
if (empty($item['quote-uri-id']) || ($item['quote-uri-id'] == $item['uri-id'])) {
|
||||||
|
|
|
@ -3905,11 +3905,12 @@ class Item
|
||||||
* Fetches item for given URI or plink
|
* Fetches item for given URI or plink
|
||||||
*
|
*
|
||||||
* @param string $uri
|
* @param string $uri
|
||||||
* @param integer $uid
|
* @param int $uid
|
||||||
|
* @param int $completion
|
||||||
*
|
*
|
||||||
* @return integer item id
|
* @return integer item id
|
||||||
*/
|
*/
|
||||||
public static function fetchByLink(string $uri, int $uid = 0): int
|
public static function fetchByLink(string $uri, int $uid = 0, int $completion = ActivityPub\Receiver::COMPLETION_MANUAL): int
|
||||||
{
|
{
|
||||||
Logger::info('Trying to fetch link', ['uid' => $uid, 'uri' => $uri]);
|
Logger::info('Trying to fetch link', ['uid' => $uid, 'uri' => $uri]);
|
||||||
$item_id = self::searchByLink($uri, $uid);
|
$item_id = self::searchByLink($uri, $uid);
|
||||||
|
@ -3930,7 +3931,7 @@ class Item
|
||||||
return is_numeric($hookData['item_id']) ? $hookData['item_id'] : 0;
|
return is_numeric($hookData['item_id']) ? $hookData['item_id'] : 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
$fetched_uri = ActivityPub\Processor::fetchMissingActivity($uri, [], '', ActivityPub\Receiver::COMPLETION_MANUAL, $uid);
|
$fetched_uri = ActivityPub\Processor::fetchMissingActivity($uri, [], '', $completion, $uid);
|
||||||
|
|
||||||
if ($fetched_uri) {
|
if ($fetched_uri) {
|
||||||
$item_id = self::searchByLink($fetched_uri, $uid);
|
$item_id = self::searchByLink($fetched_uri, $uid);
|
||||||
|
@ -3990,7 +3991,7 @@ class Item
|
||||||
}
|
}
|
||||||
|
|
||||||
$url = $shared['message_id'] ?: $shared['link'];
|
$url = $shared['message_id'] ?: $shared['link'];
|
||||||
$id = self::fetchByLink($url);
|
$id = self::fetchByLink($url, 0, ActivityPub\Receiver::COMPLETION_ASYNC);
|
||||||
if (!$id) {
|
if (!$id) {
|
||||||
Logger::notice('Post could not be fetched.', ['url' => $url, 'uid' => $uid]);
|
Logger::notice('Post could not be fetched.', ['url' => $url, 'uid' => $uid]);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -35,6 +35,7 @@ use Friendica\Model\Photo;
|
||||||
use Friendica\Model\Post;
|
use Friendica\Model\Post;
|
||||||
use Friendica\Network\HTTPClient\Client\HttpClientAccept;
|
use Friendica\Network\HTTPClient\Client\HttpClientAccept;
|
||||||
use Friendica\Network\HTTPClient\Client\HttpClientOptions;
|
use Friendica\Network\HTTPClient\Client\HttpClientOptions;
|
||||||
|
use Friendica\Protocol\ActivityPub;
|
||||||
use Friendica\Util\Images;
|
use Friendica\Util\Images;
|
||||||
use Friendica\Util\Network;
|
use Friendica\Util\Network;
|
||||||
use Friendica\Util\ParseUrl;
|
use Friendica\Util\ParseUrl;
|
||||||
|
@ -253,7 +254,7 @@ class Media
|
||||||
*/
|
*/
|
||||||
private static function addActivity(array $media): array
|
private static function addActivity(array $media): array
|
||||||
{
|
{
|
||||||
$id = Item::fetchByLink($media['url']);
|
$id = Item::fetchByLink($media['url'], 0, ActivityPub\Receiver::COMPLETION_ASYNC);
|
||||||
if (empty($id)) {
|
if (empty($id)) {
|
||||||
return $media;
|
return $media;
|
||||||
}
|
}
|
||||||
|
|
|
@ -142,7 +142,7 @@ class ClientToServer
|
||||||
*/
|
*/
|
||||||
private static function updateContent(int $uid, string $object_id, array $application, array $ldactivity): array
|
private static function updateContent(int $uid, string $object_id, array $application, array $ldactivity): array
|
||||||
{
|
{
|
||||||
$id = Item::fetchByLink($object_id, $uid);
|
$id = Item::fetchByLink($object_id, $uid, ActivityPub\Receiver::COMPLETION_ASYNC);
|
||||||
$original_post = Post::selectFirst(['uri-id'], ['uid' => $uid, 'origin' => true, 'id' => $id]);
|
$original_post = Post::selectFirst(['uri-id'], ['uid' => $uid, 'origin' => true, 'id' => $id]);
|
||||||
if (empty($original_post)) {
|
if (empty($original_post)) {
|
||||||
Logger::debug('Item not found or does not belong to the user', ['id' => $id, 'uid' => $uid, 'object_id' => $object_id, 'activity' => $ldactivity]);
|
Logger::debug('Item not found or does not belong to the user', ['id' => $id, 'uid' => $uid, 'object_id' => $object_id, 'activity' => $ldactivity]);
|
||||||
|
|
|
@ -348,13 +348,9 @@ class Processor
|
||||||
|
|
||||||
if ($fetch_parents && empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) {
|
if ($fetch_parents && empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) {
|
||||||
$result = self::fetchParent($activity, !empty($conversation));
|
$result = self::fetchParent($activity, !empty($conversation));
|
||||||
if (!empty($result)) {
|
if (!empty($result) && ($item['thr-parent'] != $result) && Post::exists(['uri' => $result])) {
|
||||||
if (($item['thr-parent'] != $result) && Post::exists(['uri' => $result])) {
|
|
||||||
$item['thr-parent'] = $result;
|
$item['thr-parent'] = $result;
|
||||||
}
|
}
|
||||||
} elseif (empty($conversation)) {
|
|
||||||
return [];
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
$item['diaspora_signed_text'] = $activity['diaspora:comment'] ?? '';
|
$item['diaspora_signed_text'] = $activity['diaspora:comment'] ?? '';
|
||||||
|
@ -532,39 +528,35 @@ class Processor
|
||||||
|
|
||||||
self::addActivityId($activity['reply-to-id']);
|
self::addActivityId($activity['reply-to-id']);
|
||||||
|
|
||||||
if (!DI::config()->get('system', 'fetch_by_worker')) {
|
$completion = $activity['completion-mode'] ?? Receiver::COMPLETION_NONE;
|
||||||
$in_background = false;
|
|
||||||
|
if (DI::config()->get('system', 'decoupled_receiver') && ($completion != Receiver::COMPLETION_MANUAL)) {
|
||||||
|
$in_background = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
$recursion_depth = $activity['recursion-depth'] ?? 0;
|
$recursion_depth = $activity['recursion-depth'] ?? 0;
|
||||||
|
|
||||||
if (!$in_background && ($recursion_depth < DI::config()->get('system', 'max_recursion_depth'))) {
|
if (!$in_background && ($recursion_depth < DI::config()->get('system', 'max_recursion_depth'))) {
|
||||||
Logger::info('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
|
Logger::info('Parent not found. Try to refetch it.', ['completion' => $completion, 'recursion-depth' => $recursion_depth, 'parent' => $activity['reply-to-id']]);
|
||||||
$result = self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
|
$result = self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
|
||||||
if (empty($result) && self::isActivityGone($activity['reply-to-id'])) {
|
if (empty($result) && self::isActivityGone($activity['reply-to-id'])) {
|
||||||
Logger::notice('The activity is gone, the queue entry will be deleted', ['parent' => $activity['reply-to-id']]);
|
Logger::notice('The activity is gone, the queue entry will be deleted', ['parent' => $activity['reply-to-id']]);
|
||||||
if (!empty($activity['entry-id'])) {
|
if (!empty($activity['entry-id'])) {
|
||||||
Queue::deleteById($activity['entry-id']);
|
Queue::deleteById($activity['entry-id']);
|
||||||
}
|
}
|
||||||
return '';
|
|
||||||
} elseif (!empty($result)) {
|
} elseif (!empty($result)) {
|
||||||
$exists = Post::exists(['uri' => [$result, $activity['reply-to-id']]]);
|
$post = Post::selectFirstPost(['uri'], ['uri' => [$result, $activity['reply-to-id']]]);
|
||||||
if ($exists) {
|
if (!empty($post['uri'])) {
|
||||||
Logger::info('The activity has been fetched and created.', ['parent' => $result]);
|
Logger::info('The activity has been fetched and created.', ['result' => $result, 'uri' => $post['uri']]);
|
||||||
return $result;
|
return $post['uri'];
|
||||||
} elseif (DI::config()->get('system', 'fetch_by_worker') || DI::config()->get('system', 'decoupled_receiver')) {
|
|
||||||
Logger::info('The activity has been fetched and will hopefully be created later.', ['parent' => $result]);
|
|
||||||
} else {
|
} else {
|
||||||
Logger::notice('The activity exists but has not been created, the queue entry will be deleted.', ['parent' => $result]);
|
Logger::notice('The activity exists but has not been created, the queue entry will be deleted.', ['parent' => $result]);
|
||||||
if (!empty($activity['entry-id'])) {
|
if (!empty($activity['entry-id'])) {
|
||||||
Queue::deleteById($activity['entry-id']);
|
Queue::deleteById($activity['entry-id']);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return '';
|
|
||||||
}
|
}
|
||||||
if (empty($result) && !DI::config()->get('system', 'fetch_by_worker')) {
|
|
||||||
return '';
|
return '';
|
||||||
}
|
|
||||||
} elseif (self::isActivityGone($activity['reply-to-id'])) {
|
} elseif (self::isActivityGone($activity['reply-to-id'])) {
|
||||||
Logger::notice('The activity is gone. We will not spawn a worker. The queue entry will be deleted', ['parent' => $activity['reply-to-id']]);
|
Logger::notice('The activity is gone. We will not spawn a worker. The queue entry will be deleted', ['parent' => $activity['reply-to-id']]);
|
||||||
if ($in_background) {
|
if ($in_background) {
|
||||||
|
@ -586,7 +578,7 @@ class Processor
|
||||||
Logger::notice('Fetching is done by worker.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
|
Logger::notice('Fetching is done by worker.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
|
||||||
Fetch::add($activity['reply-to-id']);
|
Fetch::add($activity['reply-to-id']);
|
||||||
$activity['recursion-depth'] = 0;
|
$activity['recursion-depth'] = 0;
|
||||||
$wid = Worker::add(Worker::PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
|
$wid = Worker::add(Worker::PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_ASYNC);
|
||||||
Fetch::setWorkerId($activity['reply-to-id'], $wid);
|
Fetch::setWorkerId($activity['reply-to-id'], $wid);
|
||||||
} else {
|
} else {
|
||||||
Logger::debug('Activity will already be fetched via a worker.', ['url' => $activity['reply-to-id']]);
|
Logger::debug('Activity will already be fetched via a worker.', ['url' => $activity['reply-to-id']]);
|
||||||
|
@ -867,7 +859,7 @@ class Processor
|
||||||
$content = self::addMentionLinks($content, $activity['tags']);
|
$content = self::addMentionLinks($content, $activity['tags']);
|
||||||
|
|
||||||
if (!empty($activity['quote-url'])) {
|
if (!empty($activity['quote-url'])) {
|
||||||
$id = Item::fetchByLink($activity['quote-url']);
|
$id = Item::fetchByLink($activity['quote-url'], 0, ActivityPub\Receiver::COMPLETION_ASYNC);
|
||||||
if ($id) {
|
if ($id) {
|
||||||
$shared_item = Post::selectFirst(['uri-id'], ['id' => $id]);
|
$shared_item = Post::selectFirst(['uri-id'], ['id' => $id]);
|
||||||
$item['quote-uri-id'] = $shared_item['uri-id'];
|
$item['quote-uri-id'] = $shared_item['uri-id'];
|
||||||
|
@ -1456,7 +1448,7 @@ class Processor
|
||||||
if (empty($post['id'])) {
|
if (empty($post['id'])) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
$id = Item::fetchByLink($post['id']);
|
$id = Item::fetchByLink($post['id'], 0, ActivityPub\Receiver::COMPLETION_ASYNC);
|
||||||
if (!empty($id)) {
|
if (!empty($id)) {
|
||||||
$item = Post::selectFirst(['uri-id', 'featured', 'author-id'], ['id' => $id]);
|
$item = Post::selectFirst(['uri-id', 'featured', 'author-id'], ['id' => $id]);
|
||||||
if (!empty($item['uri-id'])) {
|
if (!empty($item['uri-id'])) {
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
namespace Friendica\Protocol\ActivityPub;
|
namespace Friendica\Protocol\ActivityPub;
|
||||||
|
|
||||||
use Friendica\Core\Logger;
|
use Friendica\Core\Logger;
|
||||||
|
use Friendica\Core\Worker;
|
||||||
use Friendica\Database\Database;
|
use Friendica\Database\Database;
|
||||||
use Friendica\Database\DBA;
|
use Friendica\Database\DBA;
|
||||||
use Friendica\DI;
|
use Friendica\DI;
|
||||||
|
@ -285,10 +286,26 @@ class Queue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!empty($entry['object-id']) && !empty($entry['in-reply-to-id']) && ($entry['object-id'] != $entry['in-reply-to-id']) && DBA::exists('inbox-entry', ['object-id' => $entry['in-reply-to-id']])) {
|
if (!empty($entry['object-id']) && !empty($entry['in-reply-to-id']) && ($entry['object-id'] != $entry['in-reply-to-id'])) {
|
||||||
|
if (DBA::exists('inbox-entry', ['object-id' => $entry['in-reply-to-id']])) {
|
||||||
// This entry belongs to some other entry that should be processed first
|
// This entry belongs to some other entry that should be processed first
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
if (!Post::exists(['uri' => $entry['in-reply-to-id']])) {
|
||||||
|
// This entry belongs to some other entry that need to be fetched first
|
||||||
|
if (Fetch::hasWorker($entry['in-reply-to-id'])) {
|
||||||
|
Logger::debug('Fetching of the activity is already queued', ['id' => $entry['activity-id'], 'reply-to-id' => $entry['in-reply-to-id']]);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
Fetch::add($entry['in-reply-to-id']);
|
||||||
|
$activity = json_decode($entry['activity'], true);
|
||||||
|
$activity['recursion-depth'] = 0;
|
||||||
|
$wid = Worker::add(Worker::PRIORITY_HIGH, 'FetchMissingActivity', $entry['in-reply-to-id'], $activity, '', Receiver::COMPLETION_ASYNC);
|
||||||
|
Fetch::setWorkerId($entry['in-reply-to-id'], $wid);
|
||||||
|
Logger::debug('Fetch missing activity', ['wid' => $wid, 'id' => $entry['activity-id'], 'reply-to-id' => $entry['in-reply-to-id']]);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -80,6 +80,7 @@ class Receiver
|
||||||
const COMPLETION_RELAY = 2;
|
const COMPLETION_RELAY = 2;
|
||||||
const COMPLETION_MANUAL = 3;
|
const COMPLETION_MANUAL = 3;
|
||||||
const COMPLETION_AUTO = 4;
|
const COMPLETION_AUTO = 4;
|
||||||
|
const COMPLETION_ASYNC = 5;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checks incoming message from the inbox
|
* Checks incoming message from the inbox
|
||||||
|
@ -681,7 +682,7 @@ class Receiver
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!empty($object_data['entry-id']) && $decouple && ($push || ($completion == self::COMPLETION_RELAY))) {
|
if (!empty($object_data['entry-id']) && $decouple && ($push || in_array($completion, [self::COMPLETION_RELAY, self::COMPLETION_ASYNC]))) {
|
||||||
if (Queue::isProcessable($object_data['entry-id'])) {
|
if (Queue::isProcessable($object_data['entry-id'])) {
|
||||||
// We delay by 5 seconds to allow to accumulate all receivers
|
// We delay by 5 seconds to allow to accumulate all receivers
|
||||||
$delayed = date(DateTimeFormat::MYSQL, time() + 5);
|
$delayed = date(DateTimeFormat::MYSQL, time() + 5);
|
||||||
|
|
|
@ -298,10 +298,6 @@ return [
|
||||||
// Priority for the expiry notification
|
// Priority for the expiry notification
|
||||||
'expire-notify-priority' => Friendica\Core\Worker::PRIORITY_LOW,
|
'expire-notify-priority' => Friendica\Core\Worker::PRIORITY_LOW,
|
||||||
|
|
||||||
// fetch_by_worker (Boolean)
|
|
||||||
// Fetch missing posts via a background process
|
|
||||||
'fetch_by_worker' => false,
|
|
||||||
|
|
||||||
// fetch_featured_posts (Boolean)
|
// fetch_featured_posts (Boolean)
|
||||||
// Fetch featured posts from all contacts
|
// Fetch featured posts from all contacts
|
||||||
'fetch_featured_posts' => false,
|
'fetch_featured_posts' => false,
|
||||||
|
|
Loading…
Reference in a new issue