'publish',
                    'content' => $content,
                    // sample payload: '{"code": 0,"msg": "Challenge accepted, get ready for the match!","data": {"record_id": "11","user_id": "1000005","ref_user_id": "1000010"}}'
                    'to' => $uid,
                );
                $res = $httpCurl->post($pushApiUrl, $data, '');
                if ($res=='offline'){
                    //return $uid;
                    return $this->retryPushMsg($content, $uid);
                }
            }
            return FALSE;
        }else if(!empty($userId)){
            $data = array(
                'type' => 'publish',
                'content' => $content,
                'to' => $userId,
            );
            $res = $httpCurl->post($pushApiUrl, $data, '');
            if ($res=='offline'){
                //return $userId;
                return $this->retryPushMsg($content, $userId);
            }
            return FALSE;
        }else{
            $data = array(
                'type' => 'publish',
                'content' => $content,
                'to' => '',
            );
            $res = $httpCurl->post($pushApiUrl, $data, '');
            if ($res=='offline'){
                //return $uid;
                // NOTE(review): $userId is empty in this branch (it failed the !empty() check),
                // so the retry is addressed to an empty recipient - confirm this is intended.
                return $this->retryPushMsg($content, $userId);
            }
            return FALSE;
        }
    }

    /**
     * Re-send a socket push message after the push service answered 'offline'.
     *
     * Sleeps two seconds, posts the message again and, when the service still
     * answers 'offline' on the first retry ($retry == 2), performs exactly one
     * further attempt. The outcome of that further attempt is what gets returned.
     *
     * @param mixed $content  message payload, forwarded unchanged as the 'content' field
     * @param mixed $to_user  recipient, forwarded unchanged as the 'to' field
     * @param int   $retry    attempt marker: 2 means "first retry, one more allowed";
     *                        any other value makes this the final attempt
     * @return mixed $to_user when the last attempt still answered 'offline', FALSE otherwise
     */
    private function retryPushMsg($content, $to_user, $retry=2){
        // Give the recipient a moment to reconnect before trying again.
        sleep(2);

        $pushApiUrl = 'http://127.0.0.1:2121/';
        $httpCurl = new \Org\Net\HttpCurl();
        $data = array(
            'type' => 'publish',
            'content' => $content,
            'to' => $to_user,
        );
        $res = $httpCurl->post($pushApiUrl, $data, '');

        // Anything other than 'offline' is treated as "nothing left to report".
        if ($res != 'offline'){
            return FALSE;
        }

        if ($retry == 2){
            // Return the nested attempt's verdict. Previously its result was discarded,
            // so a successful second retry was still reported to the caller as offline.
            return $this->retryPushMsg($content, $to_user, 3);
        }

        return $to_user;
    }

    /**
     * Query the local push service for the users that are currently online.
     *
     * @return mixed the service response passed through json_decode() with
     *               objects decoded as associative arrays
     */
    public function online(){
        $client = new \Org\Net\HttpCurl();
        $response = $client->post('http://127.0.0.1:2121/', array('type' => 'online',), '');

        return json_decode($response, TRUE);
    }

    /**
     * Append a value to the named queue.
     *
     * The value is serialize()d before being handed to \Com\Queue::push().
     *
     * @param string $key   queue name
     * @param mixed  $value value to enqueue
     * @return mixed whatever \Com\Queue::push() returns (the original note says
     *               the queue length after an lpush - verify against \Com\Queue)
     */
    public function pushQueue($key, $value){
        $queue = new \Com\Queue();
        $queue->setQueueName($key);

        return $queue->push(serialize($value));
    }

    /**
     * Take one value from the named queue.
     *
     * @param string $key queue name
     * @return mixed the unserialize()d value, or FALSE when pop() yields an empty result
     *               (the original note says the underlying operation is brpop - verify against \Com\Queue)
     */
    public function popQueue($key){
        $queue = new \Com\Queue();
        $queue->setQueueName($key);

        $raw = $queue->pop();
        if (empty($raw)){
            return FALSE;
        }

        // NOTE(review): unserialize() can instantiate arbitrary classes. This is only
        // safe while nothing untrusted can write to the queue - confirm, or move to JSON.
        return unserialize($raw);
    }

    /**
     * Flush the named queue via \Com\Queue::flushQueue().
     *
     * @param string $key queue name
     * @return void
     */
    public function flushQueue($key){
        $queue = new \Com\Queue();
        $queue->setQueueName($key);
        $queue->flushQueue();
    }

    /**
     * Open a Redis connection and return the client.
     *
     * Host, port and timeout come from the REDIS_HOST, REDIS_PORT and
     * DATA_CACHE_TIMEOUT config entries, falling back to 127.0.0.1, 6379 and
     * "no explicit timeout". The connection options are stored on $this->options
     * and the client on $this->redis. When REDIS_AUTH is configured it is sent
     * as the password.
     *
     * NOTE(review): the results of connect() and auth() are not checked here.
     *
     * @return \Redis
     */
    public function getRedis(){
        if (!extension_loaded('redis')){
            // E() is the framework's error helper (presumably throws - confirm).
            E(L('_NOT_SUPPORT_').':redis');
        }

        $host = C('REDIS_HOST') ? : '127.0.0.1';
        $port = C('REDIS_PORT') ? : 6379;
        // A falsy configured timeout becomes false, meaning "connect without a timeout argument".
        $timeout = C('DATA_CACHE_TIMEOUT') ? : false;

        $options = array(
            'host' => $host,
            'port' => $port,
            'timeout' => $timeout,
            'expire'=>3600,
            'persistent' => false,
        );

        // 'expire' is always present above, while 'prefix' and 'length' never are,
        // so the stored options take the fixed expire, the configured prefix and length 0.
        $this->options = $options;
        $this->options['expire'] = $options['expire'];
        $this->options['prefix'] = C('DATA_CACHE_PREFIX');
        $this->options['length'] = 0;

        $connectMethod = $options['persistent'] ? 'pconnect' : 'connect';
        $this->redis = new \Redis;
        if ($timeout === false){
            $this->redis->$connectMethod($host, $port);
        }else{
            $this->redis->$connectMethod($host, $port, $timeout);
        }

        // Send the Redis password when one is configured.
        if (C('REDIS_AUTH')){
            $this->redis->auth(C('REDIS_AUTH'));
        }

        return $this->redis;
    }
}