set = BalancePushSet::create()->get()->toArray(); //是否启用 if($this->set['enable']!=1){ return; } //创建任务 $taskId = $this->createTask(); //生成任务内容以及处理消息发送任务 $this->push_start($taskId); //处理需要重新发送消息的内容 $this->process_repeat_msg(); } public function onException(\Throwable $throwable) { // 捕获 run 方法内所抛出的异常 } /** * 推送开始 */ protected function push_start($task_id){ $operators = explode(",",$this->set['operators']); foreach($operators as $value){ //查询该运营商用户的余额,上月消费记录等数据 $startTime = strtotime(date('Y-m-01', strtotime('-1 month'))); $endTime = strtotime(date('Y-m-t', strtotime('-1 month')))+86400; //查询条件 $where = empty($value)?' m.`operators_id`=0 or m.`operators_id` IS NULL or m.`operators_id`=""':' m.`operators_id` = '.$value; $que_sql = "SELECT m.`id`,m.`openid_wx`,m.`mobile`,m.`credit2`,m.`operators_id`,IFNULL(mo.`consume`,0) AS consume,IF(LENGTH(ou.`key`)<1 OR ou.`key` IS NULL,'wxc80da22c408c9a85',ou.`key`) AS appid, IF(LENGTH(ou.`key`)<1 OR ou.`key` IS NULL,'前台优选',ou.`operators_name`) AS operators_name FROM ims_superdesk_shop_member m LEFT JOIN (SELECT SUM(credit_use) AS consume,openid FROM (SELECT o.`id`,( CASE o.`paytype` WHEN 1 THEN o.`price` WHEN 24 THEN o.`comb_pay_credit` ELSE 0 END) AS credit_use, o.`openid` FROM ims_superdesk_shop_order o WHERE o.`parentid`=0 AND o.`status` IN (1,2,3) AND o.`createtime`>={$startTime} AND o.`createtime`<{$endTime} ) AS temp_order GROUP BY openid) AS mo ON mo.openid = m.`openid` LEFT JOIN ims_superdesk_shop_operators_user ou ON ou.`id` = m.`operators_id` WHERE {$where}; "; $queryBuild = new QueryBuilder(); $queryBuild->raw($que_sql); $res_data = DbManager::getInstance()->query($queryBuild, true, 'default')->toArray(); foreach($res_data['result'] as $k=>$val){ //插入推送内容 $insert = [ 'task_id' => $task_id, 'member_id' => $val['id'], //设置启用公众号消息以及该用户存在公众号openid的情况下推送公众号消息,否则推送短信 'push_type' => !empty($val['openid_wx'])&&$this->set['wechat_msg']==1?1:2, 'openid' => $val['openid_wx'], 'mobile' => $val['mobile'], 'consume_credit' => $val['consume'], 'balance' => $val['credit2'], 'end_time' => $endTime-1, 'operators_id' => $val['operators_id'], 'operators_name' => $val['operators_name'], 'status' => 0, ]; //公众号消息模板推送屏蔽 if($insert['push_type']==1 && !empty($this->set['wechat_msg_id'])){ $wechat_msg_id = explode(",",$this->set['wechat_msg_id']); if (in_array($insert['member_id'],$wechat_msg_id)) { continue; } } //手机短信推送屏蔽 if($insert['push_type']==2 && !empty($this->set['sms_id'])){ $sms_id = explode(",",$this->set['sms_id']); if (in_array($insert['member_id'],$sms_id)) { continue; } } $model = BalancePushBody::create($insert); $body_id = $model->save(); //开始推送 $this->push_msg($insert,$body_id); } } } /** * 开始推送内容 * @param $sendData 任务内容,一般是推送任务内容表的数据 * @param $body_id 任务内容id * @return Bool 是否发送成功 true是 false否 */ protected function push_msg($sendData,$body_id){ //公众号消息模板推送 if($sendData['push_type']==1){ $wm = new WechatMessage($sendData['operators_id']); $api_res = $wm->send_message($sendData['openid'],$this->templateId,'',[ //模板数据 'first' => ['value'=>'尊敬的客户:您上月共消费了'.$sendData['consume_credit'].'积分'], 'keyword1' => ['value'=>$sendData['balance']], 'keyword2' => ['value'=>date('Y年m月d日',$sendData['end_time'])], 'remark' => ['value'=>'欢迎登录「'.$sendData['operators_name'].'」小程序选购心仪的商品。'], ]); $status = $api_res['response']['errcode']!==0?2:1; //0无响应 1成功 2失败 } //手机短信推送 if($sendData['push_type']==2){ $sms = new Sms($sendData['operators_id']); $api_res = $sms->send($sendData['mobile'],'SMS_232170368',[ 's_1' => $sendData['consume_credit'], 's_2' => $sendData['balance'], ]); $status = $api_res['response']['Code']!='OK'?2:1; //0无响应 1成功 2失败 } //更新任务内容表 $res = BalancePushBody::create()->update([ 'status' => $status, ], ['id' => $body_id]); //写入日志表 BalancePushLog::create()->data([ 'body_id' => $body_id, 'request' => json_encode($api_res['request']), 'response' => json_encode($api_res['response']), 'create_time' => time(), 'status' => $status, ],false)->save(); } function process_repeat_msg(){ $repeat_list = BalancePushTask::create()->where('is_send',2)->all(); foreach($repeat_list as $value){ //获取需要重发任务下属的失败内容 $value = $value->toArray(); $repeat_body = BalancePushBody::create()->where('task_id',$value['id'])->where('status', 1, '!=')->all(); foreach($repeat_body as $val){ $val = $val->toArray(); $this->push_msg($val,$val['id']); //重新推送处理 } } } /** * 创建任务 * @return int 任务ID */ protected function createTask(){ $time = time(); $model = BalancePushTask::create([ 'operator' => '系统', 'create_time' => $time, 'update_time' => $time, 'is_send' => 1, ]); return $model->save(); } }