Add delivery queue number manipulation

- Add new ItemDeliveryData model class
- Add queue_count initialization in Notifier
- Add queue_done incrementation in various Delivery tasks
This commit is contained in:
Hypolite Petovan 2018-12-07 00:52:14 -05:00
parent a3c1beb265
commit 1696ad962e
5 changed files with 179 additions and 68 deletions

View file

@ -50,7 +50,8 @@ class Item extends BaseObject
const PT_PERSONAL_NOTE = 128; const PT_PERSONAL_NOTE = 128;
// Field list that is used to display the items // Field list that is used to display the items
const DISPLAY_FIELDLIST = ['uid', 'id', 'parent', 'uri', 'thr-parent', 'parent-uri', 'guid', 'network', const DISPLAY_FIELDLIST = [
'uid', 'id', 'parent', 'uri', 'thr-parent', 'parent-uri', 'guid', 'network',
'commented', 'created', 'edited', 'received', 'verb', 'object-type', 'postopts', 'plink', 'commented', 'created', 'edited', 'received', 'verb', 'object-type', 'postopts', 'plink',
'wall', 'private', 'starred', 'origin', 'title', 'body', 'file', 'attach', 'language', 'wall', 'private', 'starred', 'origin', 'title', 'body', 'file', 'attach', 'language',
'content-warning', 'location', 'coord', 'app', 'rendered-hash', 'rendered-html', 'object', 'content-warning', 'location', 'coord', 'app', 'rendered-hash', 'rendered-html', 'object',
@ -61,7 +62,9 @@ class Item extends BaseObject
'writable', 'self', 'cid', 'alias', 'writable', 'self', 'cid', 'alias',
'event-id', 'event-created', 'event-edited', 'event-start', 'event-finish', 'event-id', 'event-created', 'event-edited', 'event-start', 'event-finish',
'event-summary', 'event-desc', 'event-location', 'event-type', 'event-summary', 'event-desc', 'event-location', 'event-type',
'event-nofinish', 'event-adjust', 'event-ignore', 'event-id']; 'event-nofinish', 'event-adjust', 'event-ignore', 'event-id',
'delivery_queue_count', 'delivery_queue_done'
];
// Field list that is used to deliver items via the protocols // Field list that is used to deliver items via the protocols
const DELIVER_FIELDLIST = ['uid', 'id', 'parent', 'uri', 'thr-parent', 'parent-uri', 'guid', const DELIVER_FIELDLIST = ['uid', 'id', 'parent', 'uri', 'thr-parent', 'parent-uri', 'guid',
@ -80,9 +83,6 @@ class Item extends BaseObject
// Field list for "item-content" table that is not present in the "item" table // Field list for "item-content" table that is not present in the "item" table
const CONTENT_FIELDLIST = ['language']; const CONTENT_FIELDLIST = ['language'];
// Field list for additional delivery data
const DELIVERY_DATA_FIELDLIST = ['postopts', 'inform'];
// All fields in the item table // All fields in the item table
const ITEM_FIELDLIST = ['id', 'uid', 'parent', 'uri', 'parent-uri', 'thr-parent', 'guid', const ITEM_FIELDLIST = ['id', 'uid', 'parent', 'uri', 'parent-uri', 'thr-parent', 'guid',
'contact-id', 'type', 'wall', 'gravity', 'extid', 'icid', 'iaid', 'psid', 'contact-id', 'type', 'wall', 'gravity', 'extid', 'icid', 'iaid', 'psid',
@ -190,7 +190,7 @@ class Item extends BaseObject
// Fetch data from the item-content table whenever there is content there // Fetch data from the item-content table whenever there is content there
if (self::isLegacyMode()) { if (self::isLegacyMode()) {
$legacy_fields = array_merge(self::DELIVERY_DATA_FIELDLIST, self::MIXED_CONTENT_FIELDLIST); $legacy_fields = array_merge(ItemDeliveryData::FIELD_LIST, self::MIXED_CONTENT_FIELDLIST);
foreach ($legacy_fields as $field) { foreach ($legacy_fields as $field) {
if (empty($row[$field]) && !empty($row['internal-item-' . $field])) { if (empty($row[$field]) && !empty($row['internal-item-' . $field])) {
$row[$field] = $row['internal-item-' . $field]; $row[$field] = $row['internal-item-' . $field];
@ -551,7 +551,7 @@ class Item extends BaseObject
$fields['item-content'] = array_merge(self::CONTENT_FIELDLIST, self::MIXED_CONTENT_FIELDLIST); $fields['item-content'] = array_merge(self::CONTENT_FIELDLIST, self::MIXED_CONTENT_FIELDLIST);
$fields['item-delivery-data'] = self::DELIVERY_DATA_FIELDLIST; $fields['item-delivery-data'] = ItemDeliveryData::FIELD_LIST;
$fields['permissionset'] = ['allow_cid', 'allow_gid', 'deny_cid', 'deny_gid']; $fields['permissionset'] = ['allow_cid', 'allow_gid', 'deny_cid', 'deny_gid'];
@ -730,7 +730,7 @@ class Item extends BaseObject
foreach ($fields as $table => $table_fields) { foreach ($fields as $table => $table_fields) {
foreach ($table_fields as $field => $select) { foreach ($table_fields as $field => $select) {
if (empty($selected) || in_array($select, $selected)) { if (empty($selected) || in_array($select, $selected)) {
$legacy_fields = array_merge(self::DELIVERY_DATA_FIELDLIST, self::MIXED_CONTENT_FIELDLIST); $legacy_fields = array_merge(ItemDeliveryData::FIELD_LIST, self::MIXED_CONTENT_FIELDLIST);
if (self::isLegacyMode() && in_array($select, $legacy_fields)) { if (self::isLegacyMode() && in_array($select, $legacy_fields)) {
$selection[] = "`item`.`".$select."` AS `internal-item-" . $select . "`"; $selection[] = "`item`.`".$select."` AS `internal-item-" . $select . "`";
} }
@ -810,7 +810,9 @@ class Item extends BaseObject
} }
} }
$clear_fields = ['bookmark', 'type', 'author-name', 'author-avatar', 'author-link', 'owner-name', 'owner-avatar', 'owner-link']; $delivery_data = ItemDeliveryData::extractFields($fields);
$clear_fields = ['bookmark', 'type', 'author-name', 'author-avatar', 'author-link', 'owner-name', 'owner-avatar', 'owner-link', 'postopts', 'inform'];
foreach ($clear_fields as $field) { foreach ($clear_fields as $field) {
if (array_key_exists($field, $fields)) { if (array_key_exists($field, $fields)) {
$fields[$field] = null; $fields[$field] = null;
@ -831,12 +833,6 @@ class Item extends BaseObject
$files = null; $files = null;
} }
$delivery_data = ['postopts' => defaults($fields, 'postopts', ''),
'inform' => defaults($fields, 'inform', '')];
$fields['postopts'] = null;
$fields['inform'] = null;
if (!empty($fields)) { if (!empty($fields)) {
$success = DBA::update('item', $fields, $condition); $success = DBA::update('item', $fields, $condition);
@ -914,7 +910,7 @@ class Item extends BaseObject
} }
} }
self::updateDeliveryData($item['id'], $delivery_data); ItemDeliveryData::update($item['id'], $delivery_data);
self::updateThread($item['id']); self::updateThread($item['id']);
@ -1061,7 +1057,7 @@ class Item extends BaseObject
self::delete(['uri' => $item['uri'], 'uid' => 0, 'deleted' => false], $priority); self::delete(['uri' => $item['uri'], 'uid' => 0, 'deleted' => false], $priority);
} }
DBA::delete('item-delivery-data', ['iid' => $item['id']]); ItemDeliveryData::delete($item['id']);
// We don't delete the item-activity here, since we need some of the data for ActivityPub // We don't delete the item-activity here, since we need some of the data for ActivityPub
@ -1238,6 +1234,8 @@ class Item extends BaseObject
public static function insert($item, $force_parent = false, $notify = false, $dontcache = false) public static function insert($item, $force_parent = false, $notify = false, $dontcache = false)
{ {
$orig_item = $item;
// If it is a posting where users should get notifications, then define it as wall posting // If it is a posting where users should get notifications, then define it as wall posting
if ($notify) { if ($notify) {
$item['wall'] = 1; $item['wall'] = 1;
@ -1661,8 +1659,7 @@ class Item extends BaseObject
self::insertContent($item); self::insertContent($item);
} }
$delivery_data = ['postopts' => defaults($item, 'postopts', ''), $delivery_data = ItemDeliveryData::extractFields($item);
'inform' => defaults($item, 'inform', '')];
unset($item['postopts']); unset($item['postopts']);
unset($item['inform']); unset($item['inform']);
@ -1701,10 +1698,7 @@ class Item extends BaseObject
if ($spoolpath != "") { if ($spoolpath != "") {
$spool = $spoolpath.'/'.$file; $spool = $spoolpath.'/'.$file;
// Ensure to have the removed data from above again in the item array file_put_contents($spool, json_encode($orig_item));
$item = array_merge($item, $delivery_data);
file_put_contents($spool, json_encode($item));
Logger::log("Item wasn't stored - Item was spooled into file ".$file, Logger::DEBUG); Logger::log("Item wasn't stored - Item was spooled into file ".$file, Logger::DEBUG);
} }
return 0; return 0;
@ -1805,9 +1799,7 @@ class Item extends BaseObject
self::updateThread($parent_id); self::updateThread($parent_id);
} }
$delivery_data['iid'] = $current_post; ItemDeliveryData::insert($item['id'], $delivery_data);
self::insertDeliveryData($delivery_data);
DBA::commit(); DBA::commit();
@ -1848,35 +1840,6 @@ class Item extends BaseObject
return $current_post; return $current_post;
} }
/**
* @brief Insert a new item delivery data entry
*
* @param array $item The item fields that are to be inserted
*/
private static function insertDeliveryData($delivery_data)
{
if (empty($delivery_data['iid']) || (empty($delivery_data['postopts']) && empty($delivery_data['inform']))) {
return;
}
DBA::insert('item-delivery-data', $delivery_data);
}
/**
* @brief Update an existing item delivery data entry
*
* @param integer $id The item id that is to be updated
* @param array $item The item fields that are to be inserted
*/
private static function updateDeliveryData($id, $delivery_data)
{
if (empty($id) || (empty($delivery_data['postopts']) && empty($delivery_data['inform']))) {
return;
}
DBA::update('item-delivery-data', $delivery_data, ['iid' => $id], true);
}
/** /**
* @brief Insert a new item content entry * @brief Insert a new item content entry
* *

View file

@ -0,0 +1,114 @@
<?php
/**
* @file src/Model/ItemDeliveryData.php
*/
namespace Friendica\Model;
use Friendica\Database\DBA;
class ItemDeliveryData
{
const FIELD_LIST = [
// Legacy fields moved from item table
'postopts',
'inform',
// New delivery fields with virtual field name in item fields
'queue_count' => 'delivery_queue_count',
'queue_done' => 'delivery_queue_done',
];
/**
* Extract delivery data from the provided item fields
*
* @param array $fields
* @return array
*/
public static function extractFields(array &$fields)
{
$delivery_data = [];
foreach (ItemDeliveryData::FIELD_LIST as $key => $field) {
if (is_int($key) && isset($fields[$field])) {
// Legacy field moved from item table
$delivery_data[$field] = $fields[$field];
$fields[$field] = null;
} elseif (isset($fields[$field])) {
// New delivery field with virtual field name in item fields
$delivery_data[$key] = $fields[$field];
unset($fields[$field]);
}
}
return $delivery_data;
}
/**
* Increments the queue_done for the given item ID.
*
* Avoids racing condition between multiple delivery threads.
*
* @param integer $item_id
* @return bool
*/
public static function incrementQueueDone($item_id)
{
return DBA::e('UPDATE `item-delivery-data` SET `queue_done` = `queue_done` + 1 WHERE `iid` = ?', $item_id);
}
/**
* Insert a new item delivery data entry
*
* @param integer $item_id
* @param array $fields
* @return bool
*/
public static function insert($item_id, array $fields)
{
if (empty($item_id)) {
throw new \BadMethodCallException('Empty item_id');
}
$fields['iid'] = $item_id;
return DBA::insert('item-delivery-data', $fields);
}
/**
* Update/Insert item delivery data
*
* If you want to update queue_done, please use incrementQueueDone instead.
*
* @param integer $item_id
* @param array $fields
* @return bool
*/
public static function update($item_id, array $fields)
{
if (empty($item_id)) {
throw new \BadMethodCallException('Empty item_id');
}
if (empty($fields)) {
// Nothing to do, update successful
return true;
}
return DBA::update('item-delivery-data', $fields, ['iid' => $item_id], true);
}
/**
* Delete item delivery data
*
* @param integer $item_id
* @return bool
*/
public static function delete($item_id)
{
if (empty($item_id)) {
throw new \BadMethodCallException('Empty item_id');
}
return DBA::delete('item-delivery-data', ['iid' => $item_id]);
}
}

View file

@ -7,6 +7,7 @@ namespace Friendica\Worker;
use Friendica\BaseObject; use Friendica\BaseObject;
use Friendica\Core\Logger; use Friendica\Core\Logger;
use Friendica\Core\Worker; use Friendica\Core\Worker;
use Friendica\Model\ItemDeliveryData;
use Friendica\Protocol\ActivityPub; use Friendica\Protocol\ActivityPub;
use Friendica\Model\Item; use Friendica\Model\Item;
use Friendica\Util\HTTPSignature; use Friendica\Util\HTTPSignature;
@ -40,6 +41,10 @@ class APDelivery extends BaseObject
if (!empty($data)) { if (!empty($data)) {
$success = HTTPSignature::transmit($data, $inbox, $uid); $success = HTTPSignature::transmit($data, $inbox, $uid);
} }
if ($success && in_array($cmd, [Delivery::POST, Delivery::COMMENT])) {
ItemDeliveryData::incrementQueueDone($target_id);
}
} }
if (!$success) { if (!$success) {

View file

@ -13,6 +13,7 @@ use Friendica\Core\System;
use Friendica\Database\DBA; use Friendica\Database\DBA;
use Friendica\Model\Contact; use Friendica\Model\Contact;
use Friendica\Model\Item; use Friendica\Model\Item;
use Friendica\Model\ItemDeliveryData;
use Friendica\Model\Queue; use Friendica\Model\Queue;
use Friendica\Model\User; use Friendica\Model\User;
use Friendica\Protocol\DFRN; use Friendica\Protocol\DFRN;
@ -180,10 +181,18 @@ class Delivery extends BaseObject
case Protocol::DFRN: case Protocol::DFRN:
self::deliverDFRN($cmd, $contact, $owner, $items, $target_item, $public_message, $top_level, $followup); self::deliverDFRN($cmd, $contact, $owner, $items, $target_item, $public_message, $top_level, $followup);
if (in_array($cmd, [Delivery::POST, Delivery::COMMENT])) {
ItemDeliveryData::incrementQueueDone($target_id);
}
break; break;
case Protocol::DIASPORA: case Protocol::DIASPORA:
self::deliverDiaspora($cmd, $contact, $owner, $items, $target_item, $public_message, $top_level, $followup); self::deliverDiaspora($cmd, $contact, $owner, $items, $target_item, $public_message, $top_level, $followup);
if (in_array($cmd, [Delivery::POST, Delivery::COMMENT])) {
ItemDeliveryData::incrementQueueDone($target_id);
}
break; break;
case Protocol::OSTATUS: case Protocol::OSTATUS:
@ -201,6 +210,10 @@ class Delivery extends BaseObject
case Protocol::MAIL: case Protocol::MAIL:
self::deliverMail($cmd, $contact, $owner, $target_item); self::deliverMail($cmd, $contact, $owner, $target_item);
if (in_array($cmd, [Delivery::POST, Delivery::COMMENT])) {
ItemDeliveryData::incrementQueueDone($target_id);
}
break; break;
default: default:

View file

@ -15,6 +15,7 @@ use Friendica\Model\Contact;
use Friendica\Model\Conversation; use Friendica\Model\Conversation;
use Friendica\Model\Group; use Friendica\Model\Group;
use Friendica\Model\Item; use Friendica\Model\Item;
use Friendica\Model\ItemDeliveryData;
use Friendica\Model\PushSubscriber; use Friendica\Model\PushSubscriber;
use Friendica\Model\User; use Friendica\Model\User;
use Friendica\Network\Probe; use Friendica\Network\Probe;
@ -65,6 +66,7 @@ class Notifier
$delivery_contacts_stmt = null; $delivery_contacts_stmt = null;
$target_item = []; $target_item = [];
$items = []; $items = [];
$delivery_queue_count = 0;
if ($cmd == Delivery::MAIL) { if ($cmd == Delivery::MAIL) {
$message = DBA::selectFirst('mail', ['uid', 'contact-id'], ['id' => $target_id]); $message = DBA::selectFirst('mail', ['uid', 'contact-id'], ['id' => $target_id]);
@ -150,7 +152,7 @@ class Notifier
if (!empty($target_item) && !empty($items)) { if (!empty($target_item) && !empty($items)) {
$parent = $items[0]; $parent = $items[0];
self::activityPubDelivery($cmd, $target_item, $parent, $a->queue['priority'], $a->query_string['created']); $delivery_queue_count += self::activityPubDelivery($cmd, $target_item, $parent, $a->queue['priority'], $a->query_string['created']);
$fields = ['network', 'author-id', 'owner-id']; $fields = ['network', 'author-id', 'owner-id'];
$condition = ['uri' => $target_item["thr-parent"], 'uid' => $target_item["uid"]]; $condition = ['uri' => $target_item["thr-parent"], 'uid' => $target_item["uid"]];
@ -423,6 +425,9 @@ class Notifier
if (DBA::isResult($r)) { if (DBA::isResult($r)) {
foreach ($r as $rr) { foreach ($r as $rr) {
$conversants[] = $rr['id']; $conversants[] = $rr['id'];
$delivery_queue_count++;
Logger::log('Public delivery of item ' . $target_item["guid"] . ' (' . $target_id . ') to ' . json_encode($rr), Logger::DEBUG); Logger::log('Public delivery of item ' . $target_item["guid"] . ' (' . $target_id . ') to ' . json_encode($rr), Logger::DEBUG);
// Ensure that posts with our own protocol arrives before Diaspora posts arrive. // Ensure that posts with our own protocol arrives before Diaspora posts arrive.
@ -454,6 +459,8 @@ class Notifier
continue; continue;
} }
$delivery_queue_count++;
Logger::log('Delivery of item ' . $target_id . ' to ' . json_encode($contact), Logger::DEBUG); Logger::log('Delivery of item ' . $target_id . ' to ' . json_encode($contact), Logger::DEBUG);
// Ensure that posts with our own protocol arrives before Diaspora posts arrive. // Ensure that posts with our own protocol arrives before Diaspora posts arrive.
@ -473,11 +480,13 @@ class Notifier
// send salmon slaps to mentioned remote tags (@foo@example.com) in OStatus posts // send salmon slaps to mentioned remote tags (@foo@example.com) in OStatus posts
// They are especially used for notifications to OStatus users that don't follow us. // They are especially used for notifications to OStatus users that don't follow us.
if (!Config::get('system', 'dfrn_only') && count($url_recipients) && ($public_message || $push_notify) && !empty($target_item)) { if (!Config::get('system', 'dfrn_only') && count($url_recipients) && ($public_message || $push_notify) && !empty($target_item)) {
$delivery_queue_count += count($url_recipients);
$slap = OStatus::salmon($target_item, $owner); $slap = OStatus::salmon($target_item, $owner);
foreach ($url_recipients as $url) { foreach ($url_recipients as $url) {
Logger::log('Salmon delivery of item ' . $target_id . ' to ' . $url); Logger::log('Salmon delivery of item ' . $target_id . ' to ' . $url);
/// @TODO Redeliver/queue these items on failure, though there is no contact record /// @TODO Redeliver/queue these items on failure, though there is no contact record
Salmon::slapper($owner, $url, $slap); Salmon::slapper($owner, $url, $slap);
ItemDeliveryData::incrementQueueDone($target_id);
} }
} }
@ -492,6 +501,10 @@ class Notifier
if (!empty($target_item)) { if (!empty($target_item)) {
Logger::log('Calling hooks for ' . $cmd . ' ' . $target_id, Logger::DEBUG); Logger::log('Calling hooks for ' . $cmd . ' ' . $target_id, Logger::DEBUG);
if (in_array($cmd, [Delivery::POST, Delivery::COMMENT])) {
ItemDeliveryData::update($target_item['id'], ['queue_count' => $delivery_queue_count]);
}
Hook::fork($a->queue['priority'], 'notifier_normal', $target_item); Hook::fork($a->queue['priority'], 'notifier_normal', $target_item);
Hook::callAll('notifier_end', $target_item); Hook::callAll('notifier_end', $target_item);
@ -539,6 +552,7 @@ class Notifier
* @param array $parent * @param array $parent
* @param int $priority The priority the Notifier queue item was created with * @param int $priority The priority the Notifier queue item was created with
* @param string $created The date the Notifier queue item was created on * @param string $created The date the Notifier queue item was created on
* @return int The number of delivery tasks created
*/ */
private static function activityPubDelivery($cmd, array $target_item, array $parent, $priority, $created) private static function activityPubDelivery($cmd, array $target_item, array $parent, $priority, $created)
{ {
@ -549,7 +563,7 @@ class Notifier
Logger::log('Origin item ' . $target_item['id'] . ' with URL ' . $target_item['uri'] . ' will be distributed.', Logger::DEBUG); Logger::log('Origin item ' . $target_item['id'] . ' with URL ' . $target_item['uri'] . ' will be distributed.', Logger::DEBUG);
} elseif (!DBA::exists('conversation', ['item-uri' => $target_item['uri'], 'protocol' => Conversation::PARCEL_ACTIVITYPUB])) { } elseif (!DBA::exists('conversation', ['item-uri' => $target_item['uri'], 'protocol' => Conversation::PARCEL_ACTIVITYPUB])) {
Logger::log('Remote item ' . $target_item['id'] . ' with URL ' . $target_item['uri'] . ' is no AP post. It will not be distributed.', Logger::DEBUG); Logger::log('Remote item ' . $target_item['id'] . ' with URL ' . $target_item['uri'] . ' is no AP post. It will not be distributed.', Logger::DEBUG);
return; return 0;
} elseif ($parent['origin']) { } elseif ($parent['origin']) {
// Remote items are transmitted via the personal inboxes. // Remote items are transmitted via the personal inboxes.
// Doing so ensures that the dedicated receiver will get the message. // Doing so ensures that the dedicated receiver will get the message.
@ -559,7 +573,7 @@ class Notifier
if (empty($inboxes)) { if (empty($inboxes)) {
Logger::log('No inboxes found for item ' . $target_item['id'] . ' with URL ' . $target_item['uri'] . '. It will not be distributed.', Logger::DEBUG); Logger::log('No inboxes found for item ' . $target_item['id'] . ' with URL ' . $target_item['uri'] . '. It will not be distributed.', Logger::DEBUG);
return; return 0;
} }
// Fill the item cache // Fill the item cache
@ -571,6 +585,8 @@ class Notifier
Worker::add(['priority' => $priority, 'created' => $created, 'dont_fork' => true], Worker::add(['priority' => $priority, 'created' => $created, 'dont_fork' => true],
'APDelivery', $cmd, $target_item['id'], $inbox, $target_item['contact-uid']); 'APDelivery', $cmd, $target_item['id'], $inbox, $target_item['contact-uid']);
} }
return count($inboxes);
} }
private static function isForumPost(array $item, array $owner) private static function isForumPost(array $item, array $owner)