StreamSelectLoop.php 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. namespace Workerman\Events\React;
  15. use Workerman\Events\EventInterface;
  16. /**
  17. * Class StreamSelectLoop
  18. * @package Workerman\Events\React
  19. */
  20. class StreamSelectLoop extends \React\EventLoop\StreamSelectLoop
  21. {
  22. /**
  23. * @var array
  24. */
  25. protected $_timerIdMap = array();
  26. /**
  27. * @var int
  28. */
  29. protected $_timerIdIndex = 0;
  30. /**
  31. * Add event listener to event loop.
  32. *
  33. * @param $fd
  34. * @param $flag
  35. * @param $func
  36. * @param array $args
  37. * @return bool
  38. */
  39. public function add($fd, $flag, $func, $args = array())
  40. {
  41. $args = (array)$args;
  42. switch ($flag) {
  43. case EventInterface::EV_READ:
  44. return $this->addReadStream($fd, $func);
  45. case EventInterface::EV_WRITE:
  46. return $this->addWriteStream($fd, $func);
  47. case EventInterface::EV_SIGNAL:
  48. return $this->addSignal($fd, $func);
  49. case EventInterface::EV_TIMER:
  50. $timer_obj = $this->addPeriodicTimer($fd, function() use ($func, $args) {
  51. call_user_func_array($func, $args);
  52. });
  53. $this->_timerIdMap[++$this->_timerIdIndex] = $timer_obj;
  54. return $this->_timerIdIndex;
  55. case EventInterface::EV_TIMER_ONCE:
  56. $index = ++$this->_timerIdIndex;
  57. $timer_obj = $this->addTimer($fd, function() use ($func, $args, $index) {
  58. $this->del($index,EventInterface::EV_TIMER_ONCE);
  59. call_user_func_array($func, $args);
  60. });
  61. $this->_timerIdMap[$index] = $timer_obj;
  62. return $this->_timerIdIndex;
  63. }
  64. return false;
  65. }
  66. /**
  67. * Remove event listener from event loop.
  68. *
  69. * @param mixed $fd
  70. * @param int $flag
  71. * @return bool
  72. */
  73. public function del($fd, $flag)
  74. {
  75. switch ($flag) {
  76. case EventInterface::EV_READ:
  77. return $this->removeReadStream($fd);
  78. case EventInterface::EV_WRITE:
  79. return $this->removeWriteStream($fd);
  80. case EventInterface::EV_SIGNAL:
  81. return $this->removeSignal($fd);
  82. case EventInterface::EV_TIMER:
  83. case EventInterface::EV_TIMER_ONCE;
  84. if (isset($this->_timerIdMap[$fd])){
  85. $timer_obj = $this->_timerIdMap[$fd];
  86. unset($this->_timerIdMap[$fd]);
  87. $this->cancelTimer($timer_obj);
  88. return true;
  89. }
  90. }
  91. return false;
  92. }
  93. /**
  94. * Main loop.
  95. *
  96. * @return void
  97. */
  98. public function loop()
  99. {
  100. $this->run();
  101. }
  102. /**
  103. * Add signal handler.
  104. *
  105. * @param $signal
  106. * @param $callback
  107. * @return bool
  108. */
  109. public function addSignal($signal, $callback)
  110. {
  111. if(DIRECTORY_SEPARATOR === '/') {
  112. pcntl_signal($signal, $callback);
  113. }
  114. }
  115. /**
  116. * Remove signal handler.
  117. *
  118. * @param $signal
  119. */
  120. public function removeSignal($signal)
  121. {
  122. if(DIRECTORY_SEPARATOR === '/') {
  123. pcntl_signal($signal, SIG_IGN);
  124. }
  125. }
  126. /**
  127. * Emulate a stream_select() implementation that does not break when passed
  128. * empty stream arrays.
  129. *
  130. * @param array &$read An array of read streams to select upon.
  131. * @param array &$write An array of write streams to select upon.
  132. * @param integer|null $timeout Activity timeout in microseconds, or null to wait forever.
  133. *
  134. * @return integer|false The total number of streams that are ready for read/write.
  135. * Can return false if stream_select() is interrupted by a signal.
  136. */
  137. protected function streamSelect(array &$read, array &$write, $timeout)
  138. {
  139. if ($read || $write) {
  140. $except = null;
  141. // Calls signal handlers for pending signals
  142. if(DIRECTORY_SEPARATOR === '/') {
  143. pcntl_signal_dispatch();
  144. }
  145. // suppress warnings that occur, when stream_select is interrupted by a signal
  146. return @stream_select($read, $write, $except, $timeout === null ? null : 0, $timeout);
  147. }
  148. // Calls signal handlers for pending signals
  149. if(DIRECTORY_SEPARATOR === '/') {
  150. pcntl_signal_dispatch();
  151. }
  152. $timeout && usleep($timeout);
  153. return 0;
  154. }
  155. /**
  156. * Destroy loop.
  157. *
  158. * @return void
  159. */
  160. public function destroy()
  161. {
  162. }
  163. /**
  164. * Get timer count.
  165. *
  166. * @return integer
  167. */
  168. public function getTimerCount()
  169. {
  170. return count($this->_timerIdMap);
  171. }
  172. }