5b87d9432ad77e8e5f23e5f1c4abc523cf7341e0
[tine20] / tine20 / Tinebase / ActionQueue / Worker.php
1 <?php
2 /**
3  * Tine 2.0
4  * @package     Tinebase
5  * @subpackage  ActionQueue
6  * @license     http://www.gnu.org/licenses/agpl.html AGPL Version 3
7  * @author      Züleyha Toptas <z.toptas@hotmail.de>
8  * @copyright   Copyright (c) 2012-2016 Metaways Infosystems GmbH (http://www.metaways.de)
9  */
10
11
12 /**
13  * Daemon to check the job queue and process jobs in separate processes
14  * 
15  * @package     Tinebase
16  * @subpackage  ActionQueue
17  */
18 class Tinebase_ActionQueue_Worker extends Console_Daemon
19 {
20     const EXECUTION_METHOD_DISPATCH = 'dispatch';
21     const EXECUTION_METHOD_EXEC_CLI = 'exec_cli';
22
23     protected $_stopped = false;
24     
25     /** 
26      * default configurations of this daemon
27      * 
28      * @var array
29      */
30     protected static $_defaultConfig = array(
31         'general' => array(
32             'configfile' => '/etc/tine20/actionQueue.ini', 
33             'pidfile'    => '/var/run/tine20/actionQueue.pid',
34         ),
35         'tine20' => array (
36             'executionMethod' => self::EXECUTION_METHOD_DISPATCH,
37             'maxRetry'        => 10,
38             'maxChildren'     => 10,
39             'shutDownWait'    => 60,
40         )
41     );
42     
43     /**
44      * keeps mapping from process id to job id
45      * 
46      * @var array
47      */
48     protected $_jobScoreBoard = array();
49     
50     /**
51      * infinite loop where daemon manages the execution of the jobs from the job queue
52      */
53     public function run()
54     {
55         if ('Tinebase_ActionQueue_Backend_Direct' === Tinebase_ActionQueue::getInstance()->getBackendType()) {
56             $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ . ' Tinebase_ActionQueue_Backend_Direct used. There is nothing to do for the worker! Configure Redis backend for example if you want to make use of the worker.');
57             exit(1);
58         }
59
60         $maxChildren = $this->_getConfig()->tine20->maxChildren;
61
62         while (!$this->_stopped) {
63
64             // manage the number of children
65             if (count ($this->_children) >=  $maxChildren) {
66
67                 // TODO this will log A LOT, add a timeout between logs...
68
69                 $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " reached max children limit: " . $this->_getConfig()->tine20->maxChildren);
70                 $this->_getLogger()->info(__METHOD__ . '::' . __LINE__ .    " number of pending jobs:" . Tinebase_ActionQueue::getInstance()->getQueueSize());
71                 usleep(1000); // save some trees
72                 pcntl_signal_dispatch();
73                 continue;
74             }
75             
76             $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " trying to fetch a job from queue " . microtime(true));
77
78             $jobId = Tinebase_ActionQueue::getInstance()->waitForJob();
79
80             $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " signal dispatch " . microtime(true));
81
82             pcntl_signal_dispatch();
83
84             // no job found
85             if ($jobId === FALSE || $this->_stopped) {
86                 continue;
87             }
88             
89             try {
90                 $job = Tinebase_ActionQueue::getInstance()->receive($jobId);
91             } catch (RuntimeException $re) {
92                 $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ . " failed to receive message: " . $re->getMessage());
93                 
94                 // we are unable to process the job
95                 // probably the retry count is exceeded
96                 // TODO push message to dead letter queue
97                 Tinebase_ActionQueue::getInstance()->delete($jobId);
98                 
99                 continue;
100             }
101             
102             $this->_getLogger()->info (__METHOD__ . '::' . __LINE__ . " forking to process job {$job['action']} with id $jobId");
103             $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ . " process message: " . print_r($job, TRUE)); 
104
105
106             // TODO fork may not work!!!
107             $childPid = $this->_forkChild();
108             
109             if ($childPid == 0) { // executed in child process
110                 try {
111                     $this->_executeAction($job);
112
113                     $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ . " exiting...");
114                     exit(0); // message will be deleted in parent process
115                     
116                 } catch (Exception $e) {
117                     $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " could not execute job : " . $job['action']);
118                     $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " could not execute job : " . $e->getMessage());
119                     $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " could not execute job : " . $e->getTraceAsString());
120
121                     exit(1); // message will be rescheduled in parent process
122                 }
123                 
124             } else { // executed in parent process
125                 $this->_jobScoreBoard[$childPid] = $jobId;
126             }
127         }
128
129         $this->_shutDown();
130     }
131
132     protected function _shutDown()
133     {
134         $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " shutting down... " . microtime(true));
135
136         $timeStart = time();
137         $timeElapsed = 0;
138         $shutDownWait = (int)($this->_getConfig()->tine20->shutDownWait);
139
140         while ($timeElapsed < $shutDownWait) {
141
142             pcntl_signal_dispatch();
143
144             if (count($this->_children) === 0) {
145                 break;
146             }
147
148             // 10ms
149             usleep(10000);
150
151             $timeElapsed = time() - $timeStart;
152         }
153
154         $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " parent shut down... " . microtime(true));
155
156         parent::_shutDown();
157     }
158
159     protected function _gracefulShutDown()
160     {
161         $this->_stopped = true;
162
163         return true;
164     }
165
166     /**
167      * We have to destroy the Tinebase_ActionQueue instance before the process forks.
168      * Otherwise the resource holding the connection to the queue backend will be
169      * shared between the parent and the child which leads to strange problems
170      * 
171      * @see Console_Daemon::_beforeFork()
172      */
173     protected function _beforeFork()
174     {
175         Tinebase_ActionQueue::destroyInstance();
176     }
177     
178     /**
179      * handle terminated processes
180      * either delete or reschedule the job
181      * 
182      * @param  string  $pid     the pid of the process
183      * @param  string  $status  the exit status of the process 
184      * @return void
185      */
186     protected function _childTerminated($pid, $status)
187     {
188         $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " with pid $pid");
189
190         $jobId = $this->_jobScoreBoard[$pid];
191         unset($this->_jobScoreBoard[$pid]);
192
193         if (true !== pcntl_wifexited($status)) {
194             $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " child $pid did not finish successfully!");
195
196             Tinebase_ActionQueue::getInstance()->reschedule($jobId);
197
198             return;
199         }
200         parent::_childTerminated($pid, $status);
201         
202         $status = pcntl_wexitstatus($status);
203         
204
205         if ($status > 0) { // failure
206             $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " job $jobId in pid $pid did not finish successfully. Will be rescheduled!");
207             
208             Tinebase_ActionQueue::getInstance()->reschedule($jobId);
209             
210         } else {           // success
211             $this->_getLogger()->info(__METHOD__ . '::' . __LINE__ .    " job $jobId in pid $pid finished successfully");
212             
213             Tinebase_ActionQueue::getInstance()->delete($jobId);
214         }
215     }
216     
217     /**
218      * execute the action
219      *
220      * @param  string  $job
221      * //@ throws Exception
222      * @todo make self::EXECUTION_METHOD_EXEC_CLI working
223      */
224     protected function _executeAction($job)
225     {
226         $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ . " with isChild: " . var_export($this->_isChild, true));
227
228         // execute in subprocess
229         /*if ($this->_getConfig()->tine20->executionMethod === self::EXECUTION_METHOD_EXEC_CLI) {
230             $output = system('php $paths ./../../tine20.php --method Tinebase.executeQueueJob message=' . escapeshellarg($job), $exitCode );
231             if ($exitCode != 0) {
232                 throw new Exception('Problem during execution with shell: ' . $output);
233             }
234
235         // execute in same process
236         } else { */
237             //Zend_Registry::_unsetInstance();
238
239             Tinebase_Core::initFramework();
240     
241             Tinebase_Core::set(Tinebase_Core::USER, Tinebase_User::getInstance()->getFullUserById($job['account_id']));
242
243         if (true !== ($result = Tinebase_ActionQueue::getInstance()->executeAction($job))) {
244             throw new Tinebase_Exception_UnexpectedValue('action queue job execution did not return true: ' . var_export($result, true));
245         }
246         //}
247
248         $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ . " result: " . var_export($result, true));
249     }
250 }