Queue.class.php 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. <?php
  2. /**
  3. * 基于redis的消息队列
  4. * 用法:
  5. * use Com\Queue;
  6. * $queue = Queue::getInstance('msg');
  7. * 加入队列
  8. * $queue->push('aaaaaa');
  9. * $queue->push('bbbbb');
  10. * 获取队列长度
  11. * $queue->len();
  12. * 读取队列
  13. * $value = $queue->pop();
  14. * 删除队列
  15. * $queue->flushQueue();
  16. */
  17. namespace Com;
  /**
   * Redis-backed message queue layered on the framework's Redis cache driver.
   *
   * The list commands used below (lPush, brPop, LLEN, del) are not defined in
   * this class; presumably the parent driver forwards unknown method calls to
   * the underlying Redis client -- TODO confirm against \Think\Cache\Driver\Redis.
   *
   * NOTE(review): the active queue name lives in the static $queueName, so it
   * is shared by every instance. Calling getInstance() or setQueueName() with
   * a different name also redirects instances obtained earlier. Re-select the
   * queue before operating on it if several queues are used in one request.
   */
  class Queue extends \Think\Cache\Driver\Redis
  {
      /**
       * Timeout handed to brPop() by pop().
       * Presumably seconds -- confirm against the Redis client in use.
       * @var int
       */
      public static $timeout = 1;

      /**
       * Name (Redis key) of the queue all operations currently target.
       * @var string
       */
      public static $queueName = 'queue';

      /**
       * Operation handle.
       * NOTE(review): presumably the Redis client object created by the
       * parent driver -- confirm.
       * @var mixed
       * @access protected
       */
      protected $handler;

      /**
       * Cache connection parameters.
       * @var array
       * @access protected
       */
      protected $options = array();

      /**
       * Select a queue and return the cached instance for it.
       *
       * One instance is created per queue name and reused on later calls.
       * $options is forwarded to the constructor only when the instance for
       * that name is first created; later calls with different options get
       * the already-cached instance.
       *
       * Terminates the script if the DATA_CACHE_TYPE setting is not 'Redis'.
       *
       * @static
       * @access public
       * @param string $queueName queue to operate on
       * @param array  $options   connection options for the driver constructor
       * @return Queue
       */
      public static function getInstance($queueName, $options = [])
      {
          if (C('DATA_CACHE_TYPE') !== 'Redis') {
              exit('DATA_CACHE_TYPE DO NOT Support Redis');
          }

          // Make this the current queue for every subsequent operation.
          self::$queueName = $queueName;

          static $_instance = array();
          if (!isset($_instance[$queueName])) {
              // Pass the caller's options through; they used to be dropped.
              $_instance[$queueName] = new self($options);
          }

          return $_instance[$queueName];
      }

      /**
       * Change the queue that all subsequent operations target.
       *
       * @param string $name queue name
       */
      public static function setQueueName($name)
      {
          self::$queueName = $name;
      }

      /**
       * Add a value to the current queue (lPush).
       *
       * @param string $value
       * @return int queue length, as reported by lPush
       */
      public function push($value)
      {
          return $this->lPush(self::$queueName, $value);
      }

      /**
       * Read one value from the current queue (brPop), waiting up to
       * self::$timeout.
       *
       * When brPop yields a non-empty result, element [1] of it is returned
       * (presumably the popped value, with [0] being the key -- confirm).
       * An empty result is returned unchanged.
       *
       * @return mixed
       */
      public function pop()
      {
          $result = $this->brPop(self::$queueName, self::$timeout);

          return empty($result) ? $result : $result[1];
      }

      /**
       * Delete the current queue.
       */
      public function flushQueue()
      {
          // del() replaces the deprecated delete() alias of the Redis client.
          $this->del(self::$queueName);
      }

      /**
       * Return the length of the current queue.
       *
       * @return int
       */
      public function len()
      {
          return $this->LLEN(self::$queueName);
      }
  }
  91. ?>