taskIdAtomic = new Long(0);
        $this->table = new Table(512);
        $this->table->column('running',Table::TYPE_INT,8);
        $this->table->column('success',Table::TYPE_INT,8);
        $this->table->column('fail',Table::TYPE_INT,8);
        $this->table->column('pid',Table::TYPE_INT,8);
        $this->table->column('startUpTime',Table::TYPE_INT,8);
        $this->table->create();
        if($config){
            $this->config = $config;
        }else{
            $this->config = new Config();
        }
    }

    /**
     * Returns the configuration object held by this instance.
     */
    public function getConfig(): Config
    {
        return $this->config;
    }

    /**
     * Copies every row of the info table into a plain array.
     *
     * @return array row key => row value, exactly as yielded when iterating the table
     */
    public function status(): array
    {
        $ret = [];
        foreach ($this->table as $key => $value) {
            $ret[$key] = $value;
        }

        return $ret;
    }

    /**
     * Adds every worker process built by __initProcess() to the given server.
     *
     * May be called only once per instance; the attachServer flag is set
     * after the processes have been added.
     *
     * @return bool always true when the processes were added
     * @throws Exception when this instance was already attached
     */
    public function attachToServer(Server $server)
    {
        if ($this->attachServer) {
            throw new Exception("Task instance has been attach to server");
        }

        /** @var AbstractProcess $item */
        foreach ($this->__initProcess() as $item) {
            $server->addProcess($item->getProcess());
        }
        $this->attachServer = true;

        return true;
    }

    /**
     * Builds one Worker per configured worker index.
     *
     * Each worker gets its own process name and unix socket file, shares the
     * "{serverName}.TaskWorker" process group, and receives the task id
     * counter, the task config and the info table as arguments.
     *
     * @return array worker index => Worker
     */
    public function __initProcess(): array
    {
        $ret = [];
        $serverName = $this->config->getServerName();
        $workerNum = $this->config->getWorkerNum();
        for ($i = 0; $i < $workerNum; $i++) {
            $config = new UnixProcessConfig();
            $config->setProcessName("{$serverName}.TaskWorker.{$i}");
            $config->setSocketFile($this->idToUnixName($i));
            $config->setProcessGroup("{$serverName}.TaskWorker");
            $config->setArg([
                'workerIndex' => $i,
                'taskIdAtomic' => $this->taskIdAtomic,
                'taskConfig' => $this->config,
                'infoTable' => $this->table,
            ]);
            $ret[$i] = new Worker($config);
        }

        return $ret;
    }

    /**
     * Sends a task to a worker as an ASYNC package.
     *
     * @param mixed         $task           the task payload put into the package
     * @param callable|null $finishCallback stored on the package via setOnFinish()
     * @param int|null      $taskWorkerId   target worker index; a random one is picked when null
     * @param float|null    $timeout        seconds to wait for the reply; the configured timeout is used when null
     * @return int|null whatever the worker replied, or self::ERROR_SOCK_TIMEOUT when no reply was received
     */
    public function async($task, ?callable $finishCallback = null, $taskWorkerId = null, ?float $timeout = null): ?int
    {
        if ($taskWorkerId === null) {
            $taskWorkerId = $this->randomWorkerId();
        }
        $package = new Package();
        $package->setType(Package::ASYNC);
        $package->setTask($task);
        $package->setOnFinish($finishCallback);

        return $this->sendAndRecv($package, $taskWorkerId, $timeout);
    }

    /*
     * Synchronously return the execution result.
     *
     * Sends the task to a worker as a SYNC package and returns the reply,
     * or self::ERROR_SOCK_TIMEOUT when no reply was received. A random worker
     * is picked when $taskWorkerId is null.
     */
    public function sync($task, float $timeout = 3.0, $taskWorkerId = null)
    {
        if ($taskWorkerId === null) {
            $taskWorkerId = $this->randomWorkerId();
        }
        $package = new Package();
        $package->setType(Package::SYNC);
        $package->setTask($task);

        return $this->sendAndRecv($package, $taskWorkerId, $timeout);
    }

    /**
     * Builds the unix socket path for a worker index:
     * "{tempDir}/{serverName}.TaskWorker.{id}.sock".
     */
    private function idToUnixName(int $id): string
    {
        return $this->config->getTempDir()."/{$this->config->getServerName()}.TaskWorker.{$id}.sock";
    }

    /**
     * Picks a worker index between 0 and workerNum - 1.
     */
    private function randomWorkerId()
    {
        // NOTE(review): the generator is reseeded on every call, presumably so
        // forked processes do not share one random sequence - confirm before removing.
        mt_srand();

        return rand(0, $this->config->getWorkerNum() - 1);
    }

    /**
     * Sends a package to the worker's unix socket and waits for the reply.
     *
     * @param Package    $package package to serialize and send
     * @param int        $id      worker index used to derive the socket path
     * @param float|null $timeout seconds to wait; the configured timeout is used when null
     * @return mixed the unserialized reply, or self::ERROR_SOCK_TIMEOUT when the reply is empty
     */
    private function sendAndRecv(Package $package, int $id, ?float $timeout = null)
    {
        if ($timeout === null) {
            $timeout = $this->config->getTimeout();
        }
        // A non-positive timeout stores -1 as the expiry
        // (presumably "never expires" - confirm against the worker side).
        $package->setExpire($timeout > 0 ? microtime(true) + $timeout : -1);

        $client = new UnixClient($this->idToUnixName($id), $this->getConfig()->getMaxPackageSize());
        try {
            $client->send(Protocol::pack(\Opis\Closure\serialize($package)));
            $ret = $client->recv($timeout);
        } finally {
            // Release the socket even when send/recv throws.
            $client->close();
        }

        if (empty($ret)) {
            return self::ERROR_SOCK_TIMEOUT;
        }

        return \Opis\Closure\unserialize(Protocol::unpack($ret));
    }
}