. * * The interactive user interfaces in modified source and object code versions * of this program must display Appropriate Legal Notices, as required under * Section 5 of the GNU Affero General Public License version 3. * * In accordance with Section 7(b) of the GNU Affero General Public License version 3, * these Appropriate Legal Notices must retain the display of the "EspoCRM" word. ************************************************************************/ namespace Espo\Core\Webhook; use Espo\Core\Field\DateTime; use Espo\Core\Field\LinkParent; use Espo\Core\Name\Field; use Espo\Entities\User; use Espo\Entities\Webhook; use Espo\Entities\WebhookEventQueueItem; use Espo\Entities\WebhookQueueItem; use Espo\Core\AclManager; use Espo\Core\Utils\Config; use Espo\Core\Utils\DateTime as DateTimeUtil; use Espo\Core\Utils\Log; use Espo\ORM\EntityManager; use Espo\ORM\Name\Attribute; use Espo\ORM\Query\Part\Condition as Cond; use Espo\ORM\Query\SelectBuilder; use Exception; use stdClass; /** * Groups occurred events into portions and sends them. A portion contains * multiple events of the same webhook. */ class Queue { private const EVENT_PORTION_SIZE = 20; private const PORTION_SIZE = 20; private const BATCH_SIZE = 50; private const MAX_ATTEMPT_NUMBER = 4; private const FAIL_ATTEMPT_PERIOD = '10 minutes'; public function __construct( private Sender $sender, private Config $config, private EntityManager $entityManager, private AclManager $aclManager, private Log $log, ) {} public function process(): void { $this->processEvents(); $this->processSending(); } protected function processEvents(): void { $portionSize = $this->config->get('webhookQueueEventPortionSize', self::EVENT_PORTION_SIZE); $items = $this->entityManager ->getRDBRepositoryByClass(WebhookEventQueueItem::class) ->where(['isProcessed' => false]) ->order('number') ->limit(0, $portionSize) ->find(); foreach ($items as $item) { $this->createQueueFromEvent($item); $item->setIsProcessed(); $this->entityManager->saveEntity($item); } } protected function createQueueFromEvent(WebhookEventQueueItem $eventItem): void { if (!$eventItem->getTargetId() || !$eventItem->getTargetType()) { return; } $target = LinkParent::create($eventItem->getTargetType(), $eventItem->getTargetId()); $webhooks = $this->entityManager ->getRDBRepositoryByClass(Webhook::class) ->where([ 'event' => $eventItem->getEvent(), 'isActive' => true, ]) ->order(Field::CREATED_AT) ->find(); foreach ($webhooks as $webhook) { $this->createItem($eventItem, $webhook, $target); } } protected function processSending(): void { $portionSize = $this->config->get('webhookQueuePortionSize', self::PORTION_SIZE); $groupedItems = $this->entityManager ->getRDBRepositoryByClass(WebhookQueueItem::class) ->select([ 'webhookId', 'number', ]) ->where( Cond::in( Cond::column('number'), $this->entityManager ->getQueryBuilder() ->select('MIN:(number)') ->from(WebhookQueueItem::ENTITY_TYPE) ->where([ 'status' => WebhookQueueItem::STATUS_PENDING, 'OR' => [ ['processAt' => null], ['processAt<=' => DateTimeUtil::getSystemNowString()], ], ]) ->group('webhookId') ->build() ) ) ->limit(0, $portionSize) ->order('number') ->find(); foreach ($groupedItems as $groupItem) { $this->processSendingGroup($groupItem->getWebhookId()); } } private function processSendingGroup(string $webhookId): void { $batchSize = $this->config->get('webhookBatchSize', self::BATCH_SIZE); $items = $this->entityManager ->getRDBRepositoryByClass(WebhookQueueItem::class) ->where([ 'webhookId' => $webhookId, 'status' => WebhookQueueItem::STATUS_PENDING, 'OR' => [ ['processAt' => null], ['processAt<=' => DateTimeUtil::getSystemNowString()], ], ]) ->order('number') ->limit(0, $batchSize) ->find(); $webhook = $this->entityManager->getRDBRepositoryByClass(Webhook::class)->getById($webhookId); if (!$webhook || !$webhook->isActive()) { foreach ($items as $item) { $this->deleteQueueItem($item); } return; } $forbiddenAttributeList = []; $user = null; if ($webhook->getUserId()) { $user = $this->entityManager->getRDBRepositoryByClass(User::class)->getById($webhook->getUserId()); if (!$user) { foreach ($items as $item) { $this->deleteQueueItem($item); } return; } $forbiddenAttributeList = $this->aclManager ->getScopeForbiddenAttributeList($user, $webhook->getTargetEntityType()); } $actualItemList = []; $dataList = []; foreach ($items as $item) { $data = $this->prepareItemData($item, $user, $forbiddenAttributeList); if ($data === null) { continue; } $actualItemList[] = $item; $dataList[] = $data; } if ($dataList === []) { return; } $this->send($webhook, $dataList, $actualItemList); } /** * @param string[] $forbiddenAttributeList */ private function prepareItemData(WebhookQueueItem $item, ?User $user, array $forbiddenAttributeList): ?stdClass { $targetType = $item->getTargetType(); if (!$targetType) { $this->deleteQueueItem($item); return null; } $target = null; if ($this->entityManager->hasRepository($targetType)) { $query = SelectBuilder::create() ->from($targetType) ->withDeleted() ->build(); $target = $this->entityManager ->getRDBRepository($targetType) ->clone($query) ->where([Attribute::ID => $item->getTargetId()]) ->findOne(); } if (!$target) { $this->deleteQueueItem($item); return null; } if ($user) { if (!$this->aclManager->check($user, $target)) { $this->deleteQueueItem($item); return null; } } $data = $item->getData(); foreach ($forbiddenAttributeList as $attribute) { unset($data->$attribute); } return $data; } /** * @param stdClass[] $dataList * @param WebhookQueueItem[] $itemList */ private function send(Webhook $webhook, array $dataList, array $itemList): void { try { $code = $this->sender->send($webhook, $dataList); } catch (Exception $e) { $this->failQueueItemList($itemList, true); $this->log->error("Webhook Queue: Webhook '{$webhook->getId()}' sending failed. Error: {$e->getMessage()}"); return; } if ($code >= 200 && $code < 400) { $this->succeedQueueItemList($itemList); } else if ($code === 410) { $this->dropWebhook($webhook); } else if (in_array($code, [0, 401, 403, 404, 405, 408, 500, 503])) { $this->failQueueItemList($itemList); } else if ($code >= 400 && $code < 500) { $this->failQueueItemList($itemList, true); } else { $this->failQueueItemList($itemList, true); } $this->logSending($webhook, $code); } protected function logSending(Webhook $webhook, int $code): void { $this->log->debug("Webhook Queue: Webhook '{$webhook->getId()}' sent, response code: $code."); } /** * @param WebhookQueueItem[] $itemList */ protected function failQueueItemList(array $itemList, bool $force = false): void { foreach ($itemList as $item) { $this->failQueueItem($item, $force); } } /** * @param WebhookQueueItem[] $itemList */ protected function succeedQueueItemList(array $itemList): void { foreach ($itemList as $item) { $this->succeedQueueItem($item); } } protected function deleteQueueItem(WebhookQueueItem $item): void { $this->entityManager ->getRDBRepository(WebhookQueueItem::ENTITY_TYPE) ->deleteFromDb($item->getId()); } protected function dropWebhook(Webhook $webhook): void { $items = $this->entityManager ->getRDBRepositoryByClass(WebhookQueueItem::class) ->where([ 'status' => WebhookQueueItem::STATUS_PENDING, 'webhookId' => $webhook->getId(), ]) ->order('number') ->find(); foreach ($items as $item) { $this->deleteQueueItem($item); } $this->entityManager->removeEntity($webhook); } protected function succeedQueueItem(WebhookQueueItem $item): void { $item ->setAttempts($item->getAttempts() + 1) ->setStatus(WebhookQueueItem::STATUS_SUCCESS) ->setProcessedAt(DateTime::createNow()); $this->entityManager->saveEntity($item); } protected function failQueueItem(WebhookQueueItem $item, bool $force = false): void { $attempts = $item->getAttempts() + 1; $maxAttemptsNumber = $this->config->get('webhookMaxAttemptNumber', self::MAX_ATTEMPT_NUMBER); $period = $this->config->get('webhookFailAttemptPeriod', self::FAIL_ATTEMPT_PERIOD); if ($force) { $maxAttemptsNumber = 0; } $processAt = DateTime::createNow()->modify($period); $item->setAttempts($attempts); $item->setProcessAt($processAt); if ($attempts >= $maxAttemptsNumber) { $item->setStatus(WebhookQueueItem::STATUS_FAILED); $item->setProcessAt(null); } $this->entityManager->saveEntity($item); } private function createItem(WebhookEventQueueItem $eventItem, Webhook $webhook, LinkParent $target): void { if ( $webhook->skipOwn() && $webhook->getUserId() && $eventItem->getUserId() === $webhook->getUserId() ) { return; } $item = $this->entityManager->getRDBRepositoryByClass(WebhookQueueItem::class)->getNew(); $item ->setEvent($eventItem->getEvent()) ->setWebhook($webhook) ->setTarget($target) ->setStatus(WebhookQueueItem::STATUS_PENDING) ->setAttempts(0) ->setData($eventItem->getData()); $this->entityManager->saveEntity($item); } }