ElasticSearchGoodsNewCrontab2.php 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. <?php
  2. namespace App\Crontab;
  3. use EasySwoole\Crontab\JobInterface;
  4. use App\Com\SmtpSend;
  5. use EasySwoole\EasySwoole\Config;
  6. use App\Models\WarningBody;
  7. use EasySwoole\Mysqli\QueryBuilder;
  8. use EasySwoole\ORM\DbManager;
  9. use EasySwoole\EasySwoole\Logger;
  /**
   * Crontab job: incrementally push recently changed goods to Elasticsearch.
   *
   * On every run the job collects the IDs of goods whose row in the goods table
   * or in the operator-goods table was created/updated within the last
   * {@see $time} seconds, loads those goods page by page, enriches each one with
   * its operator IDs and per-operator category keys, and POSTs it to the welfare
   * API "pushGoods" endpoint, which is the interface used to feed the ES index.
   */
  class ElasticSearchGoodsNewCrontab2 implements JobInterface
  {
      /** Maximum number of pages processed in a single run. */
      const MAX_PAGES = 20;

      /** Number of goods loaded per page. */
      const PAGE_SIZE = 50;

      /** @var int Look-back window in seconds: goods created/updated within it are synced. */
      protected $time = 60 * 20;

      /** @var array Goods IDs collected for the current run (cached between pages). */
      protected $ids = [];

      /**
       * Name under which the job is registered.
       */
      public function jobName(): string
      {
          return 'ElasticSearchGoodsNewCrontab2';
      }

      /**
       * Crontab expression of the job: runs every 3 minutes.
       *
       * The look-back window ($time, 20 minutes) is longer than the interval, so a
       * changed item is picked up by several consecutive runs.
       */
      public function crontabRule(): string
      {
          return '*/3 * * * *';
      }

      /**
       * Job body: push every recently changed goods row to the welfare API.
       *
       * Processes at most MAX_PAGES pages of PAGE_SIZE goods per run.
       * NOTE(review): the two ID sources may yield up to 2000 IDs together, so
       * IDs beyond MAX_PAGES * PAGE_SIZE rows are not pushed in this run.
       */
      public function run()
      {
          // The endpoint is hard-coded here; the WELFARE_API config entry is not used.
          $welfare_api_set = [
              'url' => 'http://open.api.superdesk.cn/api/base/dict_new/pushGoods',
          ];

          // Always start from a fresh ID list: the job object may be reused between
          // runs, and a previous run that threw must not leave a stale cache behind.
          $this->ids = [];

          try {
              for ($page = 1; $page <= self::MAX_PAGES; $page++) {
                  $goods = $this->getUpdateGoods($page);
                  if (!$goods) {
                      break;
                  }

                  foreach ($goods as $value) {
                      $this->pushGoods($welfare_api_set['url'], $value);
                  }
              }
          } finally {
              // Drop the cache on every exit path, including exceptions.
              $this->ids = [];
          }
      }

      /**
       * Load one page of changed goods, enriched with operator information.
       *
       * The ID list is collected once (first call of a run) and cached in
       * $this->ids so that every page is cut from the same set.
       *
       * @param int $page 1-based page number
       * @return array list of goods rows, each with extra keys `operators_id`
       *               (list of operator IDs) and `category` (list of category keys);
       *               empty when there is nothing (more) to sync
       */
      protected function getUpdateGoods($page = 1)
      {
          $size = self::PAGE_SIZE;
          $offset = (max(1, (int) $page) - 1) * $size;

          if (!$this->ids) {
              $this->ids = array_values(array_unique(array_merge(
                  $this->getGoodsIdFromPool(),
                  $this->getGoodsIdFromOperators_goods()
              )));
          }
          if (!$this->ids) {
              return [];
          }

          // The IN list is unquoted, so force every ID to an integer.
          $ids = implode(',', array_map('intval', $this->ids));

          // ORDER BY makes LIMIT/OFFSET paging deterministic; without it rows may be
          // skipped or repeated between pages.
          $que_sql = "SELECT * FROM ims_superdesk_shop_goods WHERE id IN ({$ids}) ORDER BY id LIMIT {$offset},{$size}";

          $new_goods = [];
          foreach ($this->queryRows($que_sql) as $k => $item) {
              $item['operators_id'] = $this->getOperatorsIdByGoodsId($item['id']);
              $item['category'] = $this->getOperatorsCategory($item['id'], $item['ccate'], $item['operators_id']);
              $new_goods[$k] = $item;
          }

          return $new_goods;
      }

      /**
       * Build the per-operator category keys of a goods item.
       *
       * For each operator the key is "<operators_id>_<category_type>_<ccate>".
       * When the operator's category_type is truthy (self-built categories) and the
       * operator-goods table holds a row for this goods/operator pair, that row's
       * ccate is used; otherwise the goods' own $ccate is used.
       *
       * @param int|string $goods_id      goods ID
       * @param mixed      $ccate         category of the goods row itself
       * @param array      $operators_ids operator IDs the goods belongs to
       * @return array list of category key strings, one per operator
       */
      public function getOperatorsCategory($goods_id, $ccate, $operators_ids)
      {
          $goods_id = (int) $goods_id;
          $result = [];

          foreach ((array) $operators_ids as $operators_id) {
              // Reset per operator: previously these values leaked from one operator
              // into the next one's key.
              $category_type = 0;
              $operator_ccate = $ccate;

              $rows = $this->queryRows(
                  "select category_type from ims_superdesk_shop_operators_user where id = '{$operators_id}'"
              );
              if ($rows) {
                  $category_type = $rows[0]['category_type'];
              }

              if ($category_type) { // self-built categories
                  $rows = $this->queryRows(
                      "select ccate from ims_superdesk_shop_operators_goods where goodsid = '{$goods_id}' and operators_id = '{$operators_id}' "
                  );
                  if ($rows) {
                      $operator_ccate = $rows[0]['ccate'];
                  }
              }

              $result[] = $operators_id . '_' . $category_type . '_' . $operator_ccate;
          }

          return $result;
      }

      /**
       * IDs of the operators that carry the given goods
       * (rows with status=1, deleted=0 and checked=0).
       *
       * @param int|string $goods_id goods ID
       * @return array list of operator IDs, empty when none match
       */
      public function getOperatorsIdByGoodsId($goods_id)
      {
          $goods_id = (int) $goods_id;
          $sql = "select operators_id from ims_superdesk_shop_operators_goods where goodsid = '{$goods_id}' and `status`=1 AND deleted=0 AND checked=0 ";

          return array_column($this->queryRows($sql), 'operators_id');
      }

      /**
       * IDs of goods changed within the look-back window, taken from the goods table.
       *
       * @return array list of goods IDs (at most 1000, most recently updated first)
       */
      protected function getGoodsIdFromPool()
      {
          return $this->fetchRecentlyChangedIds('ims_superdesk_shop_goods', 'id');
      }

      /**
       * IDs of goods changed within the look-back window, taken from the
       * operator-goods table.
       *
       * @return array list of goods IDs (at most 1000, most recently updated first)
       */
      protected function getGoodsIdFromOperators_goods()
      {
          return $this->fetchRecentlyChangedIds('ims_superdesk_shop_operators_goods', 'goodsid');
      }

      /**
       * Send a form-encoded HTTP POST request.
       *
       * @param string $url  request URL
       * @param array  $data form fields
       * @return string|false raw response body, or false when the request failed
       */
      protected function http_request($url, $data)
      {
          $opts = [
              'http' => [
                  'method' => 'POST',
                  'header' => "Content-type:application/x-www-form-urlencoded",
                  'content' => http_build_query($data),
              ],
          ];
          $context = stream_context_create($opts);

          return file_get_contents($url, false, $context);
      }

      /**
       * Called with any exception thrown by run(); logs it instead of dropping it.
       */
      public function onException(\Throwable $throwable)
      {
          Logger::getInstance()->error(sprintf(
              '(new2)%s failed: %s in %s:%d',
              $this->jobName(),
              $throwable->getMessage(),
              $throwable->getFile(),
              $throwable->getLine()
          ));
      }

      /**
       * Push a single goods row to the welfare API and log the outcome.
       *
       * @param string $url   pushGoods endpoint
       * @param array  $value goods row as returned by getUpdateGoods()
       */
      private function pushGoods($url, $value)
      {
          $goods_body = json_encode($value);
          if ($goods_body === false) {
              // Sending `false` would post a bogus body; skip the item instead.
              Logger::getInstance()->error(
                  "(new2)商品ID{$value['id']} json_encode failed: " . json_last_error_msg()
              );
              return;
          }

          $res = $this->http_request(
              $url,
              [
                  'es_index' => 'superdesk_welfare_20230511',
                  'es_type' => 'ims_superdesk_shop_goods',
                  'goodsid' => $value['id'],
                  'goods_body' => $goods_body,
              ]
          );

          if ($res === false) {
              Logger::getInstance()->error("(new2)商品ID{$value['id']} push request to {$url} failed");
              return;
          }

          // Write the response to the log.
          Logger::getInstance()->notice("(new2)商品ID{$value['id']}导入结果:{$res}\n");
      }

      /**
       * Select up to 1000 values of $column from $table for rows created or updated
       * within the look-back window, most recently updated first.
       *
       * Both arguments are fixed identifiers supplied by this class, never input.
       */
      private function fetchRecentlyChangedIds(string $table, string $column): array
      {
          $time = time() - $this->time;
          $where = " `updatetime`> '{$time}' OR createtime > '{$time}' ";
          $que_sql = "SELECT {$column} FROM {$table} WHERE {$where} ORDER BY updatetime DESC LIMIT 1000 ";

          return array_column($this->queryRows($que_sql), $column);
      }

      /**
       * Run a raw SQL statement on the `default` connection and return its rows.
       *
       * @return array result rows; empty when the result set is missing or not an array
       */
      private function queryRows(string $sql): array
      {
          $queryBuild = new QueryBuilder();
          $queryBuild->raw($sql);
          $res_data = DbManager::getInstance()->query($queryBuild, true, 'default')->toArray();
          $rows = $res_data['result'] ?? [];

          return is_array($rows) ? $rows : [];
      }
  }