|
@@ -0,0 +1,161 @@
|
|
|
+<?php
|
|
|
+
|
|
|
+namespace App\Crontab;
|
|
|
+
|
|
|
+use EasySwoole\Crontab\JobInterface;
|
|
|
+use App\Com\SmtpSend;
|
|
|
+use EasySwoole\EasySwoole\Config;
|
|
|
+use App\Models\WarningBody;
|
|
|
+use EasySwoole\Mysqli\QueryBuilder;
|
|
|
+use EasySwoole\ORM\DbManager;
|
|
|
+use EasySwoole\EasySwoole\Logger;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 任务说明:每10分钟抓取30分钟内更新的商品内容,
|
|
|
+ * 使用福利api提供的接口推送到Es搜索引擎中
|
|
|
+ */
|
|
|
+class ElasticSearchGoodsNewCrontab2 implements JobInterface
|
|
|
+{
|
|
|
+
|
|
|
+ protected $time = 60 * 30; //xx内更新或新增的商品同步到es
|
|
|
+
|
|
|
+ public function jobName(): string
|
|
|
+ {
|
|
|
+ // 定时任务的名称
|
|
|
+ return 'ElasticSearchGoodsNewCrontab2';
|
|
|
+ }
|
|
|
+
|
|
|
+ public function crontabRule(): string
|
|
|
+ {
|
|
|
+ // 定义执行规则 根据 Crontab 来定义
|
|
|
+ // 这里是每10分钟执行 1 次
|
|
|
+ return '*/10 * * * *';
|
|
|
+// return '* * * * *';
|
|
|
+ }
|
|
|
+
|
|
|
+ public function run()
|
|
|
+ {
|
|
|
+ //定时任务的执行逻辑
|
|
|
+// $welfare_api_set = Config::getInstance()->getConf('WELFARE_API');
|
|
|
+ $welfare_api_set = [
|
|
|
+ 'url' => 'https://open.api.superdesk.cn/api/base/dict_new/pushGoods',
|
|
|
+// 'url' => '',
|
|
|
+ ];
|
|
|
+ $page = 1;
|
|
|
+ while (true) {
|
|
|
+ $goods = $this->getUpdateGoods($page);
|
|
|
+ if (!$goods) {
|
|
|
+ break;
|
|
|
+ } else {
|
|
|
+ $page++;
|
|
|
+ }
|
|
|
+ foreach ($goods as $key => $value) {
|
|
|
+ $res = $this->http_request(
|
|
|
+ $welfare_api_set['url'],
|
|
|
+ [
|
|
|
+ 'es_index' => 'superdesk_welfare_20230511',
|
|
|
+ 'es_type' => 'ims_superdesk_shop_goods',
|
|
|
+ 'goodsid' => $value['id'],
|
|
|
+ 'goods_body' => json_encode($value),
|
|
|
+ ]
|
|
|
+ );
|
|
|
+ //返回内容写入日志
|
|
|
+ Logger::getInstance()->notice("(new2)商品ID{$value['id']}导入结果:{$res}\n");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ protected function getUpdateGoods($page = 1)
|
|
|
+ {
|
|
|
+
|
|
|
+ $size = 50;
|
|
|
+ $offset = ($page - 1) * $size;
|
|
|
+
|
|
|
+ $ids = array_unique(array_merge($this->getGoodsIdFromPool(), $this->getGoodsIdFromOperators_goods()));
|
|
|
+ if (!$ids) {
|
|
|
+ return [];
|
|
|
+ }
|
|
|
+
|
|
|
+ $ids = implode(',', $ids);
|
|
|
+
|
|
|
+ $where = " id IN ($ids)";
|
|
|
+ $que_sql = "SELECT * FROM ims_superdesk_shop_goods WHERE {$where} LIMIT {$offset},{$size}; ";
|
|
|
+ $queryBuild = new QueryBuilder();
|
|
|
+ $queryBuild->raw($que_sql);
|
|
|
+ $res_data = DbManager::getInstance()->query($queryBuild, true, 'default')->toArray();
|
|
|
+ $goods = $res_data['result'];
|
|
|
+ $new_goods = [];
|
|
|
+ foreach ($goods as $k => $item) {
|
|
|
+ $item['operators_id'] = $this->getOperatorsIdByGoodsId($item['id']);
|
|
|
+ $new_goods[$k] = $item;
|
|
|
+ }
|
|
|
+ return $new_goods;
|
|
|
+ }
|
|
|
+
|
|
|
+ function getOperatorsIdByGoodsId($goods_id)
|
|
|
+ {
|
|
|
+ $sql = "select operators_id from ims_superdesk_shop_operators_goods where goodsid = '{$goods_id}' and `status`=1 AND deleted=0 AND checked=0 ";
|
|
|
+ $queryBuild = new QueryBuilder();
|
|
|
+ $queryBuild->raw($sql);
|
|
|
+ $res_data = DbManager::getInstance()->query($queryBuild, true, 'default')->toArray();
|
|
|
+ $result = $res_data['result'];
|
|
|
+ if (!$result) {
|
|
|
+ return [];
|
|
|
+ }
|
|
|
+ return array_column($result, 'operators_id');
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取变更的商品ID来自商品主表
|
|
|
+ */
|
|
|
+ protected function getGoodsIdFromPool()
|
|
|
+ {
|
|
|
+ $time = time() - $this->time;
|
|
|
+ $where = " `updatetime`> '{$time}' OR createtime > '{$time}' ";
|
|
|
+ $que_sql = "SELECT id FROM ims_superdesk_shop_goods WHERE {$where} ORDER BY updatetime DESC LIMIT 1000 ";
|
|
|
+ $queryBuild = new QueryBuilder();
|
|
|
+ $queryBuild->raw($que_sql);
|
|
|
+ $res_data = DbManager::getInstance()->query($queryBuild, true, 'default')->toArray();
|
|
|
+ return array_column($res_data['result'], 'id');
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取变更的商品ID来自运营商商品表
|
|
|
+ */
|
|
|
+ protected function getGoodsIdFromOperators_goods()
|
|
|
+ {
|
|
|
+ $time = time() - $this->time;
|
|
|
+ $where = " `updatetime`> '{$time}' OR createtime > '{$time}' ";
|
|
|
+ $que_sql = "SELECT goodsid FROM ims_superdesk_shop_operators_goods WHERE {$where} ORDER BY updatetime DESC LIMIT 1000 ";
|
|
|
+ $queryBuild = new QueryBuilder();
|
|
|
+ $queryBuild->raw($que_sql);
|
|
|
+ $res_data = DbManager::getInstance()->query($queryBuild, true, 'default')->toArray();
|
|
|
+ return array_column($res_data['result'], 'goodsid');
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * http请求
|
|
|
+ * @param $url 请求地址
|
|
|
+ * @param $data 数组内容
|
|
|
+ * @return String 请求返回原生数据
|
|
|
+ */
|
|
|
+ protected function http_request($url, $data)
|
|
|
+ {
|
|
|
+ $opts = array(
|
|
|
+ 'http' => array(
|
|
|
+ 'method' => 'POST',
|
|
|
+ 'header' => "Content-type:application/x-www-form-urlencoded",
|
|
|
+ 'content' => http_build_query($data),
|
|
|
+ )
|
|
|
+ );
|
|
|
+ $context = stream_context_create($opts);
|
|
|
+ $response = file_get_contents($url, false, $context);
|
|
|
+ return $response;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public function onException(\Throwable $throwable)
|
|
|
+ {
|
|
|
+ // 捕获 run 方法内所抛出的异常
|
|
|
+ }
|
|
|
+}
|