ElasticSearchGoodsNewCrontab.php 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. <?php
  2. namespace App\Crontab;
  3. use EasySwoole\Crontab\JobInterface;
  4. use App\Com\SmtpSend;
  5. use EasySwoole\EasySwoole\Config;
  6. use App\Models\WarningBody;
  7. use EasySwoole\Mysqli\QueryBuilder;
  8. use EasySwoole\ORM\DbManager;
  9. use EasySwoole\EasySwoole\Logger;
  /**
   * Crontab job: every 10 minutes, collect the goods that were created or
   * updated within the last 30 minutes and push each of them to the
   * Elasticsearch index through the push endpoint exposed by the welfare API.
   */
  class ElasticSearchGoodsNewCrontab implements JobInterface
  {
      /**
       * Look-back window in seconds: goods created or updated within this
       * window are synchronised to Elasticsearch.
       *
       * @var int
       */
      protected $time = 60 * 30;

      /**
       * Push endpoint that receives one goods record per request.
       *
       * @var string
       */
      protected $pushUrl = 'https://open.api.superdesk.cn/api/base/dict_new/pushGoods';

      /**
       * Name under which the job is registered.
       *
       * @return string
       */
      public function jobName(): string
      {
          return 'ElasticSearchGoodsNewCrontab';
      }

      /**
       * Crontab expression for the schedule: once every 10 minutes.
       *
       * @return string
       */
      public function crontabRule(): string
      {
          return '*/10 * * * *';
      }

      /**
       * Job body: push every recently changed goods record to the endpoint
       * and log the raw response of each request.
       *
       * A record that cannot be encoded or whose request fails is logged as
       * an error and skipped, so one bad record does not hide behind an
       * empty "import result" line.
       */
      public function run()
      {
          foreach ($this->getUpdateGoods() as $goods) {
              $goodsId = $goods['id'];

              $body = json_encode($goods);
              if ($body === false) {
                  Logger::getInstance()->error(
                      "(new) goods {$goodsId}: json_encode failed: " . json_last_error_msg()
                  );
                  continue;
              }

              $res = $this->http_request(
                  $this->pushUrl,
                  ['goodsid' => $goodsId, 'goods_body' => $body]
              );
              if ($res === false) {
                  Logger::getInstance()->error(
                      "(new) goods {$goodsId}: push request to {$this->pushUrl} failed"
                  );
                  continue;
              }

              // Write the raw response to the log.
              Logger::getInstance()->notice("(new)商品ID{$goodsId}导入结果:{$res}\n");
          }
      }

      /**
       * Load the full rows of all goods that changed inside the look-back
       * window, each extended with an `operators_id` list.
       *
       * @return array list of goods rows; empty when nothing changed
       */
      protected function getUpdateGoods()
      {
          $ids = $this->normalizeIds(array_merge(
              $this->getGoodsIdFromPool(),
              $this->getGoodsIdFromOperators_goods()
          ));
          if (!$ids) {
              return [];
          }

          $idList = implode(',', $ids);
          $goods = $this->fetchRows(
              "SELECT * FROM ims_superdesk_shop_goods WHERE id IN ({$idList})"
          );

          // One batched lookup instead of one query per goods row.
          $operators = $this->getOperatorsIdsByGoodsIds($ids);
          foreach ($goods as $k => $item) {
              $goodsId = (int) $item['id'];
              $goods[$k]['operators_id'] = isset($operators[$goodsId]) ? $operators[$goodsId] : [];
          }

          return $goods;
      }

      /**
       * Operator ids linked to one goods id (rows with status=1, deleted=0,
       * checked=0 only).
       *
       * @param int|string $goods_id goods id; cast to int before it reaches SQL
       * @return array list of operators_id values; empty when there is none
       */
      public function getOperatorsIdByGoodsId($goods_id)
      {
          $goodsId = (int) $goods_id;
          $operators = $this->getOperatorsIdsByGoodsIds([$goodsId]);

          return isset($operators[$goodsId]) ? $operators[$goodsId] : [];
      }

      /**
       * Operator ids for many goods ids, fetched with a single query.
       *
       * @param array $goodsIds goods ids
       * @return array map of goods id (int) => list of operators_id values;
       *               goods without a matching row have no key
       */
      protected function getOperatorsIdsByGoodsIds(array $goodsIds)
      {
          $ids = $this->normalizeIds($goodsIds);
          if (!$ids) {
              return [];
          }

          $idList = implode(',', $ids);
          $rows = $this->fetchRows(
              "SELECT goodsid, operators_id FROM ims_superdesk_shop_operators_goods "
              . "WHERE goodsid IN ({$idList}) AND `status`=1 AND deleted=0 AND checked=0"
          );

          $operators = [];
          foreach ($rows as $row) {
              $operators[(int) $row['goodsid']][] = $row['operators_id'];
          }

          return $operators;
      }

      /**
       * Ids of goods changed inside the look-back window, taken from the
       * main goods table.
       *
       * @return array list of goods ids
       */
      protected function getGoodsIdFromPool()
      {
          return $this->getChangedIds('ims_superdesk_shop_goods', 'id');
      }

      /**
       * Ids of goods changed inside the look-back window, taken from the
       * operators-goods table.
       *
       * @return array list of goods ids
       */
      protected function getGoodsIdFromOperators_goods()
      {
          return $this->getChangedIds('ims_superdesk_shop_operators_goods', 'goodsid');
      }

      /**
       * Select one id column from rows whose updatetime or createtime is
       * newer than (now - $this->time).
       *
       * $table and $column are interpolated as identifiers, so they must only
       * ever be the hard-coded literals passed by this class.
       *
       * @param string $table  table name
       * @param string $column id column to return
       * @return array list of values of $column
       */
      protected function getChangedIds($table, $column)
      {
          $since = time() - (int) $this->time;
          $rows = $this->fetchRows(
              "SELECT {$column} FROM {$table} WHERE `updatetime` > {$since} OR createtime > {$since}"
          );

          return array_column($rows, $column);
      }

      /**
       * Reduce a list of ids to unique positive-or-negative non-zero integers.
       *
       * NULL / empty / non-numeric values become 0 and are dropped; without
       * this an empty id would produce a malformed `IN (,1,2)` list.
       *
       * @param array $ids raw id values
       * @return array list of unique non-zero ints
       */
      protected function normalizeIds(array $ids)
      {
          return array_values(array_unique(array_filter(array_map('intval', $ids))));
      }

      /**
       * Run a raw SQL statement on the 'default' connection and return its rows.
       *
       * @param string $sql complete SQL statement
       * @return array the 'result' entry of the query result; empty array when
       *               that entry is missing or not an array
       */
      protected function fetchRows($sql)
      {
          $builder = new QueryBuilder();
          $builder->raw($sql);
          $data = DbManager::getInstance()->query($builder, true, 'default')->toArray();

          $rows = isset($data['result']) ? $data['result'] : [];

          return is_array($rows) ? $rows : [];
      }

      /**
       * Send a form-encoded HTTP POST request.
       *
       * @param string    $url     request URL
       * @param array     $data    form fields
       * @param int|float $timeout read timeout in seconds, so that an
       *                           unresponsive endpoint cannot stall the batch
       * @return string|false raw response body, or false when the request failed
       */
      protected function http_request($url, $data, $timeout = 30)
      {
          $context = stream_context_create([
              'http' => [
                  'method'  => 'POST',
                  'header'  => 'Content-Type: application/x-www-form-urlencoded',
                  'content' => http_build_query($data),
                  'timeout' => $timeout,
              ],
          ]);

          return file_get_contents($url, false, $context);
      }

      /**
       * Receives any exception thrown from run(); logs it instead of
       * discarding it so a failed run is visible.
       *
       * @param \Throwable $throwable exception thrown by run()
       */
      public function onException(\Throwable $throwable)
      {
          Logger::getInstance()->error(sprintf(
              '%s failed: %s in %s:%d',
              $this->jobName(),
              $throwable->getMessage(),
              $throwable->getFile(),
              $throwable->getLine()
          ));
      }
  }