Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions Console/Command/QueueShell.php
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,14 @@ public function runworkersqs() {
return;
}

// Check if running in ECS mode
$enableEcs = !empty($this->params['enable-ecs']);
if ($enableEcs) {
$this->out('[ECS MODE] Only processing messages');
} else {
$this->out('[EC2 MODE] Only processing messages');
}


if ($pidFilePath = Configure::read('Queue.pidfilepath')) {
if (!file_exists($pidFilePath)) {
Expand Down Expand Up @@ -346,7 +354,7 @@ public function runworkersqs() {
touch($pidFilePath . $pidFileName);
}
//$this->_log('runworker', isset($pid) ? $pid : null);
$this->out('[' . date('Y-m-d H:i:s') . '] Looking for Job ...');
$this->out('[' . date('Y-m-d H:i:s') . '] Looking for ' . ($enableEcs ? 'ECS' : 'EC2') . ' Job ...');

$data = $this->QueuedTask->requestSqsJob($queueUrl);
//$data = $this->QueuedTask->requestJob($this->_getTaskConf(), $group);
Expand Down Expand Up @@ -375,6 +383,11 @@ public function runworkersqs() {
}
if ($data) {
$this->out('Running Job of type "' . $data['jobtype'] . '"');
// For ECS consumers, allow tasks suffixed with "-ECS" to map to their base task
// Remove the "-ECS" suffix to get the base task name
if ($enableEcs) {
$data['jobtype'] = preg_replace('/-ECS$/i', '', $data['jobtype']);
}
$taskname = 'Queue' . $data['jobtype'];

if ($this->{$taskname}->autoUnserialize) {
Expand Down Expand Up @@ -527,6 +540,16 @@ public function getOptionParser() {
'default' => ''
];

$subcommandParserSqs = [
'options' => [
'enable-ecs' => [
'help' => 'Enable ECS mode - only process messages',
'boolean' => true,
'default' => false
]
]
];

return parent::getOptionParser()
->description(__d('cake_console', "Simple and minimalistic job queue (or deferred-task) system."))
->addSubcommand('clean', [
Expand All @@ -548,7 +571,11 @@ public function getOptionParser() {
->addSubcommand('runworker', [
'help' => 'Run Worker',
'parser' => $subcommandParserFull
]);
])
->addSubcommand('runworkersqs', [
'help' => 'Run Worker SQS',
'parser' => $subcommandParserSqs
]);
}

/**
Expand Down
34 changes: 32 additions & 2 deletions Model/QueuedTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,30 @@ public function nextPriority($priority)
return $this;
}

public function isCompanyForEcsQueue($bidId) {
if (Configure::read('Queue.enforceEcsForAll.connection')) {
return true;
}
$companyBid = ClassRegistry::init('CompanyBid')->find('first', [
'conditions' => [
'CompanyBid.id' => $bidId,
],
'contain' => false,
'fields' => ['company_id']
]);
if (empty($companyBid)) {
return false;
}
$companySetting = ClassRegistry::init('Symphosize.CompanySetting')->find('first', [
'conditions' => [
'CompanySetting.company_id' => $companyBid['CompanyBid']['company_id'],
'CompanySetting.slug' => 'ecs_queue_enabled',
],
'contain' => false,
]);
return !empty($companySetting['CompanySetting']['value']);
}

/**
* Add a new Job to the Queue.
*
Expand Down Expand Up @@ -114,12 +138,18 @@ public function createJob($jobName, $data = null, $notBefore = null, $group = nu
$dupeKey = $data['company_id'] . '.' . $data['sub_service_id'];
break;
case 'SaveConnection':
$additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0;
$additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0;
$dupeKey = $data['bidId'] . '.' . $additionalKey;
if ($this->isCompanyForEcsQueue($data['bidId'])) {
$jobName = 'SaveConnection-ECS';
}
break;
case 'SaveSingleConnection':
$additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0;
$additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0;
$dupeKey = $data['provider'] . '.' . $data['bidId'] . '.' . $additionalKey;
if ($this->isCompanyForEcsQueue($data['bidId'])) {
$jobName = 'SaveSingleConnection-ECS';
}
break;
case 'SyncIntercomCompany':
$dupeKey = $data['company_id'];
Expand Down