From d6f96fb9f3b51d4aed556a81c38ce9f75dd6064b Mon Sep 17 00:00:00 2001 From: xrompdev Date: Mon, 3 Nov 2025 02:08:00 +0800 Subject: [PATCH 1/2] feat: add ECS mode support and enhance job processing logic - Introduced ECS mode in QueueShell for conditional message processing. - Updated output messages to reflect the current mode (ECS or EC2). - Added isCompanyForEcsQueue method in QueuedTask to determine ECS eligibility based on company settings. - Enhanced job handling in QueuedTask to support new job names for ECS-enabled companies. --- Console/Command/QueueShell.php | 31 +++++++++++++++++++++++++++++-- Model/QueuedTask.php | 31 +++++++++++++++++++++++++++++-- 2 files changed, 58 insertions(+), 4 deletions(-) diff --git a/Console/Command/QueueShell.php b/Console/Command/QueueShell.php index 0e28c50b..652ee9e9 100644 --- a/Console/Command/QueueShell.php +++ b/Console/Command/QueueShell.php @@ -297,6 +297,14 @@ public function runworkersqs() { return; } + // Check if running in ECS mode + $enableEcs = !empty($this->params['enable-ecs']); + if ($enableEcs) { + $this->out('[ECS MODE] Only processing messages'); + } else { + $this->out('[EC2 MODE] Only processing messages'); + } + if ($pidFilePath = Configure::read('Queue.pidfilepath')) { if (!file_exists($pidFilePath)) { @@ -346,7 +354,7 @@ public function runworkersqs() { touch($pidFilePath . $pidFileName); } //$this->_log('runworker', isset($pid) ? $pid : null); - $this->out('[' . date('Y-m-d H:i:s') . '] Looking for Job ...'); + $this->out('[' . date('Y-m-d H:i:s') . '] Looking for ' . ($enableEcs ? 'ECS' : 'EC2') . ' Job ...'); $data = $this->QueuedTask->requestSqsJob($queueUrl); //$data = $this->QueuedTask->requestJob($this->_getTaskConf(), $group); @@ -375,6 +383,11 @@ public function runworkersqs() { } if ($data) { $this->out('Running Job of type "' . $data['jobtype'] . '"'); + // For ECS consumers, allow tasks suffixed with "-ECS" to map to their base task + // Remove the "-ECS" suffix to get the base task name + if ($enableEcs) { + $data['jobtype'] = preg_replace('/-ECS$/i', '', $data['jobtype']); + } $taskname = 'Queue' . $data['jobtype']; if ($this->{$taskname}->autoUnserialize) { @@ -527,6 +540,16 @@ public function getOptionParser() { 'default' => '' ]; + $subcommandParserSqs = [ + 'options' => [ + 'enable-ecs' => [ + 'help' => 'Enable ECS mode - only process messages', + 'boolean' => true, + 'default' => false + ] + ] + ]; + return parent::getOptionParser() ->description(__d('cake_console', "Simple and minimalistic job queue (or deferred-task) system.")) ->addSubcommand('clean', [ @@ -548,7 +571,11 @@ public function getOptionParser() { ->addSubcommand('runworker', [ 'help' => 'Run Worker', 'parser' => $subcommandParserFull - ]); + ]) + ->addSubcommand('runworkersqs', [ + 'help' => 'Run Worker SQS', + 'parser' => $subcommandParserSqs + ]); } /** diff --git a/Model/QueuedTask.php b/Model/QueuedTask.php index 8d1dbbc5..c5ce4854 100644 --- a/Model/QueuedTask.php +++ b/Model/QueuedTask.php @@ -87,6 +87,27 @@ public function nextPriority($priority) return $this; } + public function isCompanyForEcsQueue($bidId) { + $companyBid = ClassRegistry::init('CompanyBid')->find('first', [ + 'conditions' => [ + 'CompanyBid.id' => $bidId, + ], + 'contain' => false, + 'fields' => ['company_id'] + ]); + if (empty($companyBid)) { + return false; + } + $companySetting = ClassRegistry::init('Symphosize.CompanySetting')->find('first', [ + 'conditions' => [ + 'CompanySetting.company_id' => $companyBid['CompanyBid']['company_id'], + 'CompanySetting.slug' => 'ecs_queue_enabled', + ], + 'contain' => false, + ]); + return !empty($companySetting['CompanySetting']['value']); + } + /** * Add a new Job to the Queue. * @@ -114,12 +135,18 @@ public function createJob($jobName, $data = null, $notBefore = null, $group = nu $dupeKey = $data['company_id'] . '.' . $data['sub_service_id']; break; case 'SaveConnection': - $additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0; + $additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0; $dupeKey = $data['bidId'] . '.' . $additionalKey; + if ($this->isCompanyForEcsQueue($data['bidId'])) { + $jobName = 'SaveConnection-ECS'; + } break; case 'SaveSingleConnection': - $additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0; + $additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0; $dupeKey = $data['provider'] . '.' . $data['bidId'] . '.' . $additionalKey; + if ($this->isCompanyForEcsQueue($data['bidId'])) { + $jobName = 'SaveSingleConnection-ECS'; + } break; case 'SyncIntercomCompany': $dupeKey = $data['company_id']; From f3fbeb4b87af83dbd576d279ae301a796c535a78 Mon Sep 17 00:00:00 2001 From: xrompdev Date: Wed, 5 Nov 2025 01:22:47 +0800 Subject: [PATCH 2/2] feat: enforce ecs flag for all company --- Model/QueuedTask.php | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Model/QueuedTask.php b/Model/QueuedTask.php index c5ce4854..57249d16 100644 --- a/Model/QueuedTask.php +++ b/Model/QueuedTask.php @@ -88,6 +88,9 @@ public function nextPriority($priority) } public function isCompanyForEcsQueue($bidId) { + if (Configure::read('Queue.enforceEcsForAll.connection')) { + return true; + } $companyBid = ClassRegistry::init('CompanyBid')->find('first', [ 'conditions' => [ 'CompanyBid.id' => $bidId,