AbstractPool.php 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540
  1. <?php
  2. namespace EasySwoole\Pool;
  3. use EasySwoole\Pool\Exception\Exception;
  4. use EasySwoole\Pool\Exception\PoolEmpty;
  5. use EasySwoole\Utility\Random;
  6. use Swoole\Coroutine;
  7. use Swoole\Coroutine\Channel;
  8. use Swoole\Table;
  9. use Swoole\Timer;
  10. abstract class AbstractPool
  11. {
  12. private $createdNum = 0;
  13. /** @var Channel */
  14. private $poolChannel;
  15. private $objHash = [];
  16. /** @var Config */
  17. private $conf;
  18. private $intervalCheckTimerId;
  19. private $loadAverageTimerId;
  20. private $destroy = false;
  21. private $context = [];
  22. private $loadWaitTimes = 0;
  23. private $loadUseTimes = 0;
  24. private $poolHash;
  25. private $inUseObject = [];
  26. private $statusTable;
  27. /*
  28. * 如果成功创建了,请返回对应的obj
  29. */
  30. abstract protected function createObject();
  31. public function __construct(Config $conf)
  32. {
  33. if ($conf->getMinObjectNum() >= $conf->getMaxObjectNum()) {
  34. $class = static::class;
  35. throw new Exception("pool max num is small than min num for {$class} error");
  36. }
  37. $this->conf = $conf;
  38. $this->statusTable = new Table(1024);
  39. $this->statusTable->column('created',Table::TYPE_INT,10);
  40. $this->statusTable->column('pid',Table::TYPE_INT,10);
  41. $this->statusTable->column('inuse',Table::TYPE_INT,10);
  42. $this->statusTable->column('loadWaitTimes',Table::TYPE_FLOAT,10);
  43. $this->statusTable->column('loadUseTimes',Table::TYPE_INT,10);
  44. $this->statusTable->column('lastAliveTime',Table::TYPE_INT,10);
  45. $this->statusTable->create();
  46. $this->poolHash = substr(md5(spl_object_hash($this).getmypid()),8,16);
  47. }
  48. function getUsedObjects():array
  49. {
  50. return $this->inUseObject;
  51. }
  52. /*
  53. * 回收一个对象
  54. */
  55. public function recycleObj($obj): bool
  56. {
  57. /*
  58. * 当标记为销毁后,直接进行对象销毁
  59. */
  60. if ($this->destroy) {
  61. $this->unsetObj($obj);
  62. return true;
  63. }
  64. /*
  65. * 懒惰模式,可以提前创建 pool对象,因此调用钱执行初始化检测
  66. */
  67. $this->init();
  68. /*
  69. * 仅仅允许归属于本pool且不在pool内的对象进行回收
  70. */
  71. if ($this->isPoolObject($obj) && (!$this->isInPool($obj))) {
  72. /*
  73. * 主动回收可能存在的上下文
  74. */
  75. $cid = Coroutine::getCid();
  76. if (isset($this->context[$cid]) && $this->context[$cid]->__objHash === $obj->__objHash) {
  77. unset($this->context[$cid]);
  78. }
  79. $hash = $obj->__objHash;
  80. //标记为在pool内
  81. $this->objHash[$hash] = true;
  82. unset($this->inUseObject[$hash]);
  83. if ($obj instanceof ObjectInterface) {
  84. try {
  85. $obj->objectRestore();
  86. } catch (\Throwable $throwable) {
  87. //重新标记为非在pool状态,允许进行unset
  88. $this->objHash[$hash] = false;
  89. $this->unsetObj($obj);
  90. throw $throwable;
  91. }
  92. }
  93. $this->poolChannel->push($obj);
  94. return true;
  95. } else {
  96. return false;
  97. }
  98. }
  99. /*
  100. * tryTimes为出现异常尝试次数
  101. */
  102. public function getObj(float $timeout = null, int $tryTimes = 3)
  103. {
  104. /*
  105. * 懒惰模式,可以提前创建 pool对象,因此调用钱执行初始化检测
  106. */
  107. $this->init();
  108. /*
  109. * 当标记为销毁后,禁止取出对象
  110. */
  111. if ($this->destroy) {
  112. return null;
  113. }
  114. if ($timeout === null) {
  115. $timeout = $this->getConfig()->getGetObjectTimeout();
  116. }
  117. $object = null;
  118. if ($this->poolChannel->isEmpty()) {
  119. try {
  120. $this->initObject();
  121. } catch (\Throwable $throwable) {
  122. if ($tryTimes <= 0) {
  123. throw $throwable;
  124. } else {
  125. $tryTimes--;
  126. return $this->getObj($timeout, $tryTimes);
  127. }
  128. }
  129. }
  130. $start = microtime(true);
  131. $object = $this->poolChannel->pop($timeout);
  132. $take = microtime(true) - $start;
  133. // getObj 记录取出等待时间 5s周期内
  134. $this->loadWaitTimes += $take;
  135. $this->statusTable->set($this->poolHash(),[
  136. 'loadWaitTimes'=>$this->loadWaitTimes
  137. ]);
  138. if (is_object($object)) {
  139. $hash = $object->__objHash;
  140. //标记该对象已经被使用,不在pool中
  141. $this->objHash[$hash] = false;
  142. $this->inUseObject[$hash] = $object;
  143. $object->__lastUseTime = time();
  144. if ($object instanceof ObjectInterface) {
  145. try {
  146. if ($object->beforeUse() === false) {
  147. $this->unsetObj($object);
  148. if ($tryTimes <= 0) {
  149. return null;
  150. } else {
  151. $tryTimes--;
  152. return $this->getObj($timeout, $tryTimes);
  153. }
  154. }
  155. } catch (\Throwable $throwable) {
  156. $this->unsetObj($object);
  157. if ($tryTimes <= 0) {
  158. throw $throwable;
  159. } else {
  160. $tryTimes--;
  161. return $this->getObj($timeout, $tryTimes);
  162. }
  163. }
  164. }
  165. // 每次getObj 记录该连接池取出的次数 5s周期内
  166. $this->loadUseTimes++;
  167. $this->statusTable->incr($this->poolHash(),'loadUseTimes');
  168. return $object;
  169. } else {
  170. return null;
  171. }
  172. }
  173. /*
  174. * 彻底释放一个对象
  175. */
  176. public function unsetObj($obj): bool
  177. {
  178. if (!$this->isInPool($obj)) {
  179. /*
  180. * 主动回收可能存在的上下文
  181. */
  182. $cid = Coroutine::getCid();
  183. //当obj等于当前协程defer的obj时,则清除
  184. if (isset($this->context[$cid]) && $this->context[$cid]->__objHash === $obj->__objHash) {
  185. unset($this->context[$cid]);
  186. }
  187. $hash = $obj->__objHash;
  188. unset($this->objHash[$hash]);
  189. unset($this->inUseObject[$hash]);
  190. if ($obj instanceof ObjectInterface) {
  191. try {
  192. $obj->gc();
  193. } catch (\Throwable $throwable) {
  194. throw $throwable;
  195. } finally {
  196. $this->createdNum--;
  197. $this->statusTable->decr($this->poolHash(),'created');
  198. }
  199. } else {
  200. $this->createdNum--;
  201. $this->statusTable->decr($this->poolHash(),'created');
  202. }
  203. return true;
  204. } else {
  205. return false;
  206. }
  207. }
  208. /*
  209. * 超过$idleTime未出队使用的,将会被回收。
  210. */
  211. public function idleCheck(int $idleTime)
  212. {
  213. /*
  214. * 懒惰模式,可以提前创建 pool对象,因此调用钱执行初始化检测
  215. */
  216. $this->init();
  217. $size = $this->poolChannel->length();
  218. while (!$this->poolChannel->isEmpty() && $size >= 0) {
  219. $size--;
  220. $item = $this->poolChannel->pop(0.01);
  221. if(!$item){
  222. continue;
  223. }
  224. //回收超时没有使用的链接
  225. if (time() - $item->__lastUseTime > $idleTime) {
  226. //标记为不在队列内,允许进行gc回收
  227. $hash = $item->__objHash;
  228. $this->objHash[$hash] = false;
  229. $this->unsetObj($item);
  230. } else {
  231. //执行itemIntervalCheck检查
  232. if(!$this->itemIntervalCheck($item)){
  233. //标记为不在队列内,允许进行gc回收
  234. $hash = $item->__objHash;
  235. $this->objHash[$hash] = false;
  236. $this->unsetObj($item);
  237. continue;
  238. }else{
  239. $this->poolChannel->push($item);
  240. }
  241. }
  242. }
  243. }
  244. /*
  245. * 允许外部调用
  246. */
  247. public function intervalCheck()
  248. {
  249. //删除死去的进程状态
  250. $this->statusTable->set($this->poolHash(),[
  251. 'lastAliveTime'=>time()
  252. ]);
  253. $list = [];
  254. $time = time();
  255. foreach ($this->statusTable as $key => $item){
  256. if($time - $item['lastAliveTime'] >= 2){
  257. $list[] = $key;
  258. }
  259. }
  260. foreach ($list as $key){
  261. $this->statusTable->del($key);
  262. }
  263. $this->idleCheck($this->getConfig()->getMaxIdleTime());
  264. $this->keepMin($this->getConfig()->getMinObjectNum());
  265. }
  266. /**
  267. * @param $item $item->__lastUseTime 属性表示该对象被最后一次使用的时间
  268. * @return bool
  269. */
  270. protected function itemIntervalCheck($item):bool
  271. {
  272. return true;
  273. }
  274. /*
  275. * 可以解决冷启动问题
  276. */
  277. public function keepMin(?int $num = null): int
  278. {
  279. if($num == null){
  280. $num = $this->getConfig()->getMinObjectNum();
  281. }
  282. if ($this->createdNum < $num) {
  283. $left = $num - $this->createdNum;
  284. while ($left > 0) {
  285. /*
  286. * 避免死循环
  287. */
  288. if ($this->initObject() == false) {
  289. break;
  290. }
  291. $left--;
  292. }
  293. }
  294. return $this->createdNum;
  295. }
  296. public function getConfig(): Config
  297. {
  298. return $this->conf;
  299. }
  300. public function status(bool $currentWorker = false):array
  301. {
  302. if($currentWorker){
  303. return $this->statusTable->get($this->poolHash());
  304. }else{
  305. $data = [];
  306. foreach ($this->statusTable as $key => $value){
  307. $data[] = $value;
  308. }
  309. return $data;
  310. }
  311. }
  312. private function initObject(): bool
  313. {
  314. if ($this->destroy) {
  315. return false;
  316. }
  317. /*
  318. * 懒惰模式,可以提前创建 pool对象,因此调用钱执行初始化检测
  319. */
  320. $this->init();
  321. $obj = null;
  322. $this->createdNum++;
  323. $this->statusTable->incr($this->poolHash(),'created');
  324. if ($this->createdNum > $this->getConfig()->getMaxObjectNum()) {
  325. $this->createdNum--;
  326. $this->statusTable->decr($this->poolHash(),'created');
  327. return false;
  328. }
  329. try {
  330. $obj = $this->createObject();
  331. if (is_object($obj)) {
  332. $hash = Random::character(12);
  333. $this->objHash[$hash] = true;
  334. $obj->__objHash = $hash;
  335. $obj->__lastUseTime = time();
  336. $this->poolChannel->push($obj);
  337. return true;
  338. } else {
  339. $this->createdNum--;
  340. $this->statusTable->decr($this->poolHash(),'created');
  341. }
  342. } catch (\Throwable $throwable) {
  343. $this->createdNum--;
  344. $this->statusTable->decr($this->poolHash(),'created');
  345. throw $throwable;
  346. }
  347. return false;
  348. }
  349. public function isPoolObject($obj): bool
  350. {
  351. if (isset($obj->__objHash)) {
  352. return isset($this->objHash[$obj->__objHash]);
  353. } else {
  354. return false;
  355. }
  356. }
  357. public function isInPool($obj): bool
  358. {
  359. if ($this->isPoolObject($obj)) {
  360. return $this->objHash[$obj->__objHash];
  361. } else {
  362. return false;
  363. }
  364. }
  365. /*
  366. * 销毁该pool,但保留pool原有状态
  367. */
  368. function destroy()
  369. {
  370. $this->destroy = true;
  371. /*
  372. * 懒惰模式,可以提前创建 pool对象,因此调用钱执行初始化检测
  373. */
  374. $this->init();
  375. if ($this->intervalCheckTimerId && Timer::exists($this->intervalCheckTimerId)) {
  376. Timer::clear($this->intervalCheckTimerId);
  377. $this->intervalCheckTimerId = null;
  378. }
  379. if ($this->loadAverageTimerId && Timer::exists($this->loadAverageTimerId)) {
  380. Timer::clear($this->loadAverageTimerId);
  381. $this->loadAverageTimerId = null;
  382. }
  383. if($this->poolChannel){
  384. while (!$this->poolChannel->isEmpty()) {
  385. $item = $this->poolChannel->pop(0.01);
  386. $this->unsetObj($item);
  387. }
  388. foreach ($this->inUseObject as $item){
  389. $this->unsetObj($item);
  390. $this->inUseObject = [];
  391. }
  392. $this->poolChannel->close();
  393. $this->poolChannel = null;
  394. }
  395. $list = [];
  396. foreach ($this->statusTable as $key => $value){
  397. $list[] = $key;
  398. }
  399. foreach ($list as $key){
  400. $this->statusTable->del($key);
  401. }
  402. }
  403. function reset(): AbstractPool
  404. {
  405. $this->destroy();
  406. $this->createdNum = 0;
  407. $this->destroy = false;
  408. $this->context = [];
  409. $this->objHash = [];
  410. return $this;
  411. }
  412. public function invoke(callable $call, float $timeout = null)
  413. {
  414. $obj = $this->getObj($timeout);
  415. if ($obj) {
  416. try {
  417. $ret = call_user_func($call, $obj);
  418. return $ret;
  419. } catch (\Throwable $throwable) {
  420. throw $throwable;
  421. } finally {
  422. $this->recycleObj($obj);
  423. }
  424. } else {
  425. throw new PoolEmpty(static::class . " pool is empty");
  426. }
  427. }
  428. public function defer(float $timeout = null)
  429. {
  430. $cid = Coroutine::getCid();
  431. if (isset($this->context[$cid])) {
  432. return $this->context[$cid];
  433. }
  434. $obj = $this->getObj($timeout);
  435. if ($obj) {
  436. $this->context[$cid] = $obj;
  437. Coroutine::defer(function () use ($cid) {
  438. if (isset($this->context[$cid])) {
  439. $obj = $this->context[$cid];
  440. unset($this->context[$cid]);
  441. $this->recycleObj($obj);
  442. }
  443. });
  444. return $this->defer($timeout);
  445. } else {
  446. throw new PoolEmpty(static::class . " pool is empty");
  447. }
  448. }
  449. private function init()
  450. {
  451. if ((!$this->poolChannel) && (!$this->destroy)) {
  452. $this->poolChannel = new Channel($this->conf->getMaxObjectNum() + 8);
  453. if ($this->conf->getIntervalCheckTime() > 0) {
  454. $this->intervalCheckTimerId = Timer::tick($this->conf->getIntervalCheckTime(), [$this, 'intervalCheck']);
  455. }
  456. $this->loadAverageTimerId = Timer::tick(5*1000,function (){
  457. // 5s 定时检测
  458. $loadWaitTime = $this->loadWaitTimes;
  459. $loadUseTimes = $this->loadUseTimes;
  460. $this->loadUseTimes = 0;
  461. $this->loadWaitTimes = 0;
  462. $this->statusTable->set($this->poolHash(),[
  463. 'loadWaitTimes'=>0,
  464. 'loadUseTimes'=>0
  465. ]);
  466. //避免分母为0
  467. if($loadUseTimes <= 0){
  468. $loadUseTimes = 1;
  469. }
  470. $average = $loadWaitTime/$loadUseTimes; // average 记录的是平均每个链接取出的时间
  471. if($this->getConfig()->getLoadAverageTime() > $average){
  472. //负载小。尝试回收链接百分之5的链接
  473. $decNum = intval($this->createdNum * 0.05);
  474. if( ($this->createdNum - $decNum) > $this->getConfig()->getMinObjectNum()){
  475. while ($decNum > 0){
  476. $temp = $this->getObj(0.001,0);
  477. if($temp){
  478. $this->unsetObj($temp);
  479. }else{
  480. break;
  481. }
  482. $decNum--;
  483. }
  484. }
  485. }
  486. });
  487. //table记录初始化
  488. $this->statusTable->set($this->poolHash(),[
  489. 'pid'=>getmypid(),
  490. 'created'=>0,
  491. 'inuse'=>0,
  492. 'loadWaitTimes'=>0,
  493. 'loadUseTimes'=>0,
  494. 'lastAliveTime'=>0
  495. ]);
  496. }
  497. }
  498. function poolHash():string
  499. {
  500. return $this->poolHash;
  501. }
  502. final function __clone()
  503. {
  504. throw new Exception('AbstractObject cannot be clone');
  505. }
  506. }