diff --git a/Console/Command/QueueShell.php b/Console/Command/QueueShell.php index 0e28c50b..652ee9e9 100644 --- a/Console/Command/QueueShell.php +++ b/Console/Command/QueueShell.php @@ -297,6 +297,14 @@ public function runworkersqs() { return; } + // Check if running in ECS mode + $enableEcs = !empty($this->params['enable-ecs']); + if ($enableEcs) { + $this->out('[ECS MODE] Only processing messages'); + } else { + $this->out('[EC2 MODE] Only processing messages'); + } + if ($pidFilePath = Configure::read('Queue.pidfilepath')) { if (!file_exists($pidFilePath)) { @@ -346,7 +354,7 @@ public function runworkersqs() { touch($pidFilePath . $pidFileName); } //$this->_log('runworker', isset($pid) ? $pid : null); - $this->out('[' . date('Y-m-d H:i:s') . '] Looking for Job ...'); + $this->out('[' . date('Y-m-d H:i:s') . '] Looking for ' . ($enableEcs ? 'ECS' : 'EC2') . ' Job ...'); $data = $this->QueuedTask->requestSqsJob($queueUrl); //$data = $this->QueuedTask->requestJob($this->_getTaskConf(), $group); @@ -375,6 +383,11 @@ public function runworkersqs() { } if ($data) { $this->out('Running Job of type "' . $data['jobtype'] . '"'); + // For ECS consumers, allow tasks suffixed with "-ECS" to map to their base task + // Remove the "-ECS" suffix to get the base task name + if ($enableEcs) { + $data['jobtype'] = preg_replace('/-ECS$/i', '', $data['jobtype']); + } $taskname = 'Queue' . $data['jobtype']; if ($this->{$taskname}->autoUnserialize) { @@ -527,6 +540,16 @@ public function getOptionParser() { 'default' => '' ]; + $subcommandParserSqs = [ + 'options' => [ + 'enable-ecs' => [ + 'help' => 'Enable ECS mode - only process messages', + 'boolean' => true, + 'default' => false + ] + ] + ]; + return parent::getOptionParser() ->description(__d('cake_console', "Simple and minimalistic job queue (or deferred-task) system.")) ->addSubcommand('clean', [ @@ -548,7 +571,11 @@ public function getOptionParser() { ->addSubcommand('runworker', [ 'help' => 'Run Worker', 'parser' => $subcommandParserFull - ]); + ]) + ->addSubcommand('runworkersqs', [ + 'help' => 'Run Worker SQS', + 'parser' => $subcommandParserSqs + ]); } /** diff --git a/Model/QueuedTask.php b/Model/QueuedTask.php index 8d1dbbc5..57249d16 100644 --- a/Model/QueuedTask.php +++ b/Model/QueuedTask.php @@ -87,6 +87,30 @@ public function nextPriority($priority) return $this; } + public function isCompanyForEcsQueue($bidId) { + if (Configure::read('Queue.enforceEcsForAll.connection')) { + return true; + } + $companyBid = ClassRegistry::init('CompanyBid')->find('first', [ + 'conditions' => [ + 'CompanyBid.id' => $bidId, + ], + 'contain' => false, + 'fields' => ['company_id'] + ]); + if (empty($companyBid)) { + return false; + } + $companySetting = ClassRegistry::init('Symphosize.CompanySetting')->find('first', [ + 'conditions' => [ + 'CompanySetting.company_id' => $companyBid['CompanyBid']['company_id'], + 'CompanySetting.slug' => 'ecs_queue_enabled', + ], + 'contain' => false, + ]); + return !empty($companySetting['CompanySetting']['value']); + } + /** * Add a new Job to the Queue. * @@ -114,12 +138,18 @@ public function createJob($jobName, $data = null, $notBefore = null, $group = nu $dupeKey = $data['company_id'] . '.' . $data['sub_service_id']; break; case 'SaveConnection': - $additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0; + $additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0; $dupeKey = $data['bidId'] . '.' . $additionalKey; + if ($this->isCompanyForEcsQueue($data['bidId'])) { + $jobName = 'SaveConnection-ECS'; + } break; case 'SaveSingleConnection': - $additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0; + $additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0; $dupeKey = $data['provider'] . '.' . $data['bidId'] . '.' . $additionalKey; + if ($this->isCompanyForEcsQueue($data['bidId'])) { + $jobName = 'SaveSingleConnection-ECS'; + } break; case 'SyncIntercomCompany': $dupeKey = $data['company_id'];