Task.php 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. <?php
  2. namespace EasySwoole\Task;
  3. use EasySwoole\Component\Process\AbstractProcess;
  4. use EasySwoole\Component\Process\Socket\UnixProcessConfig;
  5. use EasySwoole\Task\Exception\Exception;
  6. use Swoole\Atomic\Long;
  7. use Swoole\Server;
  8. use Swoole\Table;
  /**
   * Client-side facade for a pool of task worker processes.
   *
   * One Worker is built per configured worker slot; each listens on its own
   * unix socket file (see idToUnixName()). A task is wrapped in a Package,
   * serialized, sent to one worker's socket and the reply is unserialized and
   * returned. The negative ERROR_* / PUSH_* constants below are the status
   * codes this class knows how to describe via errCode2Msg().
   */
  class Task
  {
      /** @var Long Counter created at 0 and handed to every worker as 'taskIdAtomic'. */
      private $taskIdAtomic;

      /** @var Config Settings used for worker count, socket paths, timeouts and package size. */
      private $config;

      /** @var bool True once attachToServer() has completed; guards against a second attach. */
      private $attachServer = false;

      /** @var Table Table handed to every worker as 'infoTable'; exposed through status(). */
      private $table;

      public const PUSH_IN_QUEUE = 0;
      public const PUSH_QUEUE_FAIL = -1;
      public const ERROR_PROCESS_BUSY = -2;
      public const ERROR_PROTOCOL_ERROR = -3;
      public const ERROR_ILLEGAL_PACKAGE = -4;
      public const ERROR_TASK_ERROR = -5;
      public const ERROR_PACKAGE_EXPIRE = -6;
      public const ERROR_SOCK_TIMEOUT = -7;

      /**
       * Translate one of the status constants of this class into a readable message.
       *
       * @param int $code One of the PUSH_* / ERROR_* constants.
       * @return string The matching message, or 'unknown error' for any other value.
       */
      public static function errCode2Msg(int $code): string
      {
          $messages = [
              self::PUSH_IN_QUEUE         => 'push task in queue',
              self::PUSH_QUEUE_FAIL       => 'push task to queue fail',
              self::ERROR_PROCESS_BUSY    => 'task process busy',
              self::ERROR_PROTOCOL_ERROR  => 'task package protocol error',
              self::ERROR_ILLEGAL_PACKAGE => 'task package illegal',
              self::ERROR_TASK_ERROR      => 'task run error',
              self::ERROR_PACKAGE_EXPIRE  => 'task package expire',
              self::ERROR_SOCK_TIMEOUT    => 'task sock timeout',
          ];

          return $messages[$code] ?? 'unknown error';
      }

      /**
       * Create the shared task-id counter and the info table, and store the config.
       *
       * @param Config|null $config Settings to use; a default Config is created when omitted.
       */
      public function __construct(?Config $config = null)
      {
          $this->taskIdAtomic = new Long(0);

          // Five 8-byte integer columns; the rows themselves are not written by this class.
          $this->table = new Table(512);
          foreach (['running', 'success', 'fail', 'pid', 'startUpTime'] as $column) {
              $this->table->column($column, Table::TYPE_INT, 8);
          }
          $this->table->create();

          $this->config = $config ?: new Config();
      }

      /**
       * @return Config The configuration this instance was built with.
       */
      public function getConfig(): Config
      {
          return $this->config;
      }

      /**
       * Snapshot the info table as a plain array.
       *
       * @return array Every table row, indexed by its table key.
       */
      public function status(): array
      {
          $ret = [];
          foreach ($this->table as $key => $value) {
              $ret[$key] = $value;
          }

          return $ret;
      }

      /**
       * Build the worker processes and register each of them on the given server.
       *
       * @param Server $server Server that receives the worker processes via addProcess().
       * @return bool Always true on success.
       * @throws Exception When this instance has already been attached to a server.
       */
      public function attachToServer(Server $server)
      {
          if ($this->attachServer) {
              throw new Exception("Task instance has been attach to server");
          }

          /** @var AbstractProcess $item */
          foreach ($this->__initProcess() as $item) {
              $server->addProcess($item->getProcess());
          }
          $this->attachServer = true;

          return true;
      }

      /**
       * Build one Worker per configured worker slot.
       *
       * Each worker gets its own process name and socket file, a common process
       * group, and the shared counter, config and info table as arguments.
       *
       * @return array Worker instances indexed by worker index (0 .. workerNum - 1).
       */
      public function __initProcess(): array
      {
          $ret = [];
          $serverName = $this->config->getServerName();
          $workerNum = $this->config->getWorkerNum();

          for ($i = 0; $i < $workerNum; $i++) {
              $config = new UnixProcessConfig();
              $config->setProcessName("{$serverName}.TaskWorker.{$i}");
              $config->setSocketFile($this->idToUnixName($i));
              $config->setProcessGroup("{$serverName}.TaskWorker");
              $config->setArg([
                  'workerIndex'  => $i,
                  'taskIdAtomic' => $this->taskIdAtomic,
                  'taskConfig'   => $this->config,
                  'infoTable'    => $this->table,
              ]);
              $ret[$i] = new Worker($config);
          }

          return $ret;
      }

      /**
       * Send a task to a worker as an ASYNC package.
       *
       * @param mixed         $task           Task payload stored on the package.
       * @param callable|null $finishCallback Stored on the package as its on-finish handler.
       * @param int|null      $taskWorkerId   Target worker index; a random one is picked when null.
       * @param float|null    $timeout        Seconds to wait for the reply; null uses the configured timeout.
       * @return int|null Whatever the worker replied, or ERROR_SOCK_TIMEOUT when no reply arrived.
       *                  NOTE(review): presumably a task id or one of the status constants - confirm against Worker.
       */
      public function async($task, ?callable $finishCallback = null, $taskWorkerId = null, ?float $timeout = null): ?int
      {
          if ($taskWorkerId === null) {
              $taskWorkerId = $this->randomWorkerId();
          }

          $package = new Package();
          $package->setType($package::ASYNC);
          $package->setTask($task);
          $package->setOnFinish($finishCallback);

          return $this->sendAndRecv($package, $taskWorkerId, $timeout);
      }

      /**
       * Run a task synchronously and return its execution result.
       *
       * @param mixed    $task         Task payload stored on the package.
       * @param float    $timeout      Seconds to wait for the reply.
       * @param int|null $taskWorkerId Target worker index; a random one is picked when null.
       * @return mixed The unserialized reply of the worker, or ERROR_SOCK_TIMEOUT when no reply arrived.
       */
      public function sync($task, float $timeout = 3.0, $taskWorkerId = null)
      {
          if ($taskWorkerId === null) {
              $taskWorkerId = $this->randomWorkerId();
          }

          $package = new Package();
          $package->setType($package::SYNC);
          $package->setTask($task);

          return $this->sendAndRecv($package, $taskWorkerId, $timeout);
      }

      /**
       * Build the unix socket path of the worker with the given index.
       *
       * @param int $id Worker index.
       * @return string "<tempDir>/<serverName>.TaskWorker.<id>.sock"
       */
      private function idToUnixName(int $id): string
      {
          return $this->config->getTempDir() . "/{$this->config->getServerName()}.TaskWorker.{$id}.sock";
      }

      /**
       * Pick a worker index between 0 and workerNum - 1.
       *
       * @return int
       */
      private function randomWorkerId()
      {
          // NOTE(review): the reseed on every call is presumably there because forked
          // processes inherit the parent's generator state - confirm before removing.
          mt_srand();

          return rand(0, $this->config->getWorkerNum() - 1);
      }

      /**
       * Serialize a package, send it to one worker's socket and return the decoded reply.
       *
       * @param Package    $package Package to deliver; its expire time is set here.
       * @param int        $id      Index of the target worker.
       * @param float|null $timeout Seconds to wait; null falls back to the configured timeout.
       * @return mixed The unserialized reply, or ERROR_SOCK_TIMEOUT when the reply was empty.
       */
      private function sendAndRecv(Package $package, int $id, ?float $timeout = null)
      {
          if ($timeout === null) {
              $timeout = $this->config->getTimeout();
          }

          // A positive timeout becomes an absolute deadline; anything else is stored as -1.
          $package->setExpire($timeout > 0 ? microtime(true) + $timeout : -1);

          $client = new UnixClient($this->idToUnixName($id), $this->getConfig()->getMaxPackageSize());
          try {
              $client->send(Protocol::pack(\Opis\Closure\serialize($package)));
              $ret = $client->recv($timeout);
          } finally {
              // Release the socket even when serializing, sending or receiving throws.
              $client->close();
          }

          if (empty($ret)) {
              return self::ERROR_SOCK_TIMEOUT;
          }

          return \Opis\Closure\unserialize(Protocol::unpack($ret));
      }
  }