Tinebase_ActionQueue_Worker: do not init tine20 framework in parent
[tine20] / tine20 / Tinebase / ActionQueue / Worker.php
1 <?php
2 /**
3  * Tine 2.0
4  * @package     Tinebase
5  * @subpackage  ActionQueue
6  * @license     http://www.gnu.org/licenses/agpl.html AGPL Version 3
7  * @author      Züleyha Toptas <z.toptas@hotmail.de>
8  * @copyright   Copyright (c) 2012-2016 Metaways Infosystems GmbH (http://www.metaways.de)
9  */
10
11
12 /**
13  * Daemon to check the job queue and process jobs in separate processes
14  * 
15  * @package     Tinebase
16  * @subpackage  ActionQueue
17  */
18 class Tinebase_ActionQueue_Worker extends Console_Daemon
19 {
20     const EXECUTION_METHOD_DISPATCH = 'dispatch';
21     const EXECUTION_METHOD_EXEC_CLI = 'exec_cli';
22
23     protected $_stopped = false;
24     
25     /** 
26      * default configurations of this daemon
27      * 
28      * @var array
29      */
30     protected static $_defaultConfig = array(
31         'general' => array(
32             'configfile' => '/etc/tine20/actionQueue.ini', 
33             'pidfile'    => '/var/run/tine20/actionQueue.pid',
34         ),
35         'tine20' => array (
36             'executionMethod' => self::EXECUTION_METHOD_DISPATCH,
37             'maxRetry'        => 10,
38             'maxChildren'     => 10,
39             'shutDownWait'    => 60,
40         )
41     );
42     
43     /**
44      * keeps mapping from process id to job id
45      * 
46      * @var array
47      */
48     protected $_jobScoreBoard = array();
49     
50     /**
51      * infinite loop where daemon manages the execution of the jobs from the job queue
52      */
53     public function run()
54     {
55         $actionQueueConfig = Tinebase_Core::getConfig()->getConfigFileSection(Tinebase_Config::ACTIONQUEUE);
56         if (!is_array($actionQueueConfig) || !isset($actionQueueConfig[Tinebase_Config::ACTIONQUEUE]) ||
57                 !is_array($actionQueueConfig[Tinebase_Config::ACTIONQUEUE]) ||
58                 !isset($actionQueueConfig[Tinebase_Config::ACTIONQUEUE][Tinebase_Config::ACTIONQUEUE_BACKEND]) ||
59                 'redis' !== strtolower(
60                     $actionQueueConfig[Tinebase_Config::ACTIONQUEUE][Tinebase_Config::ACTIONQUEUE_BACKEND])) {
61             $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ . ' Tinebase_ActionQueue_Backend_Direct used. There is nothing to do for the worker! Configure Redis backend for example if you want to make use of the worker.');
62             exit(1);
63         }
64
65         $maxChildren = $this->_getConfig()->tine20->maxChildren;
66
67         while (!$this->_stopped) {
68
69             // manage the number of children
70             if (count ($this->_children) >=  $maxChildren) {
71
72                 // TODO this will log A LOT, add a timeout between logs...
73
74                 $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " reached max children limit: " . $this->_getConfig()->tine20->maxChildren);
75                 $this->_getLogger()->info(__METHOD__ . '::' . __LINE__ .    " number of pending jobs:" . Tinebase_ActionQueue::getInstance()->getQueueSize());
76                 usleep(1000); // save some trees
77                 pcntl_signal_dispatch();
78                 continue;
79             }
80             
81             $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " trying to fetch a job from queue " . microtime(true));
82
83             $jobId = Tinebase_ActionQueue::getInstance()->waitForJob();
84
85             $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " signal dispatch " . microtime(true));
86
87             pcntl_signal_dispatch();
88
89             // no job found
90             if ($jobId === FALSE || $this->_stopped) {
91                 continue;
92             }
93             
94             try {
95                 $job = Tinebase_ActionQueue::getInstance()->receive($jobId);
96             } catch (RuntimeException $re) {
97                 $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ . " failed to receive message: " . $re->getMessage());
98                 
99                 // we are unable to process the job
100                 // probably the retry count is exceeded
101                 // TODO push message to dead letter queue
102                 Tinebase_ActionQueue::getInstance()->delete($jobId);
103                 
104                 continue;
105             }
106             
107             $this->_getLogger()->info (__METHOD__ . '::' . __LINE__ . " forking to process job {$job['action']} with id $jobId");
108             $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ . " process message: " . print_r($job, TRUE)); 
109
110
111             // TODO fork may not work!!!
112             $childPid = $this->_forkChild();
113             
114             if ($childPid == 0) { // executed in child process
115                 try {
116                     $this->_executeAction($job);
117
118                     $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ . " exiting...");
119                     exit(0); // message will be deleted in parent process
120                     
121                 } catch (Exception $e) {
122                     $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " could not execute job : " . $job['action']);
123                     $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " could not execute job : " . $e->getMessage());
124                     $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " could not execute job : " . $e->getTraceAsString());
125
126                     exit(1); // message will be rescheduled in parent process
127                 }
128                 
129             } else { // executed in parent process
130                 $this->_jobScoreBoard[$childPid] = $jobId;
131             }
132         }
133
134         $this->_shutDown();
135     }
136
137     protected function _shutDown()
138     {
139         $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " shutting down... " . microtime(true));
140
141         $timeStart = time();
142         $timeElapsed = 0;
143         $shutDownWait = (int)($this->_getConfig()->tine20->shutDownWait);
144
145         while ($timeElapsed < $shutDownWait) {
146
147             pcntl_signal_dispatch();
148
149             if (count($this->_children) === 0) {
150                 break;
151             }
152
153             // 10ms
154             usleep(10000);
155
156             $timeElapsed = time() - $timeStart;
157         }
158
159         $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " parent shut down... " . microtime(true));
160
161         parent::_shutDown();
162     }
163
164     protected function _gracefulShutDown()
165     {
166         $this->_stopped = true;
167
168         return true;
169     }
170
171     /**
172      * We have to destroy the Tinebase_ActionQueue instance before the process forks.
173      * Otherwise the resource holding the connection to the queue backend will be
174      * shared between the parent and the child which leads to strange problems
175      * 
176      * @see Console_Daemon::_beforeFork()
177      */
178     protected function _beforeFork()
179     {
180         Tinebase_ActionQueue::destroyInstance();
181     }
182     
183     /**
184      * handle terminated processes
185      * either delete or reschedule the job
186      * 
187      * @param  string  $pid     the pid of the process
188      * @param  string  $status  the exit status of the process 
189      * @return void
190      */
191     protected function _childTerminated($pid, $status)
192     {
193         $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " with pid $pid");
194
195         $jobId = $this->_jobScoreBoard[$pid];
196         unset($this->_jobScoreBoard[$pid]);
197
198         if (true !== pcntl_wifexited($status)) {
199             $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " child $pid did not finish successfully!");
200
201             Tinebase_ActionQueue::getInstance()->reschedule($jobId);
202
203             return;
204         }
205         parent::_childTerminated($pid, $status);
206         
207         $status = pcntl_wexitstatus($status);
208         
209
210         if ($status > 0) { // failure
211             $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " job $jobId in pid $pid did not finish successfully. Will be rescheduled!");
212             
213             Tinebase_ActionQueue::getInstance()->reschedule($jobId);
214             
215         } else {           // success
216             $this->_getLogger()->info(__METHOD__ . '::' . __LINE__ .    " job $jobId in pid $pid finished successfully");
217             
218             Tinebase_ActionQueue::getInstance()->delete($jobId);
219         }
220     }
221     
222     /**
223      * execute the action
224      *
225      * @param  string  $job
226      * //@ throws Exception
227      * @todo make self::EXECUTION_METHOD_EXEC_CLI working
228      */
229     protected function _executeAction($job)
230     {
231         $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ . " with isChild: " . var_export($this->_isChild, true));
232
233         // execute in subprocess
234         /*if ($this->_getConfig()->tine20->executionMethod === self::EXECUTION_METHOD_EXEC_CLI) {
235             $output = system('php $paths ./../../tine20.php --method Tinebase.executeQueueJob message=' . escapeshellarg($job), $exitCode );
236             if ($exitCode != 0) {
237                 throw new Exception('Problem during execution with shell: ' . $output);
238             }
239
240         // execute in same process
241         } else { */
242             //Zend_Registry::_unsetInstance();
243
244             Tinebase_Core::initFramework();
245     
246             Tinebase_Core::set(Tinebase_Core::USER, Tinebase_User::getInstance()->getFullUserById($job['account_id']));
247
248         if (true !== ($result = Tinebase_ActionQueue::getInstance()->executeAction($job))) {
249             throw new Tinebase_Exception_UnexpectedValue('action queue job execution did not return true: ' . var_export($result, true));
250         }
251         //}
252
253         $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ . " result: " . var_export($result, true));
254     }
255 }