0013220: rework action queue
authorPaul Mehrer <p.mehrer@metaways.de>
Thu, 30 Mar 2017 15:06:56 +0000 (17:06 +0200)
committerPhilipp Schüle <p.schuele@metaways.de>
Fri, 16 Jun 2017 11:22:52 +0000 (13:22 +0200)
https://forge.tine20.org/view.php?id=13220

Change-Id: I0efb498b00cc77f6d6caa52b3504e9dd7dc7287a
Reviewed-on: http://gerrit.tine20.com/customers/4470
Reviewed-by: Philipp Schüle <p.schuele@metaways.de>
Tested-by: Philipp Schüle <p.schuele@metaways.de>
tests/tine20/Tinebase/AllTests.php
tests/tine20/Tinebase/DaemonTest.php [new file with mode: 0644]
tests/tine20/Tinebase/DummyController.php [new file with mode: 0644]
tine20/Tinebase/ActionQueue.php
tine20/Tinebase/ActionQueue/Worker.php
tine20/Tinebase/Config.php
tine20/library/Console/Daemon.php
tine20/worker.php

index 2142b0a..54f5b1b 100644 (file)
@@ -75,6 +75,7 @@ class Tinebase_AllTests
         $suite->addTestSuite('Tinebase_LockTest');
         $suite->addTestSuite('Tinebase_ScheduledImportTest');
         $suite->addTestSuite('Tinebase_Delegators_DelegateTest');
+        $suite->addTestSuite('Tinebase_DaemonTest');
 
         $suite->addTest(Tinebase_User_AllTests::suite());
         $suite->addTest(Tinebase_Group_AllTests::suite());
diff --git a/tests/tine20/Tinebase/DaemonTest.php b/tests/tine20/Tinebase/DaemonTest.php
new file mode 100644 (file)
index 0000000..f5f4332
--- /dev/null
@@ -0,0 +1,145 @@
+<?php
+/**
+ * Tine 2.0 - http://www.tine20.org
+ *
+ * @package     Tinebase
+ * @license     http://www.gnu.org/licenses/agpl.html
+ * @copyright   Copyright (c) 2017-2017 Metaways Infosystems GmbH (http://www.metaways.de)
+ * @author      Paul Mehrer <p.mehrer@metaways.de>
+ */
+
+/**
+ * Test helper
+ */
+require_once dirname(dirname(__FILE__)) . DIRECTORY_SEPARATOR . 'TestHelper.php';
+
+/**
+ * Test class for Tinebase_CustomField
+ */
+class Tinebase_DaemonTest extends PHPUnit_Framework_TestCase
+{
+    protected static $oldActionQueueConfig = null;
+    protected static $oldIniFileContent = null;
+
+    public static function setUpBeforeClass()
+    {
+        static::$oldActionQueueConfig = Tinebase_Core::getConfig()->{Tinebase_Config::ACTIONQUEUE};
+        $actionQueueConfig = static::$oldActionQueueConfig;
+        $actionQueueConfig->{Tinebase_Config::ACTIONQUEUE_BACKEND} = 'redis';
+
+        Tinebase_Core::getConfig()->set(Tinebase_Config::ACTIONQUEUE, $actionQueueConfig);
+
+        if (@is_file('/etc/tine20/actionQueue.ini')) {
+            static::$oldIniFileContent = file_get_contents('/etc/tine20/actionQueue.ini');
+        }
+
+        $configData = 'general.daemonize=1' . PHP_EOL . 'general.logfile=/var/log/tine20/daemon.log' . PHP_EOL . 'general.loglevel=7' . PHP_EOL . 'tine20.shutDownWait=10';
+        static::assertEquals(strlen($configData), file_put_contents('/etc/tine20/actionQueue.ini', $configData), 'writing config data failed');
+    }
+
+    public static function tearDownAfterClass()
+    {
+        Tinebase_Core::getConfig()->set(Tinebase_Config::ACTIONQUEUE, static::$oldActionQueueConfig);
+        static::$oldActionQueueConfig = null;
+
+        if (null !== static::$oldIniFileContent) {
+            file_put_contents('/etc/tine20/actionQueue.ini', static::$oldIniFileContent);
+            static::$oldIniFileContent = null;
+        } else {
+            unlink('/etc/tine20/actionQueue.ini');
+        }
+
+        @unlink('/var/run/tine20/DummyController.txt');
+    }
+
+    protected function setUp()
+    {
+    }
+
+    protected function tearDown()
+    {
+        $this->_stopDaemon();
+    }
+
+    public function testStart()
+    {
+        $this->_startDaemon();
+
+        // 200 ms
+        usleep(200000);
+        clearstatcache();
+        $this->assertTrue(is_file('/var/run/tine20/actionQueue.pid'), 'could not find pid file');
+    }
+
+    public function testStartStop()
+    {
+        clearstatcache();
+        $this->assertFalse(is_file('/var/run/tine20/actionQueue.pid'), 'found old pid file');
+
+        $this->testStart();
+
+        $startTime = microtime(true);
+        $this->_stopDaemon(true);
+        $endTime = microtime(true);
+        $totalTime = $endTime - $startTime;
+
+        $this->assertTrue($totalTime < 1.9, 'shut down took longer than 1.9 sec, shouldn\'t happen: ' . $totalTime);
+    }
+
+    public function testGracefulShutDown()
+    {
+        static::markTestSkipped('failed, maybe ansible setup issue');
+        
+        $this->testStart();
+
+        @unlink('/var/run/tine20/DummyController.txt');
+        Tinebase_ActionQueue::getInstance()->queueAction('Tinebase_FOO_DummyController.sleepNSec', 2);
+
+        // 10 ms so the daemon can pick up the job
+        usleep(10000);
+
+        $startTime = microtime(true);
+        $this->_stopDaemon(true);
+        $endTime = microtime(true);
+        $totalTime = $endTime - $startTime;
+
+        $this->assertTrue(is_file('/var/run/tine20/DummyController.txt'), 'could not find file /var/run/tine20/DummyController.txt');
+        $this->assertEquals('success 2', file_get_contents('/var/run/tine20/DummyController.txt'));
+        $this->assertTrue($totalTime > 1.9, 'shut down should take more than 1.9 sec: ' . $totalTime);
+    }
+
+    protected function _startDaemon()
+    {
+        exec('nohup php -d include_path=/etc/tine20/:' . dirname(__DIR__) . ' /usr/local/share/tine20.git/tine20/worker.php 2>1 > /dev/null');
+    }
+
+    protected function _stopDaemon($_assert = false)
+    {
+        if (true === $_assert) {
+            $this->assertTrue(is_file('/var/run/tine20/actionQueue.pid'), 'could not find pid file /var/run/tine20/actionQueue.pid');
+        }
+        if (!is_file('/var/run/tine20/actionQueue.pid')) {
+            return;
+        }
+
+        $pid = file_get_contents('/var/run/tine20/actionQueue.pid');
+
+        posix_kill($pid, SIGTERM);
+
+        $startTime = time();
+        while (time() - $startTime < 5) {
+            // 10 ms
+            usleep(10000);
+            clearstatcache();
+            if (!is_file('/var/run/tine20/actionQueue.pid')) {
+                break;
+            }
+        }
+        if (true === $_assert) {
+            // 10 ms
+            usleep(10000);
+            clearstatcache();
+            $this->assertFalse(is_file('/var/run/tine20/actionQueue.pid'), 'pid file is still there!');
+        }
+    }
+}
\ No newline at end of file
diff --git a/tests/tine20/Tinebase/DummyController.php b/tests/tine20/Tinebase/DummyController.php
new file mode 100644 (file)
index 0000000..1255dfb
--- /dev/null
@@ -0,0 +1,35 @@
+<?php
+/**
+ * Tine 2.0 - http://www.tine20.org
+ *
+ * @package     Tinebase
+ * @license     http://www.gnu.org/licenses/agpl.html
+ * @copyright   Copyright (c) 2017-2017 Metaways Infosystems GmbH (http://www.metaways.de)
+ * @author      Paul Mehrer <p.mehrer@metaways.de>
+ */
+
+
+/**
+ * Dummy Controller, to be used for example by queue with action like 'Tinebase_FOO_DummyController.someMethod'
+ */
+class Tinebase_DummyController
+{
+    public static function getInstance()
+    {
+        return new self();
+    }
+
+    public function sleepNSec($n)
+    {
+        /*if (Tinebase_Core::isLogLevel(Zend_Log::DEBUG)) Tinebase_Core::getLogger()->debug(__METHOD__ . '::' . __LINE__
+            . ' start sleeping...');*/
+
+        sleep($n);
+        file_put_contents('/var/run/tine20/DummyController.txt', 'success ' . $n);
+
+        /*if (Tinebase_Core::isLogLevel(Zend_Log::DEBUG)) Tinebase_Core::getLogger()->debug(__METHOD__ . '::' . __LINE__
+            . ' done');*/
+
+        return true;
+    }
+}
\ No newline at end of file
index 42cf801..652bd02 100644 (file)
     {
         $options = null;
         $backend = self::BACKEND_DIRECT;
+        $config = Tinebase_Core::getConfig()->{Tinebase_Config::ACTIONQUEUE};
 
         /** @noinspection PhpUndefinedFieldInspection */
-        if (isset(Tinebase_Core::getConfig()->actionqueue) && Tinebase_Core::getConfig()->actionqueue->active) {
+        if ($config && isset($config->{Tinebase_Config::ACTIONQUEUE_BACKEND})) {
             /** @noinspection PhpUndefinedFieldInspection */
-            $options = Tinebase_Core::getConfig()->actionqueue->toArray();
+            $options = $config->toArray();
             
-            $backend = (isset($options['backend']) || array_key_exists('backend', $options)) ? ucfirst(strtolower($options['backend'])) : $backend;
-            unset($options['backend']);
+            $backend = (isset($options[Tinebase_Config::ACTIONQUEUE_BACKEND]) || array_key_exists(Tinebase_Config::ACTIONQUEUE_BACKEND, $options)) ? ucfirst(strtolower($options[Tinebase_Config::ACTIONQUEUE_BACKEND])) : $backend;
+            unset($options[Tinebase_Config::ACTIONQUEUE_BACKEND]);
         }
         
         $className = 'Tinebase_ActionQueue_Backend_' . $backend;
index 793508b..5b87d94 100755 (executable)
@@ -8,6 +8,7 @@
  * @copyright   Copyright (c) 2012-2016 Metaways Infosystems GmbH (http://www.metaways.de)
  */
 
+
 /**
  * Daemon to check the job queue and process jobs in separate processes
  * 
@@ -18,6 +19,8 @@ class Tinebase_ActionQueue_Worker extends Console_Daemon
 {
     const EXECUTION_METHOD_DISPATCH = 'dispatch';
     const EXECUTION_METHOD_EXEC_CLI = 'exec_cli';
+
+    protected $_stopped = false;
     
     /** 
      * default configurations of this daemon
@@ -33,6 +36,7 @@ class Tinebase_ActionQueue_Worker extends Console_Daemon
             'executionMethod' => self::EXECUTION_METHOD_DISPATCH,
             'maxRetry'        => 10,
             'maxChildren'     => 10,
+            'shutDownWait'    => 60,
         )
     );
     
@@ -53,22 +57,32 @@ class Tinebase_ActionQueue_Worker extends Console_Daemon
             exit(1);
         }
 
-        while (true) {
-            
+        $maxChildren = $this->_getConfig()->tine20->maxChildren;
+
+        while (!$this->_stopped) {
+
             // manage the number of children
-            if (count ($this->_children) >= $this->_getConfig()->tine20->maxChildren ) {
+            if (count ($this->_children) >=  $maxChildren) {
+
+                // TODO this will log A LOT, add a timeout between logs...
+
                 $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " reached max children limit: " . $this->_getConfig()->tine20->maxChildren);
                 $this->_getLogger()->info(__METHOD__ . '::' . __LINE__ .    " number of pending jobs:" . Tinebase_ActionQueue::getInstance()->getQueueSize());
-                usleep(100); // save some trees
+                usleep(1000); // save some trees
+                pcntl_signal_dispatch();
                 continue;
             }
             
-            $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " trying to fetch a job from queue");
+            $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " trying to fetch a job from queue " . microtime(true));
 
             $jobId = Tinebase_ActionQueue::getInstance()->waitForJob();
 
+            $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " signal dispatch " . microtime(true));
+
+            pcntl_signal_dispatch();
+
             // no job found
-            if ($jobId === FALSE) {
+            if ($jobId === FALSE || $this->_stopped) {
                 continue;
             }
             
@@ -78,6 +92,8 @@ class Tinebase_ActionQueue_Worker extends Console_Daemon
                 $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ . " failed to receive message: " . $re->getMessage());
                 
                 // we are unable to process the job
+                // probably the retry count is exceeded
+                // TODO push message to dead letter queue
                 Tinebase_ActionQueue::getInstance()->delete($jobId);
                 
                 continue;
@@ -85,13 +101,16 @@ class Tinebase_ActionQueue_Worker extends Console_Daemon
             
             $this->_getLogger()->info (__METHOD__ . '::' . __LINE__ . " forking to process job {$job['action']} with id $jobId");
             $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ . " process message: " . print_r($job, TRUE)); 
-            
+
+
+            // TODO fork may not work!!!
             $childPid = $this->_forkChild();
             
             if ($childPid == 0) { // executed in child process
                 try {
                     $this->_executeAction($job);
 
+                    $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ . " exiting...");
                     exit(0); // message will be deleted in parent process
                     
                 } catch (Exception $e) {
@@ -106,6 +125,42 @@ class Tinebase_ActionQueue_Worker extends Console_Daemon
                 $this->_jobScoreBoard[$childPid] = $jobId;
             }
         }
+
+        $this->_shutDown();
+    }
+
+    protected function _shutDown()
+    {
+        $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " shutting down... " . microtime(true));
+
+        $timeStart = time();
+        $timeElapsed = 0;
+        $shutDownWait = (int)($this->_getConfig()->tine20->shutDownWait);
+
+        while ($timeElapsed < $shutDownWait) {
+
+            pcntl_signal_dispatch();
+
+            if (count($this->_children) === 0) {
+                break;
+            }
+
+            // 10ms
+            usleep(10000);
+
+            $timeElapsed = time() - $timeStart;
+        }
+
+        $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " parent shut down... " . microtime(true));
+
+        parent::_shutDown();
+    }
+
+    protected function _gracefulShutDown()
+    {
+        $this->_stopped = true;
+
+        return true;
     }
 
     /**
@@ -130,20 +185,30 @@ class Tinebase_ActionQueue_Worker extends Console_Daemon
      */
     protected function _childTerminated($pid, $status)
     {
+        $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " with pid $pid");
+
+        $jobId = $this->_jobScoreBoard[$pid];
+        unset($this->_jobScoreBoard[$pid]);
+
+        if (true !== pcntl_wifexited($status)) {
+            $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " child $pid did not finish successfully!");
+
+            Tinebase_ActionQueue::getInstance()->reschedule($jobId);
+
+            return;
+        }
         parent::_childTerminated($pid, $status);
         
         $status = pcntl_wexitstatus($status);
         
-        $jobId = $this->_jobScoreBoard[$pid];
-        unset($this->_jobScoreBoard[$pid]);
-        
+
         if ($status > 0) { // failure
-            $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " job $jobId did not finish successfully. Will be rescheduled!"); 
+            $this->_getLogger()->crit(__METHOD__ . '::' . __LINE__ .    " job $jobId in pid $pid did not finish successfully. Will be rescheduled!");
             
             Tinebase_ActionQueue::getInstance()->reschedule($jobId);
             
         } else {           // success
-            $this->_getLogger()->info(__METHOD__ . '::' . __LINE__ .    " job $jobId finished successfully");
+            $this->_getLogger()->info(__METHOD__ . '::' . __LINE__ .    " job $jobId in pid $pid finished successfully");
             
             Tinebase_ActionQueue::getInstance()->delete($jobId);
         }
@@ -158,6 +223,8 @@ class Tinebase_ActionQueue_Worker extends Console_Daemon
      */
     protected function _executeAction($job)
     {
+        $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ . " with isChild: " . var_export($this->_isChild, true));
+
         // execute in subprocess
         /*if ($this->_getConfig()->tine20->executionMethod === self::EXECUTION_METHOD_EXEC_CLI) {
             $output = system('php $paths ./../../tine20.php --method Tinebase.executeQueueJob message=' . escapeshellarg($job), $exitCode );
@@ -167,11 +234,17 @@ class Tinebase_ActionQueue_Worker extends Console_Daemon
 
         // execute in same process
         } else { */
+            //Zend_Registry::_unsetInstance();
+
             Tinebase_Core::initFramework();
     
             Tinebase_Core::set(Tinebase_Core::USER, Tinebase_User::getInstance()->getFullUserById($job['account_id']));
-            
-            Tinebase_ActionQueue::getInstance()->executeAction($job);
+
+        if (true !== ($result = Tinebase_ActionQueue::getInstance()->executeAction($job))) {
+            throw new Tinebase_Exception_UnexpectedValue('action queue job execution did not return true: ' . var_export($result, true));
+        }
         //}
+
+        $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ . " result: " . var_export($result, true));
     }
 }
index ac310e6..fa0db23 100644 (file)
@@ -522,6 +522,10 @@ class Tinebase_Config extends Tinebase_Config_Abstract
     const FILESYSTEM_PREVIEW_SERVICE_URL = 'previewServiceUrl';
     const FILESYSTEM_ENABLE_NOTIFICATIONS = 'enableNotifications';
 
+    const ACTIONQUEUE = 'actionqueue';
+    const ACTIONQUEUE_BACKEND = 'backend';
+
+
     /**
      * (non-PHPdoc)
      * @see tine20/Tinebase/Config/Definition::$_properties
@@ -757,6 +761,27 @@ class Tinebase_Config extends Tinebase_Config_Abstract
             ),
             'default'                           => array()
         ),
+        self::ACTIONQUEUE => array(
+            //_('Action queue configuration')
+            'label'                 => 'Action queue configuration',
+            //_('Action queue configuration.')
+            'description'           => 'Action queue configuration.',
+            'type'                  => 'object',
+            'class'                 => 'Tinebase_Config_Struct',
+            'clientRegistryInclude' => FALSE,
+            'setByAdminModule'      => FALSE,
+            'setBySetupModule'      => TRUE,
+            'content'               => array(
+                self::ACTIONQUEUE_BACKEND       => array(
+                    'type'                              => Tinebase_Config::TYPE_STRING,
+                    'default'                           => 'Direct'
+                ),
+            ),
+            'default'                           => array()
+        ),
+        /*const ACTIONQUEUE = 'actionqueue';
+    const ACTIONQUEUE_ACTIVE = 'active';
+    const ACTIONQUEUE_BACKEND = 'backend';*/
         self::USERBACKEND => array(
                                    //_('User Configuration')
             'label'                 => 'User Configuration',
index 3a9f092..e841ccc 100644 (file)
@@ -4,15 +4,10 @@
  * 
  * @package     Console
  * @license     http://www.gnu.org/licenses/agpl.html AGPL Version 3
- * @copyright   Copyright (c) 2010-2016 Metaways Infosystems GmbH (http://www.metaways.de)
+ * @copyright   Copyright (c) 2010-2017 Metaways Infosystems GmbH (http://www.metaways.de)
  * @author      Lars Kneschke <l.kneschke@metaways.de>
  */
 
-// TODO move this to helper script? where is the right place for this?
-set_time_limit(0);
-ob_implicit_flush();
-declare(ticks = 1);
-
 
 /**
  * Base class for console daemons
@@ -36,6 +31,8 @@ abstract class Console_Daemon
      * @var Zend_Config
      */
     protected $_config;
+
+    protected $_isChild = false;
     
     /**
      * @var array $_defaultConfig
@@ -145,10 +142,10 @@ abstract class Console_Daemon
     /**
      * handle terminated children
      *
-     * @param unknown $pid
-     * @param unknown $status
+     * @param string $pid
+     * @param string $status
      */
-    protected function _childTerminated($pid, $status)
+    protected function _childTerminated($pid, /** @noinspection PhpUnusedParameterInspection */$status)
     {
         unset($this->_children[$pid]);
     }
@@ -218,7 +215,7 @@ abstract class Console_Daemon
         $childPid = pcntl_fork();
         
         if($childPid < 0) {
-            #fwrite(STDERR, "Something went wrong while forking to background" . PHP_EOL);
+            fwrite(STDERR, "Something went wrong while forking new child" . PHP_EOL);
             exit(1);
         }
         
@@ -231,6 +228,7 @@ abstract class Console_Daemon
         } else {
             // a child has no children
             $this->_children = array();
+            $this->_isChild = true;
         }
         
         return $childPid;
@@ -270,7 +268,7 @@ abstract class Console_Daemon
         $childPid = pcntl_fork();
         
         if ($childPid < 0) {
-            #fwrite(STDERR, "Something went wrong while forking to background" . PHP_EOL);
+            fwrite(STDERR, "Something went wrong while forking to background" . PHP_EOL);
             exit;
         }
         
@@ -338,22 +336,37 @@ abstract class Console_Daemon
      */
     public function handleSigTERM()
     {
-        $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " SIGTERM received");
-        
+        $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " SIGTERM received, is child: " . var_export($this->_isChild, true) . " " . microtime(true));
+
+        // do we want to gracefully shut down?
+        if (true === $this->_gracefulShutDown()) {
+            return;
+        }
+
+        $this->_shutDown();
+    }
+
+    protected function _shutDown()
+    {
         foreach($this->_children as $pid) {
             $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " send SIGTERM to child " . $pid);
             posix_kill($pid, SIGTERM);
         }
-        
+
         $pidFile = $this->getPidFile();
-        
+
         if ($pidFile) {
             @unlink($pidFile);
         }
-        
+
         exit(0);
     }
-    
+
+    protected function _gracefulShutDown()
+    {
+        return false;
+    }
+
     /**
      * handle signal SIGCHILD
      */
@@ -365,22 +378,18 @@ abstract class Console_Daemon
     }
     
     /**
-     * handle signal SIGINT
+     * handle signal SIGHUP
      */
     public function handleSigHUP()
     {
-        $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " SIGHUP received");
-        
-        $this->_config = null;
-        $this->_logger = null;
+        $this->_getLogger()->debug(__METHOD__ . '::' . __LINE__ .    " SIGHUP received, but we don't do anything in this case");
     }
     
     /**
      * handle signal SIGINT
-     * @param int $signal  the signal
      */
-    public function handleSigINT($signal)
+    public function handleSigINT()
     {
-        $this->handleSigTERM($signal);
+        $this->handleSigTERM();
     }
 }
\ No newline at end of file
index f2569d1..cac9a24 100644 (file)
@@ -14,6 +14,8 @@ if (PHP_SAPI != 'cli') {
 
 require_once 'bootstrap.php';
 
+set_time_limit(0);
+
 $daemon = new Tinebase_ActionQueue_Worker();
 $daemon->run();