diff --git a/Console/Command/QueueShell.php b/Console/Command/QueueShell.php index 9e812f84..0e28c50b 100644 --- a/Console/Command/QueueShell.php +++ b/Console/Command/QueueShell.php @@ -206,6 +206,26 @@ public function runworker() { if ($this->QueuedTask->exit === true) { $this->_exit = true; } else { + // check if we are over the memory limit and end processing if so + $memoryLimit = Configure::read('Queue.workermaxmemory'); + $workerMaxMemoryTimeout = Configure::read('Queue.workermaxmemorytimeout'); + if ($memoryLimit) { + $memoryUsage = $this->_humanReadableBytes(memory_get_usage(true)); + $this->out('Memory usage: ' . $memoryUsage); + if ($memoryUsage >= $memoryLimit) { + $this->out('Reached memory limit of ' . $memoryUsage . ' (Max ' . $memoryLimit . 'MB), skipping this job.'); + // Mark job as failed due to memory constraints + $this->QueuedTask->markJobFailed($data['id'], 'Not enough memory to run this job. Worker memory usage hit ' . $memoryUsage . 'MB, over the ' . $memoryLimit . 'MB max limit.'); + if ($workerMaxMemoryTimeout) { + $this->out('Exiting in ' . $workerMaxMemoryTimeout . ' seconds due to memory limit.'); + sleep($workerMaxMemoryTimeout); + $this->_exit = true; + } + sleep(Configure::read('Queue.sleeptime')); + $this->hr(); + continue; + } + } if ($data) { $this->out('Running Job of type "' . $data['jobtype'] . '"'); $taskname = 'Queue' . $data['jobtype']; @@ -333,6 +353,26 @@ public function runworkersqs() { if ($this->QueuedTask->exit === true) { $this->_exit = true; } else { + // check if we are over the memory limit and end processing if so + $memoryLimit = Configure::read('Queue.workermaxmemory'); + $workerMaxMemoryTimeout = Configure::read('Queue.workermaxmemorytimeout'); + if ($memoryLimit) { + $memoryUsage = $this->_humanReadableBytes(memory_get_usage(true)); + $this->out('Memory usage: ' . $memoryUsage); + if ($memoryUsage >= $memoryLimit) { + $this->out('Reached memory limit of ' . $memoryUsage . ' (Max ' . $memoryLimit . 'MB), skipping this job.'); + // Mark job as failed due to memory constraints + $this->QueuedTask->markJobFailed($data['id'], 'Not enough memory to run this job. Worker memory usage hit ' . $memoryUsage . 'MB, over the ' . $memoryLimit . 'MB max limit.'); + if ($workerMaxMemoryTimeout) { + $this->out('Exiting in ' . $workerMaxMemoryTimeout . ' seconds due to memory limit.'); + sleep($workerMaxMemoryTimeout); + $this->_exit = true; + } + sleep(Configure::read('Queue.sleeptime')); + $this->hr(); + continue; + } + } if ($data) { $this->out('Running Job of type "' . $data['jobtype'] . '"'); $taskname = 'Queue' . $data['jobtype']; @@ -621,4 +661,19 @@ public function __destruct() { } } +/** + * Format bytes into human readable format + * + * @param int $bytes Number of bytes + * @return string Human readable bytes format + */ + protected function _humanReadableBytes($bytes) { + $units = ['B', 'KB', 'MB', 'GB', 'TB']; + $bytes = max($bytes, 0); + $pow = floor(($bytes ? log($bytes) : 0) / log(1024)); + $pow = min($pow, count($units) - 1); + $bytes /= pow(1024, $pow); + return round($bytes, 2) . ' ' . $units[$pow]; + } + }