Worker.php 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. <?php
  2. namespace EasySwoole\Crontab;
  3. use EasySwoole\Component\Process\Socket\AbstractUnixProcess;
  4. use EasySwoole\Crontab\Protocol\Pack;
  5. use EasySwoole\Crontab\Protocol\Command;
  6. use EasySwoole\Crontab\Protocol\Response;
  7. use Swoole\Coroutine\Socket;
  8. use Swoole\Table;
  9. class Worker extends AbstractUnixProcess
  10. {
  11. /** @var Crontab */
  12. private $crontabInstance;
  13. /** @var Table */
  14. private $workerStatisticTable;
  15. private $jobs = [];
  16. private $workerIndex = 0;
  17. /** @var Table */
  18. private $schedulerTable;
  19. public function run($arg)
  20. {
  21. $this->crontabInstance = $arg['crontabInstance'];
  22. $this->workerStatisticTable = $arg['workerStatisticTable'];
  23. $this->workerIndex = $arg['workerIndex'];
  24. $this->workerStatisticTable->set($this->workerIndex, [
  25. 'runningNum' => 0
  26. ]);
  27. $this->schedulerTable = $arg['schedulerTable'];
  28. $this->jobs = $arg['jobs'];
  29. parent::run($arg);
  30. }
  31. function onAccept(Socket $socket)
  32. {
  33. $response = new Response();
  34. $header = $socket->recvAll(4, 1);
  35. if (strlen($header) != 4) {
  36. $response->setStatus(Response::STATUS_PACKAGE_READ_TIMEOUT)->setMsg('recv from client timeout');
  37. $this->reply($socket, $response);
  38. return;
  39. }
  40. $allLength = Pack::packDataLength($header);
  41. $data = $socket->recvAll($allLength, 1);
  42. if (strlen($data) != $allLength) {
  43. $response->setStatus(Response::STATUS_PACKAGE_READ_TIMEOUT)->setMsg('recv from client timeout');
  44. $this->reply($socket, $response);
  45. return;
  46. }
  47. $data = unserialize($data);
  48. if (!$data instanceof Command) {
  49. $response->setStatus(Response::STATUS_ILLEGAL_PACKAGE)->setMsg('unserialize request as an Command instance fail');
  50. $this->reply($socket, $response);
  51. return;
  52. }
  53. if ($data->getCommand() === Command::COMMAND_EXEC_JOB) {
  54. $jobName = $data->getArg();
  55. if (isset($this->jobs[$jobName])) {
  56. /** @var JobInterface $job */
  57. $job = $this->jobs[$jobName];
  58. $this->workerStatisticTable->incr($this->workerIndex, 'runningNum', 1);
  59. $this->schedulerTable->incr($jobName, 'taskRunTimes', 1);
  60. $this->schedulerTable->set($jobName, ['taskCurrentRunTime' => time()]);
  61. try {
  62. $ret = $job->run();
  63. $response->setResult($ret);
  64. $response->setStatus(Response::STATUS_OK);
  65. } catch (\Throwable $throwable) {
  66. $response->setStatus(Response::STATUS_JOB_EXEC_ERROR);
  67. try {
  68. $job->onException($throwable);
  69. } catch (\Throwable $t) {
  70. $call = $this->crontabInstance->getConfig()->getOnException();
  71. if (is_callable($call)) {
  72. call_user_func($call, $t);
  73. } else {
  74. throw $t;
  75. }
  76. }
  77. } finally {
  78. $this->workerStatisticTable->decr($this->workerIndex, 'runningNum', 1);
  79. $this->reply($socket, $response);
  80. }
  81. } else {
  82. $response->setStatus(Response::STATUS_JOB_NOT_EXIST);
  83. $this->reply($socket, $response);
  84. }
  85. } else {
  86. $response->setStatus(Response::STATUS_UNKNOWN_COMMAND);
  87. $this->reply($socket, $response);
  88. }
  89. }
  90. private function reply(Socket $socket, Response $response)
  91. {
  92. $data = serialize($response);
  93. $socket->sendAll(Pack::pack($data));
  94. $socket->close();
  95. }
  96. }