123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204 |
- <?php
- namespace App\Crontab;
- use EasySwoole\Crontab\JobInterface;
- use App\Com\SmtpSend;
- use EasySwoole\EasySwoole\Config;
- use App\Models\WarningBody;
- use EasySwoole\Mysqli\QueryBuilder;
- use EasySwoole\ORM\DbManager;
- use EasySwoole\EasySwoole\Logger;
/**
 * Crontab job that pushes recently changed goods to the Elasticsearch index.
 *
 * On every run (see crontabRule(); currently every 3 minutes) the job collects the ids of
 * goods that were created or updated within the last $time seconds (currently 20 minutes)
 * in either the goods table or the operators-goods table, loads the full goods rows page
 * by page, and POSTs each row to the welfare API "pushGoods" endpoint.
 */
class ElasticSearchGoodsNewCrontab2 implements JobInterface
{
    /** Number of goods rows loaded per page by getUpdateGoods(). */
    const PAGE_SIZE = 50;

    /** Maximum number of pages processed in a single run. */
    const MAX_PAGES = 20;

    /** Look-back window in seconds: goods created/updated within it are synced to ES. */
    protected $time = 60 * 20;

    /** Goods ids collected for the current run; cached so every page uses the same list. */
    protected $ids = [];

    /**
     * Name under which the job is registered.
     */
    public function jobName(): string
    {
        return 'ElasticSearchGoodsNewCrontab2';
    }

    /**
     * Crontab expression for the schedule: every 3 minutes.
     */
    public function crontabRule(): string
    {
        return '*/3 * * * *';
    }

    /**
     * Job body: page through the changed goods and push each one to the search API.
     *
     * At most MAX_PAGES pages of PAGE_SIZE rows are processed per run. The cached id list
     * is cleared before and after the run so a run that ended in an exception can never
     * leave a stale list behind for the next one.
     */
    public function run()
    {
        // NOTE(review): the endpoint is hard-coded and uses plain HTTP - confirm that is intended.
        $welfare_api_set = [
            'url' => 'http://open.api.superdesk.cn/api/base/dict_new/pushGoods',
        ];

        $this->ids = [];

        try {
            for ($page = 1; $page <= self::MAX_PAGES; $page++) {
                $goods = $this->getUpdateGoods($page);
                if (!$goods) {
                    break;
                }

                foreach ($goods as $value) {
                    $res = $this->http_request(
                        $welfare_api_set['url'],
                        [
                            'es_index' => 'superdesk_welfare_20230511',
                            'es_type' => 'ims_superdesk_shop_goods',
                            'goodsid' => $value['id'],
                            'goods_body' => json_encode($value),
                        ]
                    );
                    // Record the raw API response for every pushed goods row
                    Logger::getInstance()->notice("(new2)商品ID{$value['id']}导入结果:{$res}\n");
                }
            }
        } finally {
            $this->ids = [];
        }
    }

    /**
     * Load one page of changed goods rows, enriched with operator information.
     *
     * The id list is collected on the first call of a run and cached in $this->ids.
     * Each returned row gets two extra keys: 'operators_id' (list of operator ids) and
     * 'category' (list of "operatorId_categoryType_ccate" strings).
     *
     * @param int $page 1-based page number
     * @return array list of goods rows; empty when there is nothing (more) to sync
     */
    protected function getUpdateGoods($page = 1)
    {
        $size = self::PAGE_SIZE;
        $offset = (max(1, (int) $page) - 1) * $size;

        if (!$this->ids) {
            $merged = array_merge($this->getGoodsIdFromPool(), $this->getGoodsIdFromOperators_goods());
            // Ids are interpolated into SQL below, so force them to positive integers
            $clean = array_filter(array_map('intval', $merged), function ($id) {
                return $id > 0;
            });
            $this->ids = array_values(array_unique($clean));

            $capacity = self::MAX_PAGES * $size;
            if (count($this->ids) > $capacity) {
                // Make the truncation visible instead of dropping goods silently
                Logger::getInstance()->notice(sprintf(
                    '%s: %d changed goods found, only the first %d (by id) are processed per run',
                    $this->jobName(),
                    count($this->ids),
                    $capacity
                ));
            }
        }

        if (!$this->ids) {
            return [];
        }

        $ids = implode(',', $this->ids);
        // ORDER BY keeps LIMIT/OFFSET paging stable, so pages neither overlap nor skip rows
        $que_sql = "SELECT * FROM ims_superdesk_shop_goods WHERE id IN ({$ids}) ORDER BY id LIMIT {$offset},{$size}";

        $new_goods = [];
        foreach ($this->queryRows($que_sql) as $k => $item) {
            $item['operators_id'] = $this->getOperatorsIdByGoodsId($item['id']);
            $item['category'] = $this->getOperatorsCategory($item['id'], $item['ccate'], $item['operators_id']);
            $new_goods[$k] = $item;
        }

        return $new_goods;
    }

    /**
     * Build the category descriptor of a goods item for each operator.
     *
     * For an operator whose category_type is truthy (self-built categories) the ccate stored
     * in the operators-goods table is used when present; otherwise the goods' own $ccate is
     * used. Category type and ccate are resolved independently per operator, so one
     * operator's values never leak into the next operator's descriptor.
     *
     * @param int|string $goods_id      goods id
     * @param int|string $ccate         the goods' own ccate value
     * @param array      $operators_ids operator ids selling the goods
     * @return array list of "operatorId_categoryType_ccate" strings, one per operator
     */
    public function getOperatorsCategory($goods_id, $ccate, $operators_ids)
    {
        $goods_id = (int) $goods_id;
        $result = [];

        foreach ($operators_ids as $operators_id) {
            $operators_id = (int) $operators_id;
            $category_type = 0;
            $operator_ccate = $ccate;

            $rows = $this->queryRows(
                "select category_type from ims_superdesk_shop_operators_user where id = '{$operators_id}'"
            );
            if ($rows) {
                $category_type = $rows[0]['category_type'];
            }

            if ($category_type) {
                // Self-built categories: prefer the operator-specific ccate
                $rows = $this->queryRows(
                    "select ccate from ims_superdesk_shop_operators_goods where goodsid = '{$goods_id}' and operators_id = '{$operators_id}' "
                );
                if ($rows) {
                    $operator_ccate = $rows[0]['ccate'];
                }
            }

            $result[] = $operators_id . '_' . $category_type . '_' . $operator_ccate;
        }

        return $result;
    }

    /**
     * Ids of the operators that list the given goods with status=1, deleted=0 and checked=0.
     *
     * @param int|string $goods_id goods id
     * @return array list of operator ids; empty when no row matches
     */
    public function getOperatorsIdByGoodsId($goods_id)
    {
        $goods_id = (int) $goods_id;
        $sql = "select operators_id from ims_superdesk_shop_operators_goods where goodsid = '{$goods_id}' and `status`=1 AND deleted=0 AND checked=0 ";

        return array_column($this->queryRows($sql), 'operators_id');
    }

    /**
     * Ids of goods changed in the goods table within the look-back window (at most 1000).
     *
     * @return array list of goods ids
     */
    protected function getGoodsIdFromPool()
    {
        $time = time() - $this->time;
        $que_sql = "SELECT id FROM ims_superdesk_shop_goods WHERE `updatetime`> '{$time}' OR createtime > '{$time}' ORDER BY updatetime DESC LIMIT 1000 ";

        return array_column($this->queryRows($que_sql), 'id');
    }

    /**
     * Ids of goods changed in the operators-goods table within the look-back window (at most 1000).
     *
     * @return array list of goods ids
     */
    protected function getGoodsIdFromOperators_goods()
    {
        $time = time() - $this->time;
        $que_sql = "SELECT goodsid FROM ims_superdesk_shop_operators_goods WHERE `updatetime`> '{$time}' OR createtime > '{$time}' ORDER BY updatetime DESC LIMIT 1000 ";

        return array_column($this->queryRows($que_sql), 'goodsid');
    }

    /**
     * Send a form-encoded HTTP POST request.
     *
     * @param string $url  request URL
     * @param array  $data form fields
     * @return string|false raw response body, or false when the request failed
     */
    protected function http_request($url, $data)
    {
        $opts = [
            'http' => [
                'method' => 'POST',
                'header' => "Content-type:application/x-www-form-urlencoded",
                'content' => http_build_query($data),
                // Without a timeout one hanging request could stall the whole job
                'timeout' => 10,
            ],
        ];
        $context = stream_context_create($opts);
        $response = file_get_contents($url, false, $context);

        if ($response === false) {
            Logger::getInstance()->error("HTTP POST to {$url} failed");
        }

        return $response;
    }

    /**
     * Called for exceptions thrown by run(): log them and drop the cached id list.
     */
    public function onException(\Throwable $throwable)
    {
        $this->ids = [];
        Logger::getInstance()->error(sprintf(
            '%s failed: %s in %s:%d',
            $this->jobName(),
            $throwable->getMessage(),
            $throwable->getFile(),
            $throwable->getLine()
        ));
    }

    /**
     * Run a raw SQL query on the 'default' connection and return its result rows.
     *
     * @param string $sql complete SQL statement
     * @return array list of rows; empty when the result set is missing or not an array
     */
    private function queryRows(string $sql): array
    {
        $queryBuild = new QueryBuilder();
        $queryBuild->raw($sql);
        $res_data = DbManager::getInstance()->query($queryBuild, true, 'default')->toArray();
        $rows = $res_data['result'] ?? [];

        return is_array($rows) ? $rows : [];
    }
}
|