Tinebase ActionQueue - create a new process to execute job in
[tine20] / tine20 / Tinebase / ActionQueue / Worker.php
1 <?php
2 /**
3  * Tine 2.0
4  * @package     Tinebase
5  * @subpackage  ActionQueue
6  * @license     http://www.gnu.org/licenses/agpl.html AGPL Version 3
7  * @author      Züleyha Toptas <z.toptas@hotmail.de>
8  * @copyright   Copyright (c) 2012-2016 Metaways Infosystems GmbH (http://www.metaways.de)
9  */
10
11
12 /**
13  * Daemon to check the job queue and process jobs in separate processes
14  * 
15  * @package     Tinebase
16  * @subpackage  ActionQueue
17  */
18 class Tinebase_ActionQueue_Worker extends Console_Daemon
19 {
20     const EXECUTION_METHOD_DISPATCH = 'dispatch';
21     const EXECUTION_METHOD_EXEC_CLI = 'exec_cli';
22
23     protected $_stopped = false;
24     
25     /** 
26      * default configurations of this daemon
27      * 
28      * @var array
29      */
30     protected static $_defaultConfig = array(
31         'general' => array(
32             'configfile' => '/etc/tine20/actionQueue.ini', 
33             'pidfile'    => '/var/run/tine20/actionQueue.pid',
34         ),
35         'tine20' => array (
36             'executionMethod' => self::EXECUTION_METHOD_DISPATCH,
37             'maxRetry'        => 10,
38             'maxChildren'     => 10,
39             'shutDownWait'    => 60,
40         )
41     );
42     
43     /**
44      * keeps mapping from process id to job id
45      * 
46      * @var array
47      */
48     protected $_jobScoreBoard = array();
49     
50     /**
51      * infinite loop where daemon manages the execution of the jobs from the job queue
52      */
53     public function run()
54     {
55         $actionQueue = Tinebase_ActionQueue::getInstance();
56         if ($actionQueue->getBackendType() !== 'Tinebase_ActionQueue_Backend_Redis') {
57             $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ . ' not Tinebase_ActionQueue_Backend_Redis used. There is nothing to do for the worker! Configure Redis backend if you want to make use of the worker.');
58             exit(1);
59         }
60
61         $maxChildren = $this->_getConfig()->tine20->maxChildren;
62         $lastMaxChildren = 0;
63
64         while (!$this->_stopped) {
65
66             // manage the number of children
67             if (count ($this->_children) >=  $maxChildren) {
68
69                 // log only every minute
70                 if (time() - $lastMaxChildren > 60) {
71                     $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ . " reached max children limit: " . $maxChildren);
72                     $this->_getLogger()->info(__METHOD__ . '::' . __LINE__ . " number of pending jobs:" . $actionQueue->getQueueSize());
73                     $lastMaxChildren = time();
74                 }
75                 usleep(1000); // save some trees
76                 pcntl_signal_dispatch();
77                 continue;
78             }
79             
80             $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " trying to fetch a job from queue " . microtime(true));
81
82             $jobId = $actionQueue->waitForJob();
83
84             $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " signal dispatch " . microtime(true));
85
86             pcntl_signal_dispatch();
87
88             // no job found
89             if ($jobId === FALSE || $this->_stopped) {
90                 continue;
91             }
92             
93             try {
94                 $job = $actionQueue->receive($jobId);
95             } catch (RuntimeException $re) {
96                 $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ . " failed to receive message: " . $re->getMessage());
97                 
98                 // we are unable to process the job
99                 // probably the retry count is exceeded
100                 // TODO push message to dead letter queue
101                 $actionQueue->delete($jobId);
102                 
103                 continue;
104             }
105             
106             $this->_getLogger()->info (__METHOD__ . '::' . __LINE__ . " forking to process job {$job['action']} with id $jobId");
107             $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ . " process message: " . print_r($job, TRUE)); 
108
109
110             // TODO fork may not work!!!
111             $childPid = $this->_forkChild();
112             
113             if ($childPid == 0) { // executed in child process
114                 try {
115                     $this->_executeAction($job);
116
117                     $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ . " exiting...");
118                     exit(0); // message will be deleted in parent process
119                     
120                 } catch (Exception $e) {
121                     $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " could not execute job : " . $job['action']);
122                     $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " could not execute job : " . $e->getMessage());
123                     $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " could not execute job : " . $e->getTraceAsString());
124
125                     exit(1); // message will be rescheduled in parent process
126                 }
127                 
128             } else { // executed in parent process
129                 $this->_jobScoreBoard[$childPid] = $jobId;
130             }
131         }
132
133         $this->_shutDown();
134     }
135
136     protected function _shutDown()
137     {
138         $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " shutting down... " . microtime(true));
139
140         $timeStart = time();
141         $timeElapsed = 0;
142         $shutDownWait = (int)($this->_getConfig()->tine20->shutDownWait);
143
144         while ($timeElapsed < $shutDownWait) {
145
146             pcntl_signal_dispatch();
147
148             if (count($this->_children) === 0) {
149                 break;
150             }
151
152             // 10ms
153             usleep(10000);
154
155             $timeElapsed = time() - $timeStart;
156         }
157
158         $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " parent shut down... " . microtime(true));
159
160         parent::_shutDown();
161     }
162
163     protected function _gracefulShutDown()
164     {
165         $this->_stopped = true;
166
167         return true;
168     }
169
170     /**
171      * We have to destroy the Tinebase_ActionQueue instance before the process forks.
172      * Otherwise the resource holding the connection to the queue backend will be
173      * shared between the parent and the child which leads to strange problems
174      * 
175      * @see Console_Daemon::_beforeFork()
176      */
177     protected function _beforeFork()
178     {
179         Tinebase_ActionQueue::destroyInstance();
180     }
181     
182     /**
183      * handle terminated processes
184      * either delete or reschedule the job
185      * 
186      * @param  string  $pid     the pid of the process
187      * @param  string  $status  the exit status of the process 
188      * @return void
189      */
190     protected function _childTerminated($pid, $status)
191     {
192         $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " with pid $pid");
193
194         $jobId = $this->_jobScoreBoard[$pid];
195         unset($this->_jobScoreBoard[$pid]);
196
197         if (true !== pcntl_wifexited($status)) {
198             $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " child $pid did not finish successfully!");
199
200             Tinebase_ActionQueue::getInstance()->reschedule($jobId);
201
202             return;
203         }
204         parent::_childTerminated($pid, $status);
205         
206         $status = pcntl_wexitstatus($status);
207         
208
209         if ($status > 0) { // failure
210             $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " job $jobId in pid $pid did not finish successfully. Will be rescheduled!");
211             
212             Tinebase_ActionQueue::getInstance()->reschedule($jobId);
213             
214         } else {           // success
215             $this->_getLogger()->info(__METHOD__ . '::' . __LINE__ .    " job $jobId in pid $pid finished successfully");
216             
217             Tinebase_ActionQueue::getInstance()->delete($jobId);
218         }
219     }
220     
221     /**
222      * execute the action
223      *
224      * @param  string  $job
225      * //@ throws Exception
226      * @todo make self::EXECUTION_METHOD_EXEC_CLI working
227      */
228     protected function _executeAction($job)
229     {
230         $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ . " with isChild: " . var_export($this->_isChild, true));
231
232         // execute in subprocess
233         //if ($this->_getConfig()->tine20->executionMethod === self::EXECUTION_METHOD_EXEC_CLI) {
234         chdir(__DIR__);
235             exec('php -d include_path=' . escapeshellarg(get_include_path()) .
236                 ' ./../../tine20.php --method Tinebase.executeQueueJob message=' .
237                 escapeshellarg(json_encode($job)), $output, $exitCode);
238             if ($exitCode != 0) {
239                 throw new Exception('Problem during execution with shell: ' . join(PHP_EOL, $output));
240             }
241
242         // execute in same process
243         /*} else {
244             Zend_Registry::_unsetInstance();
245
246             Tinebase_Core::initFramework();
247     
248             Tinebase_Core::set(Tinebase_Core::USER, Tinebase_User::getInstance()->getFullUserById($job['account_id']));
249
250         if (true !== ($result = Tinebase_ActionQueue::getInstance()->executeAction($job))) {
251             throw new Tinebase_Exception_UnexpectedValue('action queue job execution did not return true: ' . var_export($result, true));
252         }*/
253         //}
254
255         //$this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ . " result: " . var_export($result, true));
256         $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ . " output: " . var_export($output, true));
257     }
258 }