CronController.class.php 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. <?php
  2. namespace Common\Controller;
  3. use Think\Controller;
  /**
   * Helper controller for background jobs.
   *
   * Groups three unrelated utilities:
   *  - publishing messages through the local push HTTP endpoint (with retries),
   *  - pushing/popping serialized values through \Com\Queue,
   *  - opening a \Redis connection from the application configuration.
   */
  class CronController extends Controller {

      /** URL every publish/online request is POSTed to. */
      const PUSH_API_URL = 'http://127.0.0.1:2121/';

      /** Response body that this class treats as "recipient could not be reached". */
      const PUSH_OFFLINE = 'offline';

      public function __construct(){
          parent::__construct();
      }

      /**
       * Push a message to one user, a list of users, or (empty $userId) with an
       * empty "to" field.
       *
       * Each recipient gets one immediate attempt and, if the endpoint answers
       * "offline", up to two delayed retries (see retryPushMsg()).
       *
       * Unlike the previous implementation, a list of recipients is always
       * processed completely: an offline user no longer prevents the remaining
       * users from receiving the message.
       *
       * @param mixed $content Message payload, forwarded unchanged as the "content" field.
       * @param mixed $userId  A single recipient id, an array of recipient ids, or an
       *                       empty value. NOTE(review): empty() also treats 0 / '0' as
       *                       "no recipient" - confirm that 0 is never a valid user id.
       * @return mixed false when every push was accepted; otherwise the recipient that
       *               was still offline after all retries (for an array: the first such
       *               recipient, for an empty $userId: the empty string).
       */
      public function pushMsg($content, $userId=''){
          if (is_array($userId)){
              $firstOffline = false;
              $hasOffline = false;
              foreach ($userId as $uid){
                  // Keep going after a failure so the remaining users are still notified.
                  if (!$this->sendWithRetry($content, $uid) && !$hasOffline){
                      $firstOffline = $uid;
                      $hasOffline = true;
                  }
              }
              return $firstOffline;
          }

          // An empty recipient is sent as '' on the first attempt AND on the retries.
          $target = empty($userId) ? '' : $userId;
          return $this->sendWithRetry($content, $target) ? false : $target;
      }

      /**
       * Retry a push after a two-second pause.
       *
       * Called with the default $retry (2) this performs the second attempt and,
       * if the recipient is still offline, recurses once for a third attempt.
       * Any other $retry value performs exactly one attempt.
       *
       * @param mixed $content Message payload.
       * @param mixed $to_user Recipient id (or '' for an empty "to" field).
       * @param int   $retry   Attempt counter; only the value 2 allows one more retry.
       * @return mixed false when a retry was accepted, $to_user when the recipient
       *               was offline on every retry.
       */
      private function retryPushMsg($content, $to_user, $retry=2){
          sleep(2);
          if ($this->publish($content, $to_user) !== self::PUSH_OFFLINE){
              return false;
          }
          if ($retry == 2){
              // Propagate the outcome of the last attempt instead of discarding it;
              // previously a successful third attempt was still reported as offline.
              return $this->retryPushMsg($content, $to_user, 3);
          }
          return $to_user;
      }

      /**
       * Ask the push endpoint for the users that are currently online.
       *
       * @return mixed The endpoint response decoded with json_decode(..., true);
       *               null when the response is not valid JSON.
       */
      public function online(){
          $response = $this->postToPushServer(array('type' => 'online'));
          return json_decode($response, true);
      }

      /**
       * Serialize a value and push it onto the named queue.
       *
       * @param string $key   Queue name passed to \Com\Queue::setQueueName().
       * @param mixed  $value Value to enqueue; stored in serialize() format.
       * @return mixed Whatever \Com\Queue::push() returns (the original comment
       *               describes it as the queue length).
       */
      public function pushQueue($key, $value){
          $queue = new \Com\Queue();
          $queue->setQueueName($key);
          return $queue->push(serialize($value));
      }

      /**
       * Pop one entry from the named queue and unserialize it.
       *
       * @param string $key Queue name passed to \Com\Queue::setQueueName().
       * @return mixed false when the queue returned nothing, otherwise the
       *               unserialized value. A stored boolean false is therefore
       *               indistinguishable from an empty queue.
       */
      public function popQueue($key){
          $queue = new \Com\Queue();
          $queue->setQueueName($key);
          $raw = $queue->pop();
          if (empty($raw)){
              return false;
          }
          // NOTE(review): unserialize() is only safe while this queue is written
          // exclusively by trusted code such as pushQueue().
          return unserialize($raw);
      }

      /**
       * Empty the named queue.
       *
       * @param string $key Queue name passed to \Com\Queue::setQueueName().
       * @return void
       */
      public function flushQueue($key){
          $queue = new \Com\Queue();
          $queue->setQueueName($key);
          $queue->flushQueue();
      }

      /**
       * Open a new Redis connection using the REDIS_* configuration values.
       *
       * Host and port fall back to 127.0.0.1:6379. A connect timeout is only
       * passed when DATA_CACHE_TIMEOUT is configured with a truthy value, and
       * AUTH is only sent when REDIS_AUTH is configured. The results of
       * connect()/auth() are not checked, exactly as before.
       *
       * @return \Redis
       */
      public function getRedis(){
          if ( !extension_loaded('redis') ) {
              E(L('_NOT_SUPPORT_').':redis');
          }

          // Build the complete option set locally and assign it in one step.
          // The former element-wise writes to $this->options[...] targeted a
          // property this class never declares.
          $options = array(
              'host' => C('REDIS_HOST') ?: '127.0.0.1',
              'port' => C('REDIS_PORT') ?: 6379,
              'timeout' => C('DATA_CACHE_TIMEOUT') ?: false,
              'expire' => 3600,
              'persistent' => false,
              'prefix' => C('DATA_CACHE_PREFIX'),
              'length' => 0,
          );

          // Kept because the original code stored both values on the instance.
          // NOTE(review): confirm whether anything actually reads them.
          $this->options = $options;

          $redis = new \Redis;
          $this->redis = $redis;

          $connect = $options['persistent'] ? 'pconnect' : 'connect';
          if ($options['timeout'] === false){
              $redis->$connect($options['host'], $options['port']);
          }else{
              $redis->$connect($options['host'], $options['port'], $options['timeout']);
          }

          // Authenticate only when a Redis password is configured.
          $password = C('REDIS_AUTH');
          if ($password){
              $redis->auth($password);
          }
          return $redis;
      }

      /**
       * Send one message and fall back to the delayed retries when the
       * endpoint reports the recipient as offline.
       *
       * @param mixed $content Message payload.
       * @param mixed $to      Recipient id, or '' for an empty "to" field.
       * @return bool true when some attempt was accepted, false when the
       *              recipient was offline on every attempt.
       */
      private function sendWithRetry($content, $to){
          if ($this->publish($content, $to) !== self::PUSH_OFFLINE){
              return true;
          }
          return $this->retryPushMsg($content, $to) === false;
      }

      /**
       * POST a single "publish" request to the push endpoint.
       *
       * @param mixed $content Message payload.
       * @param mixed $to      Recipient id, or '' for an empty "to" field.
       * @return mixed Raw response from \Org\Net\HttpCurl::post().
       */
      private function publish($content, $to){
          return $this->postToPushServer(array(
              'type' => 'publish',
              'content' => $content,
              'to' => $to,
          ));
      }

      /**
       * POST a field set to the push endpoint.
       *
       * @param array $payload Form fields to send.
       * @return mixed Raw response from \Org\Net\HttpCurl::post().
       */
      private function postToPushServer(array $payload){
          $httpCurl = new \Org\Net\HttpCurl();
          // The third argument is passed as '' exactly as the original calls did;
          // its meaning is defined by HttpCurl, which is not visible here.
          return $httpCurl->post(self::PUSH_API_URL, $payload, '');
      }
  }