Merge pull request #11838 from annando/asynchronous-conversation
Conversation entries will now be stored asynchronous if possible
This commit is contained in:
commit
809beeee9d
8 changed files with 119 additions and 55 deletions
10
database.sql
10
database.sql
|
@ -1,6 +1,6 @@
|
|||
-- ------------------------------------------
|
||||
-- Friendica 2022.09-dev (Giant Rhubarb)
|
||||
-- DB_UPDATE_VERSION 1478
|
||||
-- DB_UPDATE_VERSION 1479
|
||||
-- ------------------------------------------
|
||||
|
||||
|
||||
|
@ -1726,13 +1726,13 @@ CREATE TABLE IF NOT EXISTS `arrived-activity` (
|
|||
) ENGINE=MEMORY DEFAULT COLLATE utf8mb4_general_ci COMMENT='Id of arrived activities';
|
||||
|
||||
--
|
||||
-- TABLE processed-activity
|
||||
-- TABLE fetched-activity
|
||||
--
|
||||
CREATE TABLE IF NOT EXISTS `processed-activity` (
|
||||
`object-id` varbinary(255) NOT NULL COMMENT 'object id of the incoming activity',
|
||||
CREATE TABLE IF NOT EXISTS `fetched-activity` (
|
||||
`object-id` varbinary(255) NOT NULL COMMENT 'object id of fetched activity',
|
||||
`received` datetime COMMENT 'Receiving date',
|
||||
PRIMARY KEY(`object-id`)
|
||||
) ENGINE=MEMORY DEFAULT COLLATE utf8mb4_general_ci COMMENT='Id of processed activities';
|
||||
) ENGINE=MEMORY DEFAULT COLLATE utf8mb4_general_ci COMMENT='Id of fetched activities';
|
||||
|
||||
--
|
||||
-- TABLE worker-ipc
|
||||
|
|
|
@ -26,6 +26,7 @@ Database Tables
|
|||
| [event](help/database/db_event) | Events |
|
||||
| [fcontact](help/database/db_fcontact) | Diaspora compatible contacts - used in the Diaspora implementation |
|
||||
| [fetch-entry](help/database/db_fetch-entry) | |
|
||||
| [fetched-activity](help/database/db_fetched-activity) | Id of fetched activities |
|
||||
| [fsuggest](help/database/db_fsuggest) | friend suggestion stuff |
|
||||
| [group](help/database/db_group) | privacy groups, group info |
|
||||
| [group_member](help/database/db_group_member) | privacy groups, member info |
|
||||
|
@ -68,7 +69,6 @@ Database Tables
|
|||
| [post-user](help/database/db_post-user) | User specific post data |
|
||||
| [post-user-notification](help/database/db_post-user-notification) | User post notifications |
|
||||
| [process](help/database/db_process) | Currently running system processes |
|
||||
| [processed-activity](help/database/db_processed-activity) | Id of processed activities |
|
||||
| [profile](help/database/db_profile) | user profiles data |
|
||||
| [profile_field](help/database/db_profile_field) | Custom profile fields |
|
||||
| [push_subscriber](help/database/db_push_subscriber) | Used for OStatus: Contains feed subscribers |
|
||||
|
|
22
doc/database/db_fetched-activity.md
Normal file
22
doc/database/db_fetched-activity.md
Normal file
|
@ -0,0 +1,22 @@
|
|||
Table fetched-activity
|
||||
===========
|
||||
|
||||
Id of fetched activities
|
||||
|
||||
Fields
|
||||
------
|
||||
|
||||
| Field | Description | Type | Null | Key | Default | Extra |
|
||||
| --------- | ----------------------------- | -------------- | ---- | --- | ------- | ----- |
|
||||
| object-id | object id of fetched activity | varbinary(255) | NO | PRI | NULL | |
|
||||
| received | Receiving date | datetime | YES | | NULL | |
|
||||
|
||||
Indexes
|
||||
------------
|
||||
|
||||
| Name | Fields |
|
||||
| ------- | --------- |
|
||||
| PRIMARY | object-id |
|
||||
|
||||
|
||||
Return to [database documentation](help/database)
|
|
@ -1,22 +0,0 @@
|
|||
Table processed-activity
|
||||
===========
|
||||
|
||||
Id of processed activities
|
||||
|
||||
Fields
|
||||
------
|
||||
|
||||
| Field | Description | Type | Null | Key | Default | Extra |
|
||||
| --------- | ---------------------------------- | -------------- | ---- | --- | ------- | ----- |
|
||||
| object-id | object id of the incoming activity | varbinary(255) | NO | PRI | NULL | |
|
||||
| received | Receiving date | datetime | YES | | NULL | |
|
||||
|
||||
Indexes
|
||||
------------
|
||||
|
||||
| Name | Fields |
|
||||
| ------- | --------- |
|
||||
| PRIMARY | object-id |
|
||||
|
||||
|
||||
Return to [database documentation](help/database)
|
|
@ -854,7 +854,7 @@ class Conversation
|
|||
$row['direction'] = ['direction' => 7, 'title' => $this->l10n->t('You had been addressed (%s).', 'bcc')];
|
||||
break;
|
||||
case ItemModel::PR_FOLLOWER:
|
||||
$row['direction'] = ['direction' => 6, 'title' => $this->l10n->t('You are following %s.', $row['author-name'])];
|
||||
$row['direction'] = ['direction' => 6, 'title' => $this->l10n->t('You are following %s.', $row['causer-name'] ?: $row['author-name'])];
|
||||
break;
|
||||
case ItemModel::PR_TAG:
|
||||
$row['direction'] = ['direction' => 4, 'title' => $this->l10n->t('You subscribed to one or more tags in this post.')];
|
||||
|
|
|
@ -1513,7 +1513,7 @@ class Item
|
|||
|
||||
$is_reshare = ($item['gravity'] == GRAVITY_ACTIVITY) && ($item['verb'] == Activity::ANNOUNCE);
|
||||
|
||||
if ((($item['gravity'] == GRAVITY_PARENT) || $is_reshare) &&
|
||||
if (($uid != 0) && (($item['gravity'] == GRAVITY_PARENT) || $is_reshare) &&
|
||||
DI::pConfig()->get($uid, 'system', 'accept_only_sharer') == self::COMPLETION_NONE &&
|
||||
!in_array($item['post-reason'], [self::PR_FOLLOWER, self::PR_TAG, self::PR_TO, self::PR_CC])) {
|
||||
Logger::info('Contact is not a follower, thread will not be stored', ['author' => $item['author-link'], 'uid' => $uid]);
|
||||
|
|
|
@ -69,20 +69,20 @@ class Processor
|
|||
*/
|
||||
private static function addActivityId(string $id)
|
||||
{
|
||||
DBA::delete('processed-activity', ["`received` < ?", DateTimeFormat::utc('now - 5 minutes')]);
|
||||
DBA::insert('processed-activity', ['object-id' => $id, 'received' => DateTimeFormat::utcNow()]);
|
||||
DBA::delete('fetched-activity', ["`received` < ?", DateTimeFormat::utc('now - 5 minutes')]);
|
||||
DBA::insert('fetched-activity', ['object-id' => $id, 'received' => DateTimeFormat::utcNow()]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the given object id has just been processed
|
||||
* Checks if the given object id has just been fetched
|
||||
*
|
||||
* @param string $id
|
||||
*
|
||||
* @return boolean
|
||||
*/
|
||||
private static function isProcessed(string $id): bool
|
||||
private static function isFetched(string $id): bool
|
||||
{
|
||||
return DBA::exists('processed-activity', ['object-id' => $id]);
|
||||
return DBA::exists('fetched-activity', ['object-id' => $id]);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -303,13 +303,6 @@ class Processor
|
|||
*/
|
||||
public static function createItem(array $activity, bool $fetch_parents): array
|
||||
{
|
||||
if (self::isProcessed($activity['id']) && !Post::exists(['uri' => $activity['id']])) {
|
||||
Logger::info('Id is already processed', ['id' => $activity['id']]);
|
||||
return [];
|
||||
}
|
||||
|
||||
self::addActivityId($activity['id']);
|
||||
|
||||
$item = [];
|
||||
$item['verb'] = Activity::POST;
|
||||
$item['thr-parent'] = $activity['reply-to-id'];
|
||||
|
@ -333,6 +326,7 @@ class Processor
|
|||
if (!empty($conversation)) {
|
||||
Logger::debug('Got conversation', ['conversation' => $item['conversation'], 'parent' => $conversation]);
|
||||
$item['parent-uri'] = $conversation['uri'];
|
||||
$item['parent-uri-id'] = ItemURI::getIdByURI($item['parent-uri']);
|
||||
}
|
||||
} else {
|
||||
$conversation = [];
|
||||
|
@ -350,7 +344,7 @@ class Processor
|
|||
}
|
||||
|
||||
if ($fetch_parents && empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) {
|
||||
$result = self::fetchParent($activity);
|
||||
$result = self::fetchParent($activity, !empty($conversation));
|
||||
if (!empty($result)) {
|
||||
if (($item['thr-parent'] != $result) && Post::exists(['uri' => $result])) {
|
||||
$item['thr-parent'] = $result;
|
||||
|
@ -457,6 +451,8 @@ class Processor
|
|||
return [];
|
||||
}
|
||||
|
||||
$item['thr-parent-id'] = ItemURI::getIdByURI($item['thr-parent']);
|
||||
|
||||
$item = self::processContent($activity, $item);
|
||||
if (empty($item)) {
|
||||
Logger::info('Message was not processed');
|
||||
|
@ -489,14 +485,26 @@ class Processor
|
|||
* Fetch and process parent posts for the given activity
|
||||
*
|
||||
* @param array $activity
|
||||
* @param bool $in_background
|
||||
*
|
||||
* @return string
|
||||
*/
|
||||
private static function fetchParent(array $activity): string
|
||||
private static function fetchParent(array $activity, bool $in_background = false): string
|
||||
{
|
||||
if (self::isFetched($activity['reply-to-id'])) {
|
||||
Logger::info('Id is already fetched', ['id' => $activity['reply-to-id']]);
|
||||
return '';
|
||||
}
|
||||
|
||||
self::addActivityId($activity['reply-to-id']);
|
||||
|
||||
if (!DI::config()->get('system', 'fetch_by_worker')) {
|
||||
$in_background = false;
|
||||
}
|
||||
|
||||
$recursion_depth = $activity['recursion-depth'] ?? 0;
|
||||
|
||||
if ($recursion_depth < DI::config()->get('system', 'max_recursion_depth')) {
|
||||
if (!$in_background && ($recursion_depth < DI::config()->get('system', 'max_recursion_depth'))) {
|
||||
Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
|
||||
$result = self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
|
||||
if (empty($result) && self::isActivityGone($activity['reply-to-id'])) {
|
||||
|
@ -525,19 +533,21 @@ class Processor
|
|||
}
|
||||
} elseif (self::isActivityGone($activity['reply-to-id'])) {
|
||||
Logger::notice('The activity is gone. We will not spawn a worker. The queue entry will be deleted', ['parent' => $activity['reply-to-id']]);
|
||||
if (!empty($activity['entry-id'])) {
|
||||
if ($in_background) {
|
||||
// fetching in background is done for all activities where we have got the conversation
|
||||
// There we only delete the single activity and not the whole thread since we can store the
|
||||
// other posts in the thread even with missing posts.
|
||||
Queue::remove($activity);
|
||||
} elseif (!empty($activity['entry-id'])) {
|
||||
Queue::deleteById($activity['entry-id']);
|
||||
}
|
||||
return '';
|
||||
} elseif ($in_background) {
|
||||
Logger::notice('Fetching is done in the background.', ['parent' => $activity['reply-to-id']]);
|
||||
} else {
|
||||
Logger::notice('Recursion level is too high.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
|
||||
}
|
||||
|
||||
if (Queue::hasWorker($activity['worker-id'] ?? 0)) {
|
||||
Logger::notice('There is already a worker task to fetch the post.', ['id' => $activity['id'], 'parent' => $activity['reply-to-id']]);
|
||||
return '';
|
||||
}
|
||||
|
||||
if (!Fetch::hasWorker($activity['reply-to-id'])) {
|
||||
Logger::notice('Fetching is done by worker.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
|
||||
Fetch::add($activity['reply-to-id']);
|
||||
|
@ -1056,6 +1066,10 @@ class Processor
|
|||
Logger::info('Accepting post', ['uid' => $receiver, 'url' => $item['uri']]);
|
||||
}
|
||||
|
||||
if (!self::hasParents($item, $receiver)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (($item['gravity'] != GRAVITY_ACTIVITY) && ($activity['object_type'] == 'as:Event')) {
|
||||
$event_id = self::createEvent($activity, $item);
|
||||
|
||||
|
@ -1096,6 +1110,56 @@ class Processor
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if there are parent posts for the given receiver.
|
||||
* If not, then the system will try to add them.
|
||||
*
|
||||
* @param array $item
|
||||
* @param integer $receiver
|
||||
* @return boolean
|
||||
*/
|
||||
private static function hasParents(array $item, int $receiver)
|
||||
{
|
||||
if (($receiver == 0) || ($item['gravity'] == GRAVITY_PARENT)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
$fields = ['causer-id' => $item['causer-id'] ?? $item['author-id'], 'post-reason' => Item::PR_FETCHED];
|
||||
|
||||
$has_parents = false;
|
||||
|
||||
if (!empty($item['parent-uri-id'])) {
|
||||
if (Post::exists(['uri-id' => $item['parent-uri-id'], 'uid' => $receiver])) {
|
||||
$has_parents = true;
|
||||
} elseif (Post::exists(['uri-id' => $item['parent-uri'], 'uid' => 0])) {
|
||||
$stored = Item::storeForUserByUriId($item['parent-uri-id'], $receiver, $fields);
|
||||
$has_parents = (bool)$stored;
|
||||
if ($stored) {
|
||||
Logger::notice('Inserted missing parent post', ['stored' => $stored, 'uid' => $receiver, 'parent' => $item['parent-uri']]);
|
||||
} else {
|
||||
Logger::notice('Parent could not be added.', ['uid' => $receiver, 'uri' => $item['uri'], 'parent' => $item['parent-uri']]);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (empty($item['parent-uri-id']) || ($item['thr-parent-id'] != $item['parent-uri-id'])) {
|
||||
if (Post::exists(['uri-id' => $item['thr-parent-id'], 'uid' => $receiver])) {
|
||||
$has_parents = true;
|
||||
} elseif (Post::exists(['uri-id' => $item['thr-parent-id'], 'uid' => 0])) {
|
||||
$stored = Item::storeForUserByUriId($item['thr-parent-id'], $receiver, $fields);
|
||||
$has_parents = $has_parents || (bool)$stored;
|
||||
if ($stored) {
|
||||
Logger::notice('Inserted missing thread parent post', ['stored' => $stored, 'uid' => $receiver, 'thread-parent' => $item['thr-parent']]);
|
||||
} else {
|
||||
Logger::notice('Thread parent could not be added.', ['uid' => $receiver, 'uri' => $item['uri'], 'thread-parent' => $item['thr-parent']]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return $has_parents;
|
||||
}
|
||||
|
||||
/**
|
||||
* Store tags and mentions into the tag table
|
||||
*
|
||||
|
|
|
@ -55,7 +55,7 @@
|
|||
use Friendica\Database\DBA;
|
||||
|
||||
if (!defined('DB_UPDATE_VERSION')) {
|
||||
define('DB_UPDATE_VERSION', 1478);
|
||||
define('DB_UPDATE_VERSION', 1479);
|
||||
}
|
||||
|
||||
return [
|
||||
|
@ -1734,10 +1734,10 @@ return [
|
|||
],
|
||||
"engine" => "MEMORY",
|
||||
],
|
||||
"processed-activity" => [
|
||||
"comment" => "Id of processed activities",
|
||||
"fetched-activity" => [
|
||||
"comment" => "Id of fetched activities",
|
||||
"fields" => [
|
||||
"object-id" => ["type" => "varbinary(255)", "not null" => "1", "primary" => "1", "comment" => "object id of the incoming activity"],
|
||||
"object-id" => ["type" => "varbinary(255)", "not null" => "1", "primary" => "1", "comment" => "object id of fetched activity"],
|
||||
"received" => ["type" => "datetime", "comment" => "Receiving date"],
|
||||
],
|
||||
"indexes" => [
|
||||
|
|
Loading…
Reference in a new issue