0011960: print with 300 dpi by default
[tine20] / tine20 / Tinebase / ActionQueue / Worker.php
1 <?php
2 /**
3  * Tine 2.0
4  * @package     Tinebase
5  * @subpackage  ActionQueue
6  * @license     http://www.gnu.org/licenses/agpl.html AGPL Version 3
7  * @author      Züleyha Toptas <z.toptas@hotmail.de>
8  * @copyright   Copyright (c) 2012-2013 Metaways Infosystems GmbH (http://www.metaways.de)
9  */
10
11 /**
12  * Daemon to check the job queue and process jobs in separate processes
13  * 
14  * @package     Tinebase
15  * @subpackage  ActionQueue
16  */
17 class Tinebase_ActionQueue_Worker extends Console_Daemon
18 {
19     const EXECUTION_METHOD_DISPATCH = 'dispatch';
20     const EXECUTION_METHOD_EXEC_CLI = 'exec_cli';
21     
22     /** 
23      * default configurations of this daemon
24      * 
25      * @var array
26      */
27     protected static $_defaultConfig = array(
28         'general' => array(
29             'configfile' => '/etc/tine20/actionQueue.ini', 
30             'pidfile'    => '/var/run/tine20/actionQueue.pid',
31         ),
32         'tine20' => array (
33             'executionMethod' => self::EXECUTION_METHOD_DISPATCH,
34             'maxRetry'        => 10,
35             'maxChildren'     => 10,
36         )
37     );
38     
39     /**
40      * keeps mapping from process id to job id
41      * 
42      * @var array
43      */
44     protected $_jobScoreBoard = array();
45     
46     /**
47      * infinite loop where daemon manages the execution of the jobs from the job queue
48      */
49     public function run()
50     {
51         if ('Tinebase_ActionQueue_Backend_Direct' === Tinebase_ActionQueue::getInstance()->getBackendType()) {
52             $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ . ' Tinebase_ActionQueue_Backend_Direct used. There is nothing to do for the worker! Configure Redis backend for example if you want to make use of the worker.');
53             exit(1);
54         }
55
56         while (true) {
57             
58             // manage the number of children
59             if (count ($this->_children) >= $this->_getConfig()->tine20->maxChildren ) {
60                 $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " reached max children limit: " . $this->_getConfig()->tine20->maxChildren);
61                 $this->_getLogger()->info(__METHOD__ . '::' . __LINE__ .    " number of pending jobs:" . Tinebase_ActionQueue::getInstance()->getQueueSize());
62                 usleep(100); // save some trees
63                 continue;
64             }
65             
66             $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " trying to fetch a job from queue");
67
68             $jobId = Tinebase_ActionQueue::getInstance()->waitForJob();
69
70             // no job found
71             if ($jobId === FALSE) {
72                 continue;
73             }
74             
75             try {
76                 $job = Tinebase_ActionQueue::getInstance()->receive($jobId);
77             } catch (RuntimeException $re) {
78                 $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ . " failed to receive message: " . $re->getMessage());
79                 
80                 // we are unable to process the job
81                 Tinebase_ActionQueue::getInstance()->delete($jobId);
82                 
83                 continue;
84             }
85             
86             $this->_getLogger()->info (__METHOD__ . '::' . __LINE__ . " forking to process job {$job['action']} with id $jobId");
87             $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ . " process message: " . print_r($job, TRUE)); 
88             
89             $childPid = $this->_forkChild();
90             
91             if ($childPid == 0) { // executed in child process
92                 try {
93                     $this->_executeAction($job);
94
95                     exit(0); // message will be deleted in parent process
96                     
97                 } catch (Exception $e) {
98                     $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " could not execute job : " . $job['action']);
99                     $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " could not execute job : " . $e->getMessage());
100                     $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " could not execute job : " . $e->getTraceAsString());
101
102                     exit(1); // message will be rescheduled in parent process
103                 }
104                 
105             } else { // executed in parent process
106                 $this->_jobScoreBoard[$childPid] = $jobId;
107             }
108         }
109     }
110
111     /**
112      * We have to destroy the Tinebase_ActionQueue instance before the process forks.
113      * Otherwise the resource holding the connection to the queue backend will be
114      * shared between the parent and the child which leads to strange problems
115      * 
116      * @see Console_Daemon::_beforeFork()
117      */
118     protected function _beforeFork()
119     {
120         Tinebase_ActionQueue::destroyInstance();
121     }
122     
123     /**
124      * handle terminated processes
125      * either delete or reschedule the job
126      * 
127      * @param  string  $pid     the pid of the process
128      * @param  string  $status  the exit status of the process 
129      * @return void
130      */
131     protected function _childTerminated($pid, $status)
132     {
133         parent::_childTerminated($pid, $status);
134         
135         $status = pcntl_wexitstatus($status);
136         
137         $jobId = $this->_jobScoreBoard[$pid];
138         unset($this->_jobScoreBoard[$pid]);
139         
140         if ($status > 0) { // failure
141             $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " job $jobId did not finish successfully. Will be rescheduled!"); 
142             
143             Tinebase_ActionQueue::getInstance()->reschedule($jobId);
144             
145         } else {           // success
146             $this->_getLogger()->info(__METHOD__ . '::' . __LINE__ .    " job $jobId finished successfully");
147             
148             Tinebase_ActionQueue::getInstance()->delete($jobId);
149         }
150     }
151     
152     /**
153      * execute the action
154      *
155      * @param  string  $job
156      * @todo make self::EXECUTION_METHOD_EXEC_CLI working
157      */
158     protected function _executeAction($job)
159     {
160         // execute in subprocess
161         if ($this->_getConfig()->tine20->executionMethod === self::EXECUTION_METHOD_EXEC_CLI) {
162             $output = system('php $paths ./../../tine20.php --method Tinebase.executeQueueJob message=' . escapeshellarg($job), $exitCode );
163             if (exitCode != 0) {
164                 throw new Exception('Problem during execution with shell: ' . $output);
165             }
166
167         // execute in same process
168         } else {
169             Tinebase_Core::initFramework();
170     
171             Tinebase_Core::set(Tinebase_Core::USER, Tinebase_User::getInstance()->getFullUserById($job['account_id']));
172             
173             Tinebase_ActionQueue::getInstance()->executeAction($job);
174         }
175     }
176 }