Merge pull request #11827 from annando/loop-prevention
Improvements for message processing
This commit is contained in:
commit
bbbbf8fd6b
12 changed files with 308 additions and 128 deletions
20
database.sql
20
database.sql
|
@ -1,6 +1,6 @@
|
|||
-- ------------------------------------------
|
||||
-- Friendica 2022.09-dev (Giant Rhubarb)
|
||||
-- DB_UPDATE_VERSION 1477
|
||||
-- DB_UPDATE_VERSION 1478
|
||||
-- ------------------------------------------
|
||||
|
||||
|
||||
|
@ -1716,6 +1716,24 @@ CREATE TABLE IF NOT EXISTS `user-contact` (
|
|||
FOREIGN KEY (`uri-id`) REFERENCES `item-uri` (`id`) ON UPDATE RESTRICT ON DELETE CASCADE
|
||||
) DEFAULT COLLATE utf8mb4_general_ci COMMENT='User specific public contact data';
|
||||
|
||||
--
|
||||
-- TABLE arrived-activity
|
||||
--
|
||||
CREATE TABLE IF NOT EXISTS `arrived-activity` (
|
||||
`object-id` varbinary(255) NOT NULL COMMENT 'object id of the incoming activity',
|
||||
`received` datetime COMMENT 'Receiving date',
|
||||
PRIMARY KEY(`object-id`)
|
||||
) ENGINE=MEMORY DEFAULT COLLATE utf8mb4_general_ci COMMENT='Id of arrived activities';
|
||||
|
||||
--
|
||||
-- TABLE processed-activity
|
||||
--
|
||||
CREATE TABLE IF NOT EXISTS `processed-activity` (
|
||||
`object-id` varbinary(255) NOT NULL COMMENT 'object id of the incoming activity',
|
||||
`received` datetime COMMENT 'Receiving date',
|
||||
PRIMARY KEY(`object-id`)
|
||||
) ENGINE=MEMORY DEFAULT COLLATE utf8mb4_general_ci COMMENT='Id of processed activities';
|
||||
|
||||
--
|
||||
-- TABLE worker-ipc
|
||||
--
|
||||
|
|
|
@ -13,6 +13,7 @@ Database Tables
|
|||
| [application](help/database/db_application) | OAuth application |
|
||||
| [application-marker](help/database/db_application-marker) | Timeline marker |
|
||||
| [application-token](help/database/db_application-token) | OAuth user token |
|
||||
| [arrived-activity](help/database/db_arrived-activity) | Id of arrived activities |
|
||||
| [attach](help/database/db_attach) | file attachments |
|
||||
| [cache](help/database/db_cache) | Stores temporary data |
|
||||
| [config](help/database/db_config) | main configuration storage |
|
||||
|
@ -67,6 +68,7 @@ Database Tables
|
|||
| [post-user](help/database/db_post-user) | User specific post data |
|
||||
| [post-user-notification](help/database/db_post-user-notification) | User post notifications |
|
||||
| [process](help/database/db_process) | Currently running system processes |
|
||||
| [processed-activity](help/database/db_processed-activity) | Id of processed activities |
|
||||
| [profile](help/database/db_profile) | user profiles data |
|
||||
| [profile_field](help/database/db_profile_field) | Custom profile fields |
|
||||
| [push_subscriber](help/database/db_push_subscriber) | Used for OStatus: Contains feed subscribers |
|
||||
|
|
22
doc/database/db_arrived-activity.md
Normal file
22
doc/database/db_arrived-activity.md
Normal file
|
@ -0,0 +1,22 @@
|
|||
Table arrived-activity
|
||||
===========
|
||||
|
||||
Id of arrived activities
|
||||
|
||||
Fields
|
||||
------
|
||||
|
||||
| Field | Description | Type | Null | Key | Default | Extra |
|
||||
| --------- | ---------------------------------- | -------------- | ---- | --- | ------- | ----- |
|
||||
| object-id | object id of the incoming activity | varbinary(255) | NO | PRI | NULL | |
|
||||
| received | Receiving date | datetime | YES | | NULL | |
|
||||
|
||||
Indexes
|
||||
------------
|
||||
|
||||
| Name | Fields |
|
||||
| ------- | --------- |
|
||||
| PRIMARY | object-id |
|
||||
|
||||
|
||||
Return to [database documentation](help/database)
|
22
doc/database/db_processed-activity.md
Normal file
22
doc/database/db_processed-activity.md
Normal file
|
@ -0,0 +1,22 @@
|
|||
Table processed-activity
|
||||
===========
|
||||
|
||||
Id of processed activities
|
||||
|
||||
Fields
|
||||
------
|
||||
|
||||
| Field | Description | Type | Null | Key | Default | Extra |
|
||||
| --------- | ---------------------------------- | -------------- | ---- | --- | ------- | ----- |
|
||||
| object-id | object id of the incoming activity | varbinary(255) | NO | PRI | NULL | |
|
||||
| received | Receiving date | datetime | YES | | NULL | |
|
||||
|
||||
Indexes
|
||||
------------
|
||||
|
||||
| Name | Fields |
|
||||
| ------- | --------- |
|
||||
| PRIMARY | object-id |
|
||||
|
||||
|
||||
Return to [database documentation](help/database)
|
|
@ -905,14 +905,7 @@ class Item
|
|||
'photo' => $item['owner-avatar'], 'network' => $item['network']];
|
||||
$item['owner-id'] = ($item['owner-id'] ?? 0) ?: Contact::getIdForURL($item['owner-link'], 0, null, $default);
|
||||
|
||||
$actor = ($item['gravity'] == GRAVITY_PARENT) ? $item['owner-id'] : $item['author-id'];
|
||||
if (!$item['origin'] && ($item['uid'] != 0) && Contact::isSharing($actor, $item['uid'])) {
|
||||
$item['post-reason'] = self::PR_FOLLOWER;
|
||||
}
|
||||
|
||||
if ($item['origin'] && empty($item['post-reason'])) {
|
||||
$item['post-reason'] = self::PR_LOCAL;
|
||||
}
|
||||
$item['post-reason'] = self::getPostReason($item);
|
||||
|
||||
// Ensure that there is an avatar cache
|
||||
Contact::checkAvatarCache($item['author-id']);
|
||||
|
@ -1291,6 +1284,27 @@ class Item
|
|||
return $post_user_id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch the post reason for a given item array
|
||||
*
|
||||
* @param array $item
|
||||
*
|
||||
* @return integer
|
||||
*/
|
||||
public static function getPostReason(array $item): int
|
||||
{
|
||||
$actor = ($item['gravity'] == GRAVITY_PARENT) ? $item['owner-id'] : $item['author-id'];
|
||||
if (empty($item['origin']) && ($item['uid'] != 0) && Contact::isSharing($actor, $item['uid'])) {
|
||||
return self::PR_FOLLOWER;
|
||||
}
|
||||
|
||||
if (!empty($item['origin']) && empty($item['post-reason'])) {
|
||||
return self::PR_LOCAL;
|
||||
}
|
||||
|
||||
return $item['post-reason'] ?? self::PR_NONE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the display cache
|
||||
*
|
||||
|
@ -1495,12 +1509,13 @@ class Item
|
|||
|
||||
$item = array_merge($item, $fields);
|
||||
|
||||
$item['post-reason'] = self::getPostReason($item);
|
||||
|
||||
$is_reshare = ($item['gravity'] == GRAVITY_ACTIVITY) && ($item['verb'] == Activity::ANNOUNCE);
|
||||
|
||||
if ((($item['gravity'] == GRAVITY_PARENT) || $is_reshare) &&
|
||||
DI::pConfig()->get($uid, 'system', 'accept_only_sharer') == self::COMPLETION_NONE &&
|
||||
!Contact::isSharingByURL($item['author-link'], $uid) &&
|
||||
!Contact::isSharingByURL($item['owner-link'], $uid)) {
|
||||
!in_array($item['post-reason'], [self::PR_FOLLOWER, self::PR_TAG, self::PR_TO, self::PR_CC])) {
|
||||
Logger::info('Contact is not a follower, thread will not be stored', ['author' => $item['author-link'], 'uid' => $uid]);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -60,10 +60,13 @@ class ItemURI
|
|||
* Searched for an id of a given uri. Adds it, if not existing yet.
|
||||
*
|
||||
* @param string $uri
|
||||
* @param bool $insert
|
||||
*
|
||||
* @return integer item-uri id
|
||||
*
|
||||
* @throws \Exception
|
||||
*/
|
||||
public static function getIdByURI(string $uri): int
|
||||
public static function getIdByURI(string $uri, bool $insert = true): int
|
||||
{
|
||||
if (empty($uri)) {
|
||||
return 0;
|
||||
|
@ -74,12 +77,13 @@ class ItemURI
|
|||
|
||||
$itemuri = DBA::selectFirst('item-uri', ['id'], ['uri' => $uri]);
|
||||
|
||||
if (!DBA::isResult($itemuri)) {
|
||||
if (!DBA::isResult($itemuri) && $insert) {
|
||||
return self::insert(['uri' => $uri]);
|
||||
}
|
||||
|
||||
return $itemuri['id'];
|
||||
return $itemuri['id'] ?? 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Searched for an id of a given guid.
|
||||
*
|
||||
|
|
|
@ -22,15 +22,10 @@
|
|||
namespace Friendica\Module\Debug;
|
||||
|
||||
use Friendica\BaseModule;
|
||||
use Friendica\Content\Text;
|
||||
use Friendica\Core\Logger;
|
||||
use Friendica\Core\Renderer;
|
||||
use Friendica\DI;
|
||||
use Friendica\Model\Item;
|
||||
use Friendica\Model\Tag;
|
||||
use Friendica\Protocol\ActivityPub;
|
||||
use Friendica\Util\JsonLD;
|
||||
use Friendica\Util\XML;
|
||||
|
||||
class ActivityPubConversion extends BaseModule
|
||||
{
|
||||
|
@ -123,7 +118,7 @@ class ActivityPubConversion extends BaseModule
|
|||
'content' => visible_whitespace(var_export($object_data, true))
|
||||
];
|
||||
|
||||
$item = ActivityPub\Processor::createItem($object_data);
|
||||
$item = ActivityPub\Processor::createItem($object_data, true);
|
||||
|
||||
$results[] = [
|
||||
'title' => DI::l10n()->t('Result Item'),
|
||||
|
|
|
@ -60,33 +60,29 @@ class Processor
|
|||
const CACHEKEY_FETCH_ACTIVITY = 'processor:fetchMissingActivity:';
|
||||
const CACHEKEY_JUST_FETCHED = 'processor:isJustFetched:';
|
||||
|
||||
static $processed = [];
|
||||
|
||||
/**
|
||||
* Add an activity id to the list of processed ids
|
||||
* Add an object id to the list of processed ids
|
||||
*
|
||||
* @param string $id
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public static function addActivityId(string $id)
|
||||
private static function addActivityId(string $id)
|
||||
{
|
||||
self::$processed[] = $id;
|
||||
if (count(self::$processed) > 100) {
|
||||
self::$processed = array_slice(self::$processed, 1);
|
||||
}
|
||||
DBA::delete('processed-activity', ["`received` < ?", DateTimeFormat::utc('now - 5 minutes')]);
|
||||
DBA::insert('processed-activity', ['object-id' => $id, 'received' => DateTimeFormat::utcNow()]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the given has just been processed
|
||||
* Checks if the given object id has just been processed
|
||||
*
|
||||
* @param string $id
|
||||
*
|
||||
* @return boolean
|
||||
*/
|
||||
public static function isProcessed(string $id): bool
|
||||
private static function isProcessed(string $id): bool
|
||||
{
|
||||
return in_array($id, self::$processed);
|
||||
return DBA::exists('processed-activity', ['object-id' => $id]);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -233,8 +229,9 @@ class Processor
|
|||
$item = Post::selectFirst(['uri', 'uri-id', 'thr-parent', 'gravity', 'post-type', 'private'], ['uri' => $activity['id']]);
|
||||
if (!DBA::isResult($item)) {
|
||||
Logger::warning('No existing item, item will be created', ['uri' => $activity['id']]);
|
||||
$item = self::createItem($activity);
|
||||
$item = self::createItem($activity, false);
|
||||
if (empty($item)) {
|
||||
Queue::remove($activity);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -247,6 +244,7 @@ class Processor
|
|||
|
||||
$item = self::processContent($activity, $item);
|
||||
if (empty($item)) {
|
||||
Queue::remove($activity);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -303,9 +301,9 @@ class Processor
|
|||
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
|
||||
* @throws \ImagickException
|
||||
*/
|
||||
public static function createItem(array $activity, bool $fetch_parents = true): array
|
||||
public static function createItem(array $activity, bool $fetch_parents): array
|
||||
{
|
||||
if (self::isProcessed($activity['id'])) {
|
||||
if (self::isProcessed($activity['id']) && !Post::exists(['uri' => $activity['id']])) {
|
||||
Logger::info('Id is already processed', ['id' => $activity['id']]);
|
||||
return [];
|
||||
}
|
||||
|
@ -324,10 +322,10 @@ class Processor
|
|||
$item['object-type'] = Activity\ObjectType::COMMENT;
|
||||
}
|
||||
|
||||
if (!empty($activity['context'])) {
|
||||
$item['conversation'] = $activity['context'];
|
||||
} elseif (!empty($activity['conversation'])) {
|
||||
if (!empty($activity['conversation'])) {
|
||||
$item['conversation'] = $activity['conversation'];
|
||||
} elseif (!empty($activity['context'])) {
|
||||
$item['conversation'] = $activity['context'];
|
||||
}
|
||||
|
||||
if (!empty($item['conversation'])) {
|
||||
|
@ -340,8 +338,10 @@ class Processor
|
|||
$conversation = [];
|
||||
}
|
||||
|
||||
Logger::debug('Create Item', ['id' => $activity['id'], 'conversation' => $item['conversation'] ?? '']);
|
||||
if (empty($activity['author']) && empty($activity['actor'])) {
|
||||
Logger::notice('Missing author and actor. We quit here.', ['activity' => $activity]);
|
||||
Queue::remove($activity);
|
||||
return [];
|
||||
}
|
||||
|
||||
|
@ -364,6 +364,9 @@ class Processor
|
|||
|
||||
if (empty($conversation) && empty($activity['directmessage']) && ($item['gravity'] != GRAVITY_PARENT) && !Post::exists(['uri' => $item['thr-parent']])) {
|
||||
Logger::info('Parent not found, message will be discarded.', ['thr-parent' => $item['thr-parent']]);
|
||||
if (!$fetch_parents) {
|
||||
Queue::remove($activity);
|
||||
}
|
||||
return [];
|
||||
}
|
||||
|
||||
|
@ -457,6 +460,7 @@ class Processor
|
|||
$item = self::processContent($activity, $item);
|
||||
if (empty($item)) {
|
||||
Logger::info('Message was not processed');
|
||||
Queue::remove($activity);
|
||||
return [];
|
||||
}
|
||||
|
||||
|
@ -490,11 +494,6 @@ class Processor
|
|||
*/
|
||||
private static function fetchParent(array $activity): string
|
||||
{
|
||||
if (self::hasJustBeenFetched($activity['reply-to-id'])) {
|
||||
Logger::notice('We just have tried to fetch this activity. We don\'t try it again.', ['parent' => $activity['reply-to-id']]);
|
||||
return '';
|
||||
}
|
||||
|
||||
$recursion_depth = $activity['recursion-depth'] ?? 0;
|
||||
|
||||
if ($recursion_depth < DI::config()->get('system', 'max_recursion_depth')) {
|
||||
|
@ -552,23 +551,6 @@ class Processor
|
|||
return '';
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a given activity has recently been fetched
|
||||
*
|
||||
* @param string $url
|
||||
* @return boolean
|
||||
*/
|
||||
private static function hasJustBeenFetched(string $url): bool
|
||||
{
|
||||
$cachekey = self::CACHEKEY_JUST_FETCHED . $url;
|
||||
$time = DI::cache()->get($cachekey);
|
||||
if (is_null($time)) {
|
||||
DI::cache()->set($cachekey, time(), Duration::FIVE_MINUTES);
|
||||
return false;
|
||||
}
|
||||
return ($time + 300) > time();
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a given activity is no longer available
|
||||
*
|
||||
|
@ -585,10 +567,11 @@ class Processor
|
|||
}
|
||||
|
||||
// @todo To ensure that the remote system is working correctly, we can check if the "Content-Type" contains JSON
|
||||
if (in_array($curlResult->getReturnCode(), [404])) {
|
||||
if (in_array($curlResult->getReturnCode(), [401, 404])) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if ($curlResult->isSuccess()) {
|
||||
$object = json_decode($curlResult->getBody(), true);
|
||||
if (!empty($object)) {
|
||||
$activity = JsonLD::compact($object);
|
||||
|
@ -596,6 +579,12 @@ class Processor
|
|||
return true;
|
||||
}
|
||||
}
|
||||
} elseif ($curlResult->getReturnCode() == 0) {
|
||||
$host = parse_url($url, PHP_URL_HOST);
|
||||
if (!(filter_var($host, FILTER_VALIDATE_IP) || @dns_get_record($host . '.', DNS_A + DNS_AAAA))) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
@ -656,7 +645,7 @@ class Processor
|
|||
public static function createActivity(array $activity, string $verb)
|
||||
{
|
||||
$activity['reply-to-id'] = $activity['object_id'];
|
||||
$item = self::createItem($activity);
|
||||
$item = self::createItem($activity, false);
|
||||
if (empty($item)) {
|
||||
return;
|
||||
}
|
||||
|
@ -839,7 +828,7 @@ class Processor
|
|||
Logger::warning('Unknown parent item.', ['uri' => $parent_uri]);
|
||||
return false;
|
||||
}
|
||||
if (($item['private'] == Item::PRIVATE) && ($parent['private'] != Item::PRIVATE)) {
|
||||
if (!empty($activity['type']) && in_array($activity['type'], Receiver::CONTENT_TYPES) && ($item['private'] == Item::PRIVATE) && ($parent['private'] != Item::PRIVATE)) {
|
||||
Logger::warning('Item is private but the parent is not. Dropping.', ['item-uri' => $item['uri'], 'thr-parent' => $item['thr-parent']]);
|
||||
return false;
|
||||
}
|
||||
|
@ -969,6 +958,9 @@ class Processor
|
|||
|
||||
if (!self::isSolicitedMessage($activity, $item)) {
|
||||
DBA::delete('item-uri', ['id' => $item['uri-id']]);
|
||||
if (!empty($activity['entry-id'])) {
|
||||
Queue::deleteById($activity['entry-id']);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1006,13 +998,17 @@ class Processor
|
|||
$item['post-reason'] = Item::PR_NONE;
|
||||
}
|
||||
|
||||
$item['post-reason'] = Item::getPostReason($item);
|
||||
|
||||
if (in_array($item['post-reason'], [Item::PR_GLOBAL, Item::PR_NONE])) {
|
||||
if (!empty($activity['from-relay'])) {
|
||||
$item['post-reason'] = Item::PR_RELAY;
|
||||
} elseif (!empty($activity['thread-completion'])) {
|
||||
$item['post-reason'] = Item::PR_FETCHED;
|
||||
} elseif (in_array($item['post-reason'], [Item::PR_GLOBAL, Item::PR_NONE]) && !empty($activity['push'])) {
|
||||
} elseif (!empty($activity['push'])) {
|
||||
$item['post-reason'] = Item::PR_PUSHED;
|
||||
}
|
||||
}
|
||||
|
||||
if ($item['isForum'] ?? false) {
|
||||
$item['contact-id'] = Contact::getIdForURL($activity['actor'], $receiver);
|
||||
|
@ -1029,17 +1025,15 @@ class Processor
|
|||
continue;
|
||||
}
|
||||
|
||||
if (!($item['isForum'] ?? false) && ($receiver != 0) && ($item['gravity'] == GRAVITY_PARENT) && !Contact::isSharingByURL($activity['author'], $receiver)) {
|
||||
if (($receiver != 0) && ($item['gravity'] == GRAVITY_PARENT) && !in_array($item['post-reason'], [Item::PR_FOLLOWER, Item::PR_TAG, item::PR_TO, Item::PR_CC])) {
|
||||
if (!($item['isForum'] ?? false)) {
|
||||
if ($item['post-reason'] == Item::PR_BCC) {
|
||||
Logger::info('Top level post via BCC from a non sharer, ignoring', ['uid' => $receiver, 'contact' => $item['contact-id']]);
|
||||
Logger::info('Top level post via BCC from a non sharer, ignoring', ['uid' => $receiver, 'contact' => $item['contact-id'], 'url' => $item['uri']]);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (
|
||||
!empty($activity['thread-children-type'])
|
||||
&& in_array($activity['thread-children-type'], Receiver::ACTIVITY_TYPES)
|
||||
&& DI::pConfig()->get($receiver, 'system', 'accept_only_sharer') != Item::COMPLETION_LIKE
|
||||
) {
|
||||
if ((DI::pConfig()->get($receiver, 'system', 'accept_only_sharer') != Item::COMPLETION_LIKE)
|
||||
&& in_array($activity['thread-children-type'] ?? '', Receiver::ACTIVITY_TYPES)) {
|
||||
Logger::info('Top level post from thread completion from a non sharer had been initiated via an activity, ignoring',
|
||||
['type' => $activity['thread-children-type'], 'user' => $item['uid'], 'causer' => $item['causer-link'], 'author' => $activity['author'], 'url' => $item['uri']]);
|
||||
continue;
|
||||
|
@ -1047,23 +1041,15 @@ class Processor
|
|||
}
|
||||
|
||||
$is_forum = false;
|
||||
|
||||
if ($receiver != 0) {
|
||||
$user = User::getById($receiver, ['account-type']);
|
||||
if (!empty($user['account-type'])) {
|
||||
$is_forum = ($user['account-type'] == User::ACCOUNT_TYPE_COMMUNITY);
|
||||
}
|
||||
}
|
||||
|
||||
if (!$is_forum && DI::pConfig()->get($receiver, 'system', 'accept_only_sharer') == Item::COMPLETION_NONE && ($receiver != 0) && ($item['gravity'] == GRAVITY_PARENT)) {
|
||||
$skip = !Contact::isSharingByURL($activity['author'], $receiver);
|
||||
|
||||
if ($skip && (($activity['type'] == 'as:Announce') || ($item['isForum'] ?? false))) {
|
||||
$skip = !Contact::isSharingByURL($activity['actor'], $receiver);
|
||||
}
|
||||
|
||||
if ($skip) {
|
||||
Logger::info('Skipping post', ['uid' => $receiver, 'url' => $item['uri']]);
|
||||
if ((DI::pConfig()->get($receiver, 'system', 'accept_only_sharer') == Item::COMPLETION_NONE)
|
||||
&& ((!$is_forum && !($item['isForum'] ?? false) && ($activity['type'] != 'as:Announce'))
|
||||
|| !Contact::isSharingByURL($activity['actor'], $receiver))) {
|
||||
Logger::info('Actor is a non sharer, is no forum or it is no announce', ['uid' => $receiver, 'actor' => $activity['actor'], 'url' => $item['uri'], 'type' => $activity['type']]);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -1358,13 +1344,7 @@ class Processor
|
|||
*/
|
||||
public static function fetchMissingActivity(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL): string
|
||||
{
|
||||
if (!empty($child['receiver'])) {
|
||||
$uid = ActivityPub\Receiver::getFirstUserFromReceivers($child['receiver']);
|
||||
} else {
|
||||
$uid = 0;
|
||||
}
|
||||
|
||||
$object = self::fetchCachedActivity($url, $uid);
|
||||
$object = self::fetchCachedActivity($url, 0);
|
||||
if (empty($object)) {
|
||||
return '';
|
||||
}
|
||||
|
@ -1419,7 +1399,7 @@ class Processor
|
|||
|
||||
$ldactivity = JsonLD::compact($activity);
|
||||
|
||||
$ldactivity['recursion-depth'] = !empty($child['recursion-depth']) ? $child['recursion-depth'] + 1 : 1;
|
||||
$ldactivity['recursion-depth'] = !empty($child['recursion-depth']) ? $child['recursion-depth'] + 1 : 0;
|
||||
|
||||
if (!empty($relay_actor)) {
|
||||
$ldactivity['thread-completion'] = $ldactivity['from-relay'] = Contact::getIdForURL($relay_actor);
|
||||
|
@ -1432,8 +1412,12 @@ class Processor
|
|||
$ldactivity['completion-mode'] = $completion;
|
||||
}
|
||||
|
||||
if (!empty($child['type'])) {
|
||||
if (!empty($child['thread-children-type'])) {
|
||||
$ldactivity['thread-children-type'] = $child['thread-children-type'];
|
||||
} elseif (!empty($child['type'])) {
|
||||
$ldactivity['thread-children-type'] = $child['type'];
|
||||
} else {
|
||||
$ldactivity['thread-children-type'] = 'as:Create';
|
||||
}
|
||||
|
||||
if (!empty($relay_actor) && !self::acceptIncomingMessage($ldactivity, $object['id'])) {
|
||||
|
@ -1442,7 +1426,7 @@ class Processor
|
|||
|
||||
if (($completion == Receiver::COMPLETION_RELAY) && Queue::exists($url, 'as:Create')) {
|
||||
Logger::notice('Activity has already been queued.', ['url' => $url, 'object' => $activity['id']]);
|
||||
} elseif (ActivityPub\Receiver::processActivity($ldactivity, json_encode($activity), $uid, true, false, $signer, '', $completion)) {
|
||||
} elseif (ActivityPub\Receiver::processActivity($ldactivity, json_encode($activity), 0, true, false, $signer, '', $completion)) {
|
||||
Logger::notice('Activity had been fetched and processed.', ['url' => $url, 'entry' => $child['entry-id'] ?? 0, 'completion' => $completion, 'object' => $activity['id']]);
|
||||
} else {
|
||||
Logger::notice('Activity had been fetched and will be processed later.', ['url' => $url, 'entry' => $child['entry-id'] ?? 0, 'completion' => $completion, 'object' => $activity['id']]);
|
||||
|
|
|
@ -26,6 +26,7 @@ use Friendica\Core\System;
|
|||
use Friendica\Database\Database;
|
||||
use Friendica\Database\DBA;
|
||||
use Friendica\DI;
|
||||
use Friendica\Model\ItemURI;
|
||||
use Friendica\Model\Post;
|
||||
use Friendica\Util\DateTimeFormat;
|
||||
use Friendica\Util\JsonLD;
|
||||
|
@ -140,7 +141,15 @@ class Queue
|
|||
return;
|
||||
}
|
||||
|
||||
Logger::debug('Delete inbox-entry', ['id' => $entry['id']]);
|
||||
|
||||
DBA::delete('inbox-entry', ['id' => $entry['id']]);
|
||||
|
||||
$children = DBA::select('inbox-entry', ['id'], ['in-reply-to-id' => $entry['object-id']]);
|
||||
while ($child = DBA::fetch($children)) {
|
||||
self::deleteById($child['id']);
|
||||
}
|
||||
DBA::close($children);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -188,6 +197,11 @@ class Queue
|
|||
return false;
|
||||
}
|
||||
|
||||
if (!self::isProcessable($id)) {
|
||||
Logger::debug('Other queue entries need to be processed first.', ['id' => $id]);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!empty($entry['wid'])) {
|
||||
$worker = DI::app()->getQueue();
|
||||
$wid = $worker['id'] ?? 0;
|
||||
|
@ -210,6 +224,10 @@ class Queue
|
|||
$activity['worker-id'] = $entry['wid'];
|
||||
$activity['recursion-depth'] = 0;
|
||||
|
||||
if (empty($activity['thread-children-type'])) {
|
||||
$activity['thread-children-type'] = $type;
|
||||
}
|
||||
|
||||
$receivers = DBA::select('inbox-entry-receiver', ['uid'], ["`queue-id` = ? AND `uid` != ?", $entry['id'], 0]);
|
||||
while ($receiver = DBA::fetch($receivers)) {
|
||||
if (!in_array($receiver['uid'], $activity['receiver'])) {
|
||||
|
@ -248,6 +266,34 @@ class Queue
|
|||
DBA::close($entries);
|
||||
}
|
||||
|
||||
public static function isProcessable(int $id): bool
|
||||
{
|
||||
$entry = DBA::selectFirst('inbox-entry', [], ['id' => $id]);
|
||||
if (empty($entry)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!empty($entry['object-id']) && Post::exists(['uri' => $entry['object-id']])) {
|
||||
// The object already exists, so processing can be done
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!empty($entry['conversation'])) {
|
||||
$conv_id = ItemURI::getIdByURI($entry['conversation'], false);
|
||||
if (DBA::exists('post-thread', ['conversation-id' => $conv_id])) {
|
||||
// We have got the conversation in the system, so the post can be processed
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!empty($entry['object-id']) && !empty($entry['in-reply-to-id']) && ($entry['object-id'] != $entry['in-reply-to-id']) && DBA::exists('inbox-entry', ['object-id' => $entry['in-reply-to-id']])) {
|
||||
// This entry belongs to some other entry that should be processed first
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear old activities
|
||||
*
|
||||
|
@ -257,7 +303,11 @@ class Queue
|
|||
{
|
||||
// We delete all entries that aren't associated with a worker entry after seven days.
|
||||
// The other entries are deleted when the worker deferred for too long.
|
||||
DBA::delete('inbox-entry', ["`wid` IS NULL AND `received` < ?", DateTimeFormat::utc('now - 7 days')]);
|
||||
$entries = DBA::select('inbox-entry', ['id'], ["`wid` IS NULL AND `received` < ?", DateTimeFormat::utc('now - 7 days')]);
|
||||
while ($entry = DBA::fetch($entries)) {
|
||||
self::deleteById($entry['id']);
|
||||
}
|
||||
DBA::close($entries);
|
||||
|
||||
// Optimizing this table only last seconds
|
||||
if (DI::config()->get('system', 'optimize_tables')) {
|
||||
|
|
|
@ -586,10 +586,18 @@ class Receiver
|
|||
$object_data['object_activity'] = $activity;
|
||||
}
|
||||
|
||||
if (($type == 'as:Create') && Queue::exists($object_data['object_id'], $type)) {
|
||||
if (($type == 'as:Create') && $trust_source) {
|
||||
if (self::hasArrived($object_data['object_id'])) {
|
||||
Logger::info('The activity already arrived.', ['id' => $object_data['object_id']]);
|
||||
return true;
|
||||
}
|
||||
self::addArrivedId($object_data['object_id']);
|
||||
|
||||
if (Queue::exists($object_data['object_id'], $type)) {
|
||||
Logger::info('The activity is already added.', ['id' => $object_data['object_id']]);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
if (DI::config()->get('system', 'decoupled_receiver') && ($trust_source || DI::config()->get('debug', 'ap_inbox_store_untrusted'))) {
|
||||
$object_data = Queue::add($object_data, $type, $uid, $http_signer, $push, $trust_source);
|
||||
|
@ -601,11 +609,15 @@ class Receiver
|
|||
}
|
||||
|
||||
if (!empty($object_data['entry-id']) && DI::config()->get('system', 'decoupled_receiver') && ($push || ($completion == self::COMPLETION_RELAY))) {
|
||||
if (Queue::isProcessable($object_data['entry-id'])) {
|
||||
// We delay by 5 seconds to allow to accumulate all receivers
|
||||
$delayed = date(DateTimeFormat::MYSQL, time() + 5);
|
||||
Logger::debug('Initiate processing', ['id' => $object_data['entry-id'], 'uri' => $object_data['object_id']]);
|
||||
$wid = Worker::add(['priority' => PRIORITY_HIGH, 'delayed' => $delayed], 'ProcessQueue', $object_data['entry-id']);
|
||||
Queue::setWorkerId($object_data['entry-id'], $wid);
|
||||
} else {
|
||||
Logger::debug('Other queue entries need to be processed first.', ['id' => $object_data['entry-id']]);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -679,6 +691,7 @@ class Receiver
|
|||
$object_data['thread-completion'] = Contact::getIdForURL($actor);
|
||||
$object_data['completion-mode'] = self::COMPLETION_ANNOUCE;
|
||||
|
||||
if (!Post::exists(['uri' => $object_data['id'], 'uid' => 0])) {
|
||||
$item = ActivityPub\Processor::createItem($object_data, $fetch_parents);
|
||||
if (empty($item)) {
|
||||
return false;
|
||||
|
@ -686,6 +699,10 @@ class Receiver
|
|||
|
||||
$item['post-reason'] = Item::PR_ANNOUNCEMENT;
|
||||
ActivityPub\Processor::postItem($object_data, $item);
|
||||
} else {
|
||||
Logger::info('Announced id already exists', ['id' => $object_data['id']]);
|
||||
Queue::remove($object_data);
|
||||
}
|
||||
|
||||
if (!empty($activity)) {
|
||||
$announce_object_data = self::processObject($activity);
|
||||
|
@ -1883,4 +1900,29 @@ class Receiver
|
|||
|
||||
return $object_data;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add an object id to the list of arrived activities
|
||||
*
|
||||
* @param string $id
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
private static function addArrivedId(string $id)
|
||||
{
|
||||
DBA::delete('arrived-activity', ["`received` < ?", DateTimeFormat::utc('now - 5 minutes')]);
|
||||
DBA::insert('arrived-activity', ['object-id' => $id, 'received' => DateTimeFormat::utcNow()]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the given object already arrived before
|
||||
*
|
||||
* @param string $id
|
||||
*
|
||||
* @return boolean
|
||||
*/
|
||||
private static function hasArrived(string $id): bool
|
||||
{
|
||||
return DBA::exists('arrived-activity', ['object-id' => $id]);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -315,6 +315,10 @@ class ParseUrl
|
|||
|
||||
$body = mb_convert_encoding($body, 'HTML-ENTITIES', 'UTF-8');
|
||||
|
||||
if (empty($body)) {
|
||||
return $siteinfo;
|
||||
}
|
||||
|
||||
$doc = new DOMDocument();
|
||||
@$doc->loadHTML($body);
|
||||
|
||||
|
|
|
@ -55,7 +55,7 @@
|
|||
use Friendica\Database\DBA;
|
||||
|
||||
if (!defined('DB_UPDATE_VERSION')) {
|
||||
define('DB_UPDATE_VERSION', 1477);
|
||||
define('DB_UPDATE_VERSION', 1478);
|
||||
}
|
||||
|
||||
return [
|
||||
|
@ -1723,6 +1723,28 @@ return [
|
|||
"uri-id_uid" => ["UNIQUE", "uri-id", "uid"],
|
||||
]
|
||||
],
|
||||
"arrived-activity" => [
|
||||
"comment" => "Id of arrived activities",
|
||||
"fields" => [
|
||||
"object-id" => ["type" => "varbinary(255)", "not null" => "1", "primary" => "1", "comment" => "object id of the incoming activity"],
|
||||
"received" => ["type" => "datetime", "comment" => "Receiving date"],
|
||||
],
|
||||
"indexes" => [
|
||||
"PRIMARY" => ["object-id"],
|
||||
],
|
||||
"engine" => "MEMORY",
|
||||
],
|
||||
"processed-activity" => [
|
||||
"comment" => "Id of processed activities",
|
||||
"fields" => [
|
||||
"object-id" => ["type" => "varbinary(255)", "not null" => "1", "primary" => "1", "comment" => "object id of the incoming activity"],
|
||||
"received" => ["type" => "datetime", "comment" => "Receiving date"],
|
||||
],
|
||||
"indexes" => [
|
||||
"PRIMARY" => ["object-id"],
|
||||
],
|
||||
"engine" => "MEMORY",
|
||||
],
|
||||
"worker-ipc" => [
|
||||
"comment" => "Inter process communication between the frontend and the worker",
|
||||
"fields" => [
|
||||
|
|
Loading…
Reference in a new issue