Improved bulk delivery / don't redistribute activities
This commit is contained in:
parent
da1c13368b
commit
842e51194e
4 changed files with 27 additions and 9 deletions
|
@ -159,8 +159,13 @@ class Cron
|
||||||
*/
|
*/
|
||||||
private static function deliverPosts()
|
private static function deliverPosts()
|
||||||
{
|
{
|
||||||
$deliveries = DBA::p("SELECT `item-uri`.`uri` AS `inbox`, MAX(`failed`) AS `failed` FROM `post-delivery` INNER JOIN `item-uri` ON `item-uri`.`id` = `post-delivery`.`inbox-id` GROUP BY `inbox`");
|
$deliveries = DBA::p("SELECT `item-uri`.`uri` AS `inbox`, MAX(`failed`) AS `failed` FROM `post-delivery` INNER JOIN `item-uri` ON `item-uri`.`id` = `post-delivery`.`inbox-id` GROUP BY `inbox` ORDER BY RAND()");
|
||||||
while ($delivery = DBA::fetch($deliveries)) {
|
while ($delivery = DBA::fetch($deliveries)) {
|
||||||
|
if ($delivery['failed'] > 0) {
|
||||||
|
Logger::info('Removing failed deliveries', ['inbox' => $delivery['inbox'], 'failed' => $delivery['failed']]);
|
||||||
|
Post\Delivery::removeFailed($delivery['inbox']);
|
||||||
|
}
|
||||||
|
|
||||||
if ($delivery['failed'] == 0) {
|
if ($delivery['failed'] == 0) {
|
||||||
$result = ActivityPub\Delivery::deliver($delivery['inbox']);
|
$result = ActivityPub\Delivery::deliver($delivery['inbox']);
|
||||||
Logger::info('Directly deliver inbox', ['inbox' => $delivery['inbox'], 'result' => $result['success']]);
|
Logger::info('Directly deliver inbox', ['inbox' => $delivery['inbox'], 'result' => $result['success']]);
|
||||||
|
@ -175,15 +180,17 @@ class Cron
|
||||||
$priority = Worker::PRIORITY_NEGLIGIBLE;
|
$priority = Worker::PRIORITY_NEGLIGIBLE;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($delivery['failed'] >= DI::config()->get('system', 'worker_defer_limit')) {
|
if (Worker::add(['priority' => $priority, 'force_priority' => true], 'APDelivery', '', 0, $delivery['inbox'], 0)) {
|
||||||
Logger::info('Removing failed deliveries', ['inbox' => $delivery['inbox'], 'failed' => $delivery['failed']]);
|
|
||||||
Post\Delivery::removeFailed($delivery['inbox']);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (Worker::add($priority, 'APDelivery', '', 0, $delivery['inbox'], 0)) {
|
|
||||||
Logger::info('Missing APDelivery worker added for inbox', ['inbox' => $delivery['inbox'], 'failed' => $delivery['failed'], 'priority' => $priority]);
|
Logger::info('Missing APDelivery worker added for inbox', ['inbox' => $delivery['inbox'], 'failed' => $delivery['failed'], 'priority' => $priority]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Optimizing this table only last seconds
|
||||||
|
if (DI::config()->get('system', 'optimize_tables')) {
|
||||||
|
Logger::info('Optimize start');
|
||||||
|
DBA::e("OPTIMIZE TABLE `post-delivery`");
|
||||||
|
Logger::info('Optimize end');
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -27,6 +27,7 @@ use Friendica\Database\DBA;
|
||||||
use Friendica\DI;
|
use Friendica\DI;
|
||||||
use Friendica\Model\Contact;
|
use Friendica\Model\Contact;
|
||||||
use Friendica\Model\GServer;
|
use Friendica\Model\GServer;
|
||||||
|
use Friendica\Model\Item;
|
||||||
use Friendica\Model\Post;
|
use Friendica\Model\Post;
|
||||||
use Friendica\Protocol\ActivityPub;
|
use Friendica\Protocol\ActivityPub;
|
||||||
use Friendica\Util\HTTPSignature;
|
use Friendica\Util\HTTPSignature;
|
||||||
|
@ -82,11 +83,15 @@ class Delivery
|
||||||
public static function deliverToInbox(string $cmd, int $item_id, string $inbox, int $uid, array $receivers, int $uri_id): array
|
public static function deliverToInbox(string $cmd, int $item_id, string $inbox, int $uid, array $receivers, int $uri_id): array
|
||||||
{
|
{
|
||||||
if (empty($item_id) && !empty($uri_id) && !empty($uid)) {
|
if (empty($item_id) && !empty($uri_id) && !empty($uid)) {
|
||||||
$item = Post::selectFirst(['id', 'parent', 'origin'], ['uri-id' => $uri_id, 'uid' => [$uid, 0]], ['order' => ['uid' => true]]);
|
$item = Post::selectFirst(['id', 'parent', 'origin', 'gravity', 'verb'], ['uri-id' => $uri_id, 'uid' => [$uid, 0]], ['order' => ['uid' => true]]);
|
||||||
if (empty($item['id'])) {
|
if (empty($item['id'])) {
|
||||||
Logger::warning('Item not found, removing delivery', ['uri-id' => $uri_id, 'uid' => $uid, 'cmd' => $cmd, 'inbox' => $inbox]);
|
Logger::warning('Item not found, removing delivery', ['uri-id' => $uri_id, 'uid' => $uid, 'cmd' => $cmd, 'inbox' => $inbox]);
|
||||||
Post\Delivery::remove($uri_id, $inbox);
|
Post\Delivery::remove($uri_id, $inbox);
|
||||||
return ['success' => true, 'serverfailure' => false, 'drop' => false];
|
return ['success' => true, 'serverfailure' => false, 'drop' => false];
|
||||||
|
} elseif (!DI::config()->get('system', 'redistribute_activities') && !$item['origin'] && ($item['gravity'] == Item::GRAVITY_ACTIVITY)) {
|
||||||
|
Logger::notice('Activities are not relayed, removing delivery', ['verb' => $item['verb'], 'uri-id' => $uri_id, 'uid' => $uid, 'cmd' => $cmd, 'inbox' => $inbox]);
|
||||||
|
Post\Delivery::remove($uri_id, $inbox);
|
||||||
|
return ['success' => true, 'serverfailure' => false, 'drop' => false];
|
||||||
} else {
|
} else {
|
||||||
$item_id = $item['id'];
|
$item_id = $item['id'];
|
||||||
}
|
}
|
||||||
|
@ -162,6 +167,8 @@ class Delivery
|
||||||
Post\Delivery::incrementFailed($uri_id, $inbox);
|
Post\Delivery::incrementFailed($uri_id, $inbox);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} elseif ($uri_id) {
|
||||||
|
Post\Delivery::remove($uri_id, $inbox);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -775,7 +775,7 @@ class Notifier
|
||||||
} elseif (!Post\Activity::exists($target_item['uri-id'])) {
|
} elseif (!Post\Activity::exists($target_item['uri-id'])) {
|
||||||
Logger::info('Remote item ' . $target_item['id'] . ' with URL ' . $target_item['uri'] . ' is no AP post. It will not be distributed.');
|
Logger::info('Remote item ' . $target_item['id'] . ' with URL ' . $target_item['uri'] . ' is no AP post. It will not be distributed.');
|
||||||
return ['count' => 0, 'contacts' => []];
|
return ['count' => 0, 'contacts' => []];
|
||||||
} elseif ($parent['origin']) {
|
} elseif ($parent['origin'] && (($target_item['gravity'] != Item::GRAVITY_ACTIVITY) || DI::config()->get('system', 'redistribute_activities'))) {
|
||||||
$inboxes = ActivityPub\Transmitter::fetchTargetInboxes($parent, $uid, false, $target_item['id']);
|
$inboxes = ActivityPub\Transmitter::fetchTargetInboxes($parent, $uid, false, $target_item['id']);
|
||||||
|
|
||||||
if (in_array($target_item['private'], [Item::PUBLIC])) {
|
if (in_array($target_item['private'], [Item::PUBLIC])) {
|
||||||
|
|
|
@ -525,6 +525,10 @@ return [
|
||||||
// The authentication password for the redis database
|
// The authentication password for the redis database
|
||||||
'redis_password' => null,
|
'redis_password' => null,
|
||||||
|
|
||||||
|
// redistribute_activities (Boolean)
|
||||||
|
// Redistribute incoming activities via ActivityPub
|
||||||
|
'redistribute_activities' => true,
|
||||||
|
|
||||||
// relay_deny_languages (Array)
|
// relay_deny_languages (Array)
|
||||||
// Array of languages (two digit format) that are rejected.
|
// Array of languages (two digit format) that are rejected.
|
||||||
'relay_deny_languages' => [],
|
'relay_deny_languages' => [],
|
||||||
|
|
Loading…
Reference in a new issue