Merge pull request #9661 from MrPetovan/bug/9651-apdelivery-inboxes
Clarify usage of ActivityPub\Transmitter::fetchTargetInboxes in Worker\APDelivery queuing
This commit is contained in:
commit
569ad89f10
5 changed files with 185 additions and 43 deletions
163
src/Console/FixAPDeliveryWorkerTaskParameters.php
Normal file
163
src/Console/FixAPDeliveryWorkerTaskParameters.php
Normal file
|
@ -0,0 +1,163 @@
|
||||||
|
<?php
|
||||||
|
/**
|
||||||
|
* @copyright Copyright (C) 2020, Friendica
|
||||||
|
*
|
||||||
|
* @license GNU AGPL version 3 or any later version
|
||||||
|
*
|
||||||
|
* This program is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License as
|
||||||
|
* published by the Free Software Foundation, either version 3 of the
|
||||||
|
* License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU Affero General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Friendica\Console;
|
||||||
|
|
||||||
|
use Friendica\App;
|
||||||
|
use Friendica\Database\Database;
|
||||||
|
use Friendica\Database\DBA;
|
||||||
|
use Friendica\DI;
|
||||||
|
use Friendica\Model\Contact;
|
||||||
|
use Friendica\Util\Strings;
|
||||||
|
use RuntimeException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* License: AGPLv3 or later, same as Friendica
|
||||||
|
*/
|
||||||
|
class FixAPDeliveryWorkerTaskParameters extends \Asika\SimpleConsole\Console
|
||||||
|
{
|
||||||
|
protected $helpOptions = ['h', 'help', '?'];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @var App\Mode
|
||||||
|
*/
|
||||||
|
private $appMode;
|
||||||
|
/**
|
||||||
|
* @var Database
|
||||||
|
*/
|
||||||
|
private $dba;
|
||||||
|
/**
|
||||||
|
* @var int
|
||||||
|
*/
|
||||||
|
private $examined;
|
||||||
|
/**
|
||||||
|
* @var int
|
||||||
|
*/
|
||||||
|
private $processed;
|
||||||
|
/**
|
||||||
|
* @var int
|
||||||
|
*/
|
||||||
|
private $errored;
|
||||||
|
|
||||||
|
protected function getHelp()
|
||||||
|
{
|
||||||
|
$help = <<<HELP
|
||||||
|
console fixapdeliveryworkertaskparameters - fix APDelivery worker task parameters corrupted during the 2020.12 RC period
|
||||||
|
Usage
|
||||||
|
bin/console fixapdeliveryworkertaskparameters [-h|--help|-?] [-v]
|
||||||
|
|
||||||
|
Description
|
||||||
|
During the 2020.12 RC period some worker task parameters have been corrupted, resulting in the impossibility to execute them.
|
||||||
|
This command restores their expected parameters.
|
||||||
|
If you didn't run Friendica during the 2020.12 RC period, you do not need to use this command.
|
||||||
|
|
||||||
|
Options
|
||||||
|
-h|--help|-? Show help information
|
||||||
|
-v Show more debug information.
|
||||||
|
HELP;
|
||||||
|
return $help;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function __construct(App\Mode $appMode, Database $dba, \Friendica\Core\L10n $l10n, array $argv = null)
|
||||||
|
{
|
||||||
|
parent::__construct($argv);
|
||||||
|
|
||||||
|
$this->appMode = $appMode;
|
||||||
|
$this->dba = $dba;
|
||||||
|
$this->l10n = $l10n;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected function doExecute()
|
||||||
|
{
|
||||||
|
if ($this->getOption('v')) {
|
||||||
|
$this->out('Class: ' . __CLASS__);
|
||||||
|
$this->out('Arguments: ' . var_export($this->args, true));
|
||||||
|
$this->out('Options: ' . var_export($this->options, true));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (count($this->args) > 0) {
|
||||||
|
throw new \Asika\SimpleConsole\CommandArgsException('Too many arguments');
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($this->appMode->isInstall()) {
|
||||||
|
throw new RuntimeException('Friendica isn\'t properly installed yet.');
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->examined = 0;
|
||||||
|
$this->processed = 0;
|
||||||
|
$this->errored = 0;
|
||||||
|
|
||||||
|
do {
|
||||||
|
$result = $this->dba->p('SELECT `id`, `parameter` FROM `workerqueue` WHERE `command` = "APDelivery" AND `parameter` LIKE "[\"%\",\"\",%" LIMIT ' . $this->examined . ', 100');
|
||||||
|
while ($row = $this->dba->fetch($result)) {
|
||||||
|
$this->examined++;
|
||||||
|
$this->processRow($row);
|
||||||
|
}
|
||||||
|
} while ($this->dba->isResult($result));
|
||||||
|
|
||||||
|
if ($this->getOption('v')) {
|
||||||
|
$this->out('Examined: ' . $this->examined);
|
||||||
|
$this->out('Processed: ' . $this->processed);
|
||||||
|
$this->out('Errored: ' . $this->errored);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
private function processRow(array $workerqueueItem)
|
||||||
|
{
|
||||||
|
$parameters = json_decode($workerqueueItem['parameter'], true);
|
||||||
|
|
||||||
|
if (!$parameters) {
|
||||||
|
$this->errored++;
|
||||||
|
if ($this->getOption('v')) {
|
||||||
|
$this->out('Unabled to parse parameter JSON of the row with id ' . $workerqueueItem['id']);
|
||||||
|
$this->out('JSON: ' . var_export($workerqueueItem['parameter'], true));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($parameters[1] !== '' && !is_array($parameters[2])) {
|
||||||
|
// Nothing to do, we save a write
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($parameters[1] === '') {
|
||||||
|
$parameters[1] = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (is_array($parameters[2])) {
|
||||||
|
$parameters[4] = $parameters[2];
|
||||||
|
$contact = Contact::getById(current($parameters[2]), ['url']);
|
||||||
|
$parameters[2] = $contact['url'];
|
||||||
|
}
|
||||||
|
|
||||||
|
$fields = ['parameter' => json_encode($parameters)];
|
||||||
|
if ($this->dba->update('workerqueue', $fields, ['id' => $workerqueueItem['id']])) {
|
||||||
|
$this->processed++;
|
||||||
|
} else {
|
||||||
|
$this->errored++;
|
||||||
|
if ($this->getOption('v')) {
|
||||||
|
$this->out('Unabled to update the row with id ' . $workerqueueItem['id']);
|
||||||
|
$this->out('Fields: ' . var_export($fields, true));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -94,6 +94,7 @@ HELP;
|
||||||
'serverblock' => Friendica\Console\ServerBlock::class,
|
'serverblock' => Friendica\Console\ServerBlock::class,
|
||||||
'storage' => Friendica\Console\Storage::class,
|
'storage' => Friendica\Console\Storage::class,
|
||||||
'relay' => Friendica\Console\Relay::class,
|
'relay' => Friendica\Console\Relay::class,
|
||||||
|
'fixapdeliveryworkertaskparameters' => Friendica\Console\FixAPDeliveryWorkerTaskParameters::class,
|
||||||
];
|
];
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -34,58 +34,36 @@ class APDelivery
|
||||||
/**
|
/**
|
||||||
* Delivers ActivityPub messages
|
* Delivers ActivityPub messages
|
||||||
*
|
*
|
||||||
* @param string $cmd
|
* @param string $cmd One of the Worker\Delivery constant values
|
||||||
* @param integer $target_id
|
* @param integer $item_id 0 if no item is involved (like Delivery::REMOVAL and Delivery::PROFILEUPDATE)
|
||||||
* @param string|array $inboxes
|
* @param string $inbox The URL of the recipient profile
|
||||||
* @param integer $uid
|
* @param integer $uid The ID of the user who triggered this delivery
|
||||||
* @param array $receivers
|
* @param array $receivers The contact IDs related to the inbox URL for contact archival housekeeping
|
||||||
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
|
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
|
||||||
* @throws \ImagickException
|
* @throws \ImagickException
|
||||||
*/
|
*/
|
||||||
public static function execute(string $cmd, int $target_id, $inboxes, int $uid, array $receivers = [])
|
public static function execute(string $cmd, int $item_id, string $inbox, int $uid, array $receivers = [])
|
||||||
{
|
|
||||||
if (is_string($inboxes)) {
|
|
||||||
$inboxes = [$inboxes];
|
|
||||||
}
|
|
||||||
|
|
||||||
foreach ($inboxes as $inbox) {
|
|
||||||
self::perform($cmd, $target_id, $inbox, $uid, $receivers);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Delivers ActivityPub messages
|
|
||||||
*
|
|
||||||
* @param string $cmd
|
|
||||||
* @param integer $target_id
|
|
||||||
* @param string $inbox
|
|
||||||
* @param integer $uid
|
|
||||||
* @param array $receivers
|
|
||||||
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
|
|
||||||
* @throws \ImagickException
|
|
||||||
*/
|
|
||||||
private static function perform(string $cmd, int $target_id, string $inbox, int $uid, array $receivers = [])
|
|
||||||
{
|
{
|
||||||
if (ActivityPub\Transmitter::archivedInbox($inbox)) {
|
if (ActivityPub\Transmitter::archivedInbox($inbox)) {
|
||||||
Logger::info('Inbox is archived', ['cmd' => $cmd, 'inbox' => $inbox, 'id' => $target_id, 'uid' => $uid]);
|
Logger::info('Inbox is archived', ['cmd' => $cmd, 'inbox' => $inbox, 'id' => $item_id, 'uid' => $uid]);
|
||||||
if (in_array($cmd, [Delivery::POST])) {
|
if (in_array($cmd, [Delivery::POST])) {
|
||||||
$item = Item::selectFirst(['uri-id'], ['id' => $target_id]);
|
$item = Item::selectFirst(['uri-id'], ['id' => $item_id]);
|
||||||
Post\DeliveryData::incrementQueueFailed($item['uri-id'] ?? 0);
|
Post\DeliveryData::incrementQueueFailed($item['uri-id'] ?? 0);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Logger::info('Invoked', ['cmd' => $cmd, 'inbox' => $inbox, 'id' => $target_id, 'uid' => $uid]);
|
Logger::info('Invoked', ['cmd' => $cmd, 'inbox' => $inbox, 'id' => $item_id, 'uid' => $uid]);
|
||||||
|
|
||||||
$success = true;
|
$success = true;
|
||||||
|
|
||||||
if ($cmd == Delivery::MAIL) {
|
if ($cmd == Delivery::MAIL) {
|
||||||
$data = ActivityPub\Transmitter::createActivityFromMail($target_id);
|
$data = ActivityPub\Transmitter::createActivityFromMail($item_id);
|
||||||
if (!empty($data)) {
|
if (!empty($data)) {
|
||||||
$success = HTTPSignature::transmit($data, $inbox, $uid);
|
$success = HTTPSignature::transmit($data, $inbox, $uid);
|
||||||
}
|
}
|
||||||
} elseif ($cmd == Delivery::SUGGESTION) {
|
} elseif ($cmd == Delivery::SUGGESTION) {
|
||||||
$success = ActivityPub\Transmitter::sendContactSuggestion($uid, $inbox, $target_id);
|
$success = ActivityPub\Transmitter::sendContactSuggestion($uid, $inbox, $item_id);
|
||||||
} elseif ($cmd == Delivery::RELOCATION) {
|
} elseif ($cmd == Delivery::RELOCATION) {
|
||||||
// @todo Implementation pending
|
// @todo Implementation pending
|
||||||
} elseif ($cmd == Delivery::POKE) {
|
} elseif ($cmd == Delivery::POKE) {
|
||||||
|
@ -95,14 +73,14 @@ class APDelivery
|
||||||
} elseif ($cmd == Delivery::PROFILEUPDATE) {
|
} elseif ($cmd == Delivery::PROFILEUPDATE) {
|
||||||
$success = ActivityPub\Transmitter::sendProfileUpdate($uid, $inbox);
|
$success = ActivityPub\Transmitter::sendProfileUpdate($uid, $inbox);
|
||||||
} else {
|
} else {
|
||||||
$data = ActivityPub\Transmitter::createCachedActivityFromItem($target_id);
|
$data = ActivityPub\Transmitter::createCachedActivityFromItem($item_id);
|
||||||
if (!empty($data)) {
|
if (!empty($data)) {
|
||||||
$success = HTTPSignature::transmit($data, $inbox, $uid);
|
$success = HTTPSignature::transmit($data, $inbox, $uid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// This should never fail and is temporariy (until the move to the "post" structure)
|
// This should never fail and is temporariy (until the move to the "post" structure)
|
||||||
$item = Item::selectFirst(['uri-id'], ['id' => $target_id]);
|
$item = Item::selectFirst(['uri-id'], ['id' => $item_id]);
|
||||||
$uriid = $item['uri-id'] ?? 0;
|
$uriid = $item['uri-id'] ?? 0;
|
||||||
|
|
||||||
foreach ($receivers as $receiver) {
|
foreach ($receivers as $receiver) {
|
||||||
|
|
|
@ -81,10 +81,10 @@ class Notifier
|
||||||
|
|
||||||
$mail = ActivityPub\Transmitter::ItemArrayFromMail($target_id);
|
$mail = ActivityPub\Transmitter::ItemArrayFromMail($target_id);
|
||||||
$inboxes = ActivityPub\Transmitter::fetchTargetInboxes($mail, $uid, true);
|
$inboxes = ActivityPub\Transmitter::fetchTargetInboxes($mail, $uid, true);
|
||||||
foreach ($inboxes as $inbox) {
|
foreach ($inboxes as $inbox => $receivers) {
|
||||||
Logger::info('Delivery via ActivityPub', ['cmd' => $cmd, 'target' => $target_id, 'inbox' => $inbox]);
|
Logger::info('Delivery via ActivityPub', ['cmd' => $cmd, 'target' => $target_id, 'inbox' => $inbox]);
|
||||||
Worker::add(['priority' => PRIORITY_HIGH, 'created' => $a->queue['created'], 'dont_fork' => true],
|
Worker::add(['priority' => PRIORITY_HIGH, 'created' => $a->queue['created'], 'dont_fork' => true],
|
||||||
'APDelivery', $cmd, $target_id, $inbox, $uid);
|
'APDelivery', $cmd, $target_id, $inbox, $uid, $receivers);
|
||||||
}
|
}
|
||||||
} elseif ($cmd == Delivery::SUGGESTION) {
|
} elseif ($cmd == Delivery::SUGGESTION) {
|
||||||
$suggest = DI::fsuggest()->getById($target_id);
|
$suggest = DI::fsuggest()->getById($target_id);
|
||||||
|
@ -744,10 +744,10 @@ class Notifier
|
||||||
DBA::close($contacts_stmt);
|
DBA::close($contacts_stmt);
|
||||||
|
|
||||||
$inboxes = ActivityPub\Transmitter::fetchTargetInboxesforUser(0);
|
$inboxes = ActivityPub\Transmitter::fetchTargetInboxesforUser(0);
|
||||||
foreach ($inboxes as $inbox) {
|
foreach ($inboxes as $inbox => $receivers) {
|
||||||
Logger::info('Account removal via ActivityPub', ['uid' => $self_user_id, 'inbox' => $inbox]);
|
Logger::info('Account removal via ActivityPub', ['uid' => $self_user_id, 'inbox' => $inbox]);
|
||||||
Worker::add(['priority' => PRIORITY_NEGLIGIBLE, 'created' => $created, 'dont_fork' => true],
|
Worker::add(['priority' => PRIORITY_NEGLIGIBLE, 'created' => $created, 'dont_fork' => true],
|
||||||
'APDelivery', Delivery::REMOVAL, '', $inbox, $self_user_id);
|
'APDelivery', Delivery::REMOVAL, 0, $inbox, $self_user_id, $receivers);
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
@ -834,10 +834,10 @@ class Notifier
|
||||||
}
|
}
|
||||||
|
|
||||||
// We deliver posts to relay servers slightly delayed to priorize the direct delivery
|
// We deliver posts to relay servers slightly delayed to priorize the direct delivery
|
||||||
foreach ($relay_inboxes as $inbox => $receivers) {
|
foreach ($relay_inboxes as $inbox) {
|
||||||
Logger::info('Delivery to relay servers via ActivityPub', ['cmd' => $cmd, 'id' => $target_item['id'], 'inbox' => $inbox]);
|
Logger::info('Delivery to relay servers via ActivityPub', ['cmd' => $cmd, 'id' => $target_item['id'], 'inbox' => $inbox]);
|
||||||
|
|
||||||
if (Worker::add(['priority' => $priority, 'dont_fork' => true], 'APDelivery', $cmd, $target_item['id'], $inbox, $uid, $receivers)) {
|
if (Worker::add(['priority' => $priority, 'dont_fork' => true], 'APDelivery', $cmd, $target_item['id'], $inbox, $uid)) {
|
||||||
$delivery_queue_count++;
|
$delivery_queue_count++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,10 +40,10 @@ class ProfileUpdate {
|
||||||
|
|
||||||
$inboxes = ActivityPub\Transmitter::fetchTargetInboxesforUser($uid);
|
$inboxes = ActivityPub\Transmitter::fetchTargetInboxesforUser($uid);
|
||||||
|
|
||||||
foreach ($inboxes as $inbox) {
|
foreach ($inboxes as $inbox => $receivers) {
|
||||||
Logger::log('Profile update for user ' . $uid . ' to ' . $inbox .' via ActivityPub', Logger::DEBUG);
|
Logger::log('Profile update for user ' . $uid . ' to ' . $inbox .' via ActivityPub', Logger::DEBUG);
|
||||||
Worker::add(['priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true],
|
Worker::add(['priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true],
|
||||||
'APDelivery', Delivery::PROFILEUPDATE, '', $inbox, $uid);
|
'APDelivery', Delivery::PROFILEUPDATE, 0, $inbox, $uid, $receivers);
|
||||||
}
|
}
|
||||||
|
|
||||||
Diaspora::sendProfile($uid);
|
Diaspora::sendProfile($uid);
|
||||||
|
|
Loading…
Reference in a new issue