Job.php 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. <?php
  2. // +----------------------------------------------------------------------
  3. // | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
  4. // +----------------------------------------------------------------------
  5. // | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
  6. // +----------------------------------------------------------------------
  7. // | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
  8. // +----------------------------------------------------------------------
  9. // | Author: yunwuxin <448901948@qq.com>
  10. // +----------------------------------------------------------------------
  11. namespace think\queue;
  12. use Exception;
  13. use think\App;
  14. abstract class Job
  15. {
  16. /**
  17. * The job handler instance.
  18. * @var mixed
  19. */
  20. protected $instance;
  21. /**
  22. * @var App
  23. */
  24. protected $app;
  25. /**
  26. * The name of the queue the job belongs to.
  27. * @var string
  28. */
  29. protected $queue;
  30. /**
  31. * The name of the connection the job belongs to.
  32. */
  33. protected $connection;
  34. /**
  35. * Indicates if the job has been deleted.
  36. * @var bool
  37. */
  38. protected $deleted = false;
  39. /**
  40. * Indicates if the job has been released.
  41. * @var bool
  42. */
  43. protected $released = false;
  44. /**
  45. * Indicates if the job has failed.
  46. *
  47. * @var bool
  48. */
  49. protected $failed = false;
  50. /**
  51. * Get the decoded body of the job.
  52. *
  53. * @return array
  54. */
  55. public function payload()
  56. {
  57. return json_decode($this->getRawBody(), true);
  58. }
  59. /**
  60. * Fire the job.
  61. * @return void
  62. */
  63. public function fire()
  64. {
  65. $payload = $this->payload();
  66. list($class, $method) = $this->parseJob($payload['job']);
  67. $this->instance = $this->resolve($class);
  68. if ($this->instance) {
  69. $this->instance->{$method}($this, $payload['data']);
  70. }
  71. }
  72. /**
  73. * Delete the job from the queue.
  74. * @return void
  75. */
  76. public function delete()
  77. {
  78. $this->deleted = true;
  79. }
  80. /**
  81. * Determine if the job has been deleted.
  82. * @return bool
  83. */
  84. public function isDeleted()
  85. {
  86. return $this->deleted;
  87. }
  88. /**
  89. * Release the job back into the queue.
  90. * @param int $delay
  91. * @return void
  92. */
  93. public function release($delay = 0)
  94. {
  95. $this->released = true;
  96. }
  97. /**
  98. * Determine if the job was released back into the queue.
  99. * @return bool
  100. */
  101. public function isReleased()
  102. {
  103. return $this->released;
  104. }
  105. /**
  106. * Determine if the job has been deleted or released.
  107. * @return bool
  108. */
  109. public function isDeletedOrReleased()
  110. {
  111. return $this->isDeleted() || $this->isReleased();
  112. }
  113. /**
  114. * Get the job identifier.
  115. *
  116. * @return string
  117. */
  118. abstract public function getJobId();
  119. /**
  120. * Get the number of times the job has been attempted.
  121. * @return int
  122. */
  123. abstract public function attempts();
  124. /**
  125. * Get the raw body string for the job.
  126. * @return string
  127. */
  128. abstract public function getRawBody();
  129. /**
  130. * Parse the job declaration into class and method.
  131. * @param string $job
  132. * @return array
  133. */
  134. protected function parseJob($job)
  135. {
  136. $segments = explode('@', $job);
  137. return count($segments) > 1 ? $segments : [$segments[0], 'fire'];
  138. }
  139. /**
  140. * Resolve the given job handler.
  141. * @param string $name
  142. * @return mixed
  143. */
  144. protected function resolve($name)
  145. {
  146. if (strpos($name, '\\') === false) {
  147. if (strpos($name, '/') === false) {
  148. $app = '';
  149. } else {
  150. list($app, $name) = explode('/', $name, 2);
  151. }
  152. $name = ($this->app->config->get('app.app_namespace') ?: 'app\\') . ($app ? strtolower($app) . '\\' : '') . 'job\\' . $name;
  153. }
  154. return $this->app->make($name);
  155. }
  156. /**
  157. * Determine if the job has been marked as a failure.
  158. *
  159. * @return bool
  160. */
  161. public function hasFailed()
  162. {
  163. return $this->failed;
  164. }
  165. /**
  166. * Mark the job as "failed".
  167. *
  168. * @return void
  169. */
  170. public function markAsFailed()
  171. {
  172. $this->failed = true;
  173. }
  174. /**
  175. * Process an exception that caused the job to fail.
  176. *
  177. * @param Exception $e
  178. * @return void
  179. */
  180. public function failed($e)
  181. {
  182. $this->markAsFailed();
  183. $payload = $this->payload();
  184. list($class, $method) = $this->parseJob($payload['job']);
  185. if (method_exists($this->instance = $this->resolve($class), 'failed')) {
  186. $this->instance->failed($payload['data'], $e);
  187. }
  188. }
  189. /**
  190. * Get the number of times to attempt a job.
  191. *
  192. * @return int|null
  193. */
  194. public function maxTries()
  195. {
  196. return $this->payload()['maxTries'] ?? null;
  197. }
  198. /**
  199. * Get the number of seconds the job can run.
  200. *
  201. * @return int|null
  202. */
  203. public function timeout()
  204. {
  205. return $this->payload()['timeout'] ?? null;
  206. }
  207. /**
  208. * Get the timestamp indicating when the job should timeout.
  209. *
  210. * @return int|null
  211. */
  212. public function timeoutAt()
  213. {
  214. return $this->payload()['timeoutAt'] ?? null;
  215. }
  216. /**
  217. * Get the name of the queued job class.
  218. *
  219. * @return string
  220. */
  221. public function getName()
  222. {
  223. return $this->payload()['job'];
  224. }
  225. /**
  226. * Get the name of the connection the job belongs to.
  227. *
  228. * @return string
  229. */
  230. public function getConnection()
  231. {
  232. return $this->connection;
  233. }
  234. /**
  235. * Get the name of the queue the job belongs to.
  236. * @return string
  237. */
  238. public function getQueue()
  239. {
  240. return $this->queue;
  241. }
  242. }