Scheduler.php 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. <?php
  2. namespace EasySwoole\Crontab;
  3. use Cron\CronExpression;
  4. use EasySwoole\Component\Process\AbstractProcess;
  5. use EasySwoole\Component\Timer;
  6. use Swoole\Table;
  /**
   * Crontab scheduler process.
   *
   * On start it rebuilds the shared scheduler table from the configured jobs,
   * then periodically recomputes each job's next run time and arms a one-shot
   * timer that triggers the job through the Crontab instance.
   */
  class Scheduler extends AbstractProcess
  {
      /** @var Table shared table holding one row of scheduling state per job */
      private $schedulerTable;

      /** @var Crontab */
      private $crontabInstance;

      /** @var array ids of pending one-shot timers, keyed by job name */
      private $timerIds = [];

      /**
       * Process entry point.
       *
       * @param array $arg expects the keys 'crontabInstance', 'schedulerTable' and 'jobs'
       *                   ('jobs' maps a job name to a JobInterface)
       */
      protected function run($arg)
      {
          $this->crontabInstance = $arg['crontabInstance'];
          $this->schedulerTable = $arg['schedulerTable'];

          // The worker exits on an exception, so clear any existing rules first.
          // Keys are collected up front because rows must not be deleted while
          // the table is being iterated.
          $keys = [];
          foreach ($this->schedulerTable as $key => $value) {
              $keys[] = $key;
          }
          foreach ($keys as $key) {
              $this->schedulerTable->del($key);
          }

          $jobs = $arg['jobs'];
          /**
           * @var $jobName
           * @var JobInterface $job
           */
          foreach ($jobs as $jobName => $job) {
              // Read the rule once so the stored rule and the computed next run
              // time are guaranteed to come from the same value.
              $rule = $job->crontabRule();
              $nextTime = CronExpression::factory($rule)->getNextRunDate()->getTimestamp();
              $this->schedulerTable->set($jobName, [
                  'taskRule' => $rule,
                  'taskRunTimes' => 0,
                  'taskNextRunTime' => $nextTime,
                  'taskCurrentRunTime' => 0,
                  'isStop' => 0,
              ]);
          }

          // Schedule immediately, then re-check on a fixed interval.
          $this->cronProcess();
          // 60 is not divisible by 8.
          // NOTE(review): presumably chosen so the checks do not stay aligned
          // with minute boundaries - confirm with the original author.
          Timer::getInstance()->loop(8 * 1000, function () {
              $this->cronProcess();
          });
      }

      /**
       * Refreshes the next run time of every active job and arms a one-shot
       * timer for each job that does not already have one pending.
       */
      private function cronProcess()
      {
          foreach ($this->schedulerTable as $jobName => $task) {
              // Stopped jobs are neither refreshed nor scheduled.
              if (intval($task['isStop']) == 1) {
                  continue;
              }

              $nextRunTime = CronExpression::factory($task['taskRule'])->getNextRunDate()->getTimestamp();
              if ($task['taskNextRunTime'] != $nextRunTime) {
                  $this->schedulerTable->set($jobName, ['taskNextRunTime' => $nextRunTime]);
              }

              // A timer has already been created for this job in the current round.
              if (isset($this->timerIds[$jobName])) {
                  continue;
              }

              // The clock may reach (or pass) the next run time between computing
              // it and reading time(), which yields a zero or negative delay.
              // Swoole timers require a positive delay and would refuse to arm,
              // silently skipping this run, so clamp the delay to at least 1 ms.
              $delayMs = max(1, ($nextRunTime - time()) * 1000);

              $timerId = Timer::getInstance()->after($delayMs, function () use ($jobName) {
                  // Release the slot first so the next round can re-arm this job
                  // even if the run below throws.
                  unset($this->timerIds[$jobName]);
                  try {
                      $this->crontabInstance->rightNow($jobName);
                  } catch (\Throwable $throwable) {
                      // Hand the error to the configured handler when there is
                      // one; otherwise let it propagate.
                      $call = $this->crontabInstance->getConfig()->getOnException();
                      if (is_callable($call)) {
                          call_user_func($call, $throwable);
                      } else {
                          throw $throwable;
                      }
                  }
              });

              if ($timerId) {
                  $this->timerIds[$jobName] = $timerId;
              }
          }
      }
  }