config->get('timeZone');

        foreach ($this->getWorkflows() as $workflow) {
            // The default time zone is applied only when the workflow opts in;
            // otherwise null is passed to the cron library.
            $timeZone = $workflow->getSchedulingApplyTimezone() ? $defaultTimeZone : null;
            $scheduling = $workflow->getScheduling();

            try {
                $cronExpression = new CronExpression($scheduling);

                // Next run date, converted to UTC before being wrapped in the field value object.
                $dateTime = $cronExpression->getNextRunDate('now', 0, true, $timeZone)
                    ->setTimezone(new DateTimeZone('UTC'));

                $time = DateTime::fromDateTime($dateTime);
            } catch (Exception $e) {
                // A failure for one workflow is logged and must not stop the others.
                $this->log->error("Bad scheduling in workflow {id}.", [
                    'exception' => $e,
                    'id' => $workflow->getId(),
                ]);

                continue;
            }

            // The next run equals the recorded last run: nothing new to schedule.
            if ($workflow->getLastRun() && $workflow->getLastRun()->isEqualTo($time)) {
                continue;
            }

            if ($this->jobExists($time, $workflow->getId())) {
                // Skip only this workflow. Returning here would abort the whole loop
                // and leave every subsequent workflow unscheduled.
                continue;
            }

            $this->scheduleJob($time, $workflow->getId());

            $workflow->setLastRun($time);
            $this->entityManager->saveEntity($workflow, ['silent' => true]);
        }
    }

    /**
     * Schedules a RunScheduledWorkflowJob for the given workflow at the given time.
     *
     * @param DateTime $time Execution time of the job.
     * @param string $workflowId ID of the workflow set as the job target.
     */
    private function scheduleJob(DateTime $time, string $workflowId): void
    {
        $this->jobSchedulerFactory
            ->create()
            ->setClassName(RunScheduledWorkflowJob::class)
            ->setTime($time->toDateTime())
            ->setData(
                Data::create()
                    ->withTargetId($workflowId)
                    ->withTargetType(Workflow::ENTITY_TYPE)
            )
            ->schedule();
    }

    /**
     * Checks whether a RunScheduledWorkflowJob targeting the workflow already exists
     * with an execute time inside the minute that contains the given time.
     *
     * @param DateTime $time Time whose containing minute is searched.
     * @param string $workflowId ID of the workflow the job targets.
     * @return bool True if at least one matching job record is found.
     */
    private function jobExists(DateTime $time, string $workflowId): bool
    {
        $from = $time->toDateTime();

        // Drop the seconds so the window starts at the beginning of the minute.
        $seconds = (int) $from->format('s');
        $from = $from->modify("- $seconds seconds");

        // NOTE(review): assumes toDateTime() returns an immutable object; with a mutable
        // one, $from and $to would be the same instance and the window would be empty.
        $to = $from->modify('+ 1 minute');

        // Half-open window: [$from, $to).
        $found = $this->entityManager
            ->getRDBRepositoryByClass(Job::class)
            ->select(['id'])
            ->where([
                'className' => RunScheduledWorkflowJob::class,
                ['executeTime>=' => $from->format(DateTimeUtil::SYSTEM_DATE_TIME_FORMAT)],
                ['executeTime<' => $to->format(DateTimeUtil::SYSTEM_DATE_TIME_FORMAT)],
                'targetId' => $workflowId,
                'targetType' => Workflow::ENTITY_TYPE,
            ])
            ->findOne();

        return (bool) $found;
    }

    /**
     * Fetches active workflows of the scheduled type, ordered by processOrder, then by id.
     *
     * @return iterable<Workflow>
     */
    private function getWorkflows(): iterable
    {
        return $this->entityManager
            ->getRDBRepositoryByClass(Workflow::class)
            ->where([
                'type' => Workflow::TYPE_SCHEDULED,
                'isActive' => true,
            ])
            ->order('processOrder')
            ->order('id')
            ->find();
    }
}