[v8.0] Introduce Scout Agent and Optimizer#7251
Conversation
…RACv8 Bring up to date
…RACv8 Bring up to date
fstagni
left a comment
There was a problem hiding this comment.
Thank you for the PR.
The main problem I see, that requires change, is that you are calling directly JobDB() to get/set JobParameters. But, JobParameters can be stored also in ElasticSearch (and will only be stored there from DIRAC v9 on). So, use JobStateUpdateClient() instead.
| """Sets defaults | ||
| """ | ||
|
|
||
| self.jobDB = JobStateUpdateClient() |
There was a problem hiding this comment.
Maybe it was me not being clear, but this can't be a 1-to-1 substitution between JobDB and JobStateUpdateClient, as the 2 don't expose the same methods. For example, JobStateUpdateClient does not expose the selectJobs method used below.
There was a problem hiding this comment.
Ah I see. Does JobStateUpdateClient access only the methods found in JobStateUpdateHandler, or are there others?
There was a problem hiding this comment.
In general, I would do this, for added clarity:
| self.jobDB = JobStateUpdateClient() | |
| self.jobStateUpdate = JobStateUpdateClient() |
then you'll need at least also
self.jobMonitoring = JobMonitoringClient()| failedjoblist = [] | ||
| failedsitelist = [] | ||
| stalledjoblist = [] | ||
| scoutjobs = result['Value'].keys() |
There was a problem hiding this comment.
beware, this is python3 and not py2 code.
There was a problem hiding this comment.
This is safer:
| scoutjobs = result['Value'].keys() | |
| scoutjobs = list(result['Value']) |
fstagni
left a comment
There was a problem hiding this comment.
And, would you add a unit test or two?
andresailer
left a comment
There was a problem hiding this comment.
Could you add some documentation about this somewhere?
From the #7083
"Scout jobs" are created at the job submission -- when a user submits a set of jobs, our client tool makes a smaller set of shorter jobs as "scout jobs" and submits them (the original and the scout) altogether.
Would be interesting to know how this is implemented at the job submission level.
| self.totalScoutJobs = Operations().getValue('WorkloadManagement/Scouting/totalScoutJobs', 10) | ||
| self.criteriaFailedRate = Operations().getValue('WorkloadManagement/Scouting/criteriaFailedRate', 0.5) | ||
| self.criteriaSucceededRate = Operations().getValue('WorkloadManagement/Scouting/criteriaSucceededRate', 0.3) | ||
| self.criteriaStalledRate = Operations().getValue('WorkloadManagement/Scouting/criteriaStalledRate', 1.0) | ||
| self.criteriaFailed = Operations().getValue('WorkloadManagement/Scouting/criteriaFailed', | ||
| int(self.totalScoutJobs * self.criteriaFailedRate)) | ||
| self.criteriaSucceeded = Operations().getValue('WorkloadManagement/Scouting/criteriaSucceeded', | ||
| int(self.totalScoutJobs * self.criteriaSucceededRate)) | ||
| self.criteriaStalled = Operations().getValue('WorkloadManagement/Scouting/criteriaStalled', | ||
| int(self.totalScoutJobs * self.criteriaStalledRate)) |
There was a problem hiding this comment.
Can this go via the Agent options, or is this used also in other places?
There was a problem hiding this comment.
In any case you have to add the agent to the ConfigTemplate file?
36f67d0 to
cd8fc37
Compare
| def execute(self): | ||
| """The ScoutingJobStatus execution method. | ||
| """ | ||
| result = self.jobDB.selectJobs({'Status': 'Scouting'}) |
There was a problem hiding this comment.
JobStateUpdateClient doesn't seem to have a method that does what we want here. However, it seems that getJobs() in JobMonitoringHandler might have this functionality. Would you recommend using getJobs, or implementing this in JobStateUpdateHandler?
| """Sets defaults | ||
| """ | ||
|
|
||
| self.jobDB = JobStateUpdateClient() |
There was a problem hiding this comment.
In general, I would do this, for added clarity:
| self.jobDB = JobStateUpdateClient() | |
| self.jobStateUpdate = JobStateUpdateClient() |
then you'll need at least also
self.jobMonitoring = JobMonitoringClient()| failedjoblist = [] | ||
| failedsitelist = [] | ||
| stalledjoblist = [] | ||
| scoutjobs = result['Value'].keys() |
There was a problem hiding this comment.
This is safer:
| scoutjobs = result['Value'].keys() | |
| scoutjobs = list(result['Value']) |
|
|
||
| if len(donejoblist) >= criteriaSucceeded: | ||
| self.log.verbose(f'Scout (ID = {scoutid}) are done.') | ||
| scoutStatus = {'Status': 'Checking', 'MinorStatus': 'Scouting', 'appstatus': 'Scout Complete'} |
There was a problem hiding this comment.
| scoutStatus = {'Status': 'Checking', 'MinorStatus': 'Scouting', 'appstatus': 'Scout Complete'} | |
| scoutStatus = {'Status': JobStatus.CHECKING, 'MinorStatus': 'Scouting', 'appstatus': 'Scout Complete'} |
and similar in the lines below.
| self.jobLog.info('Skipping optimizer, since no scout \ | ||
| corresponding to this job group') |
There was a problem hiding this comment.
This would print lots of empty spaces. Better to do
| self.jobLog.info('Skipping optimizer, since no scout \ | |
| corresponding to this job group') | |
| self.jobLog.info('Skipping optimizer, since no scout ' | |
| 'corresponding to this job group') | |
| result = jobState.getAttribute('RescheduleCounter') | ||
| if not result['OK']: | ||
| return result | ||
| if result['Value'] == None: |
There was a problem hiding this comment.
| if result['Value'] == None: | |
| if result['Value'] is None: |
|
Ping! @qdcampagna |
|
Sorry for the long delay, there were problems with the development server used to test this. We are working on setting up a new one so hopefully soon there will be progress on this. |
|
Hello, this PR is now 1 year old, and I do not see much movement. |
|
Closing because of inaction. |
This pull request introduces the Scout Agent and Optimizer. They submit a subset of primary jobs first, then only allows the primary jobs to run once a certain number of Scout jobs are successfully finished. Follow up to #7118. Closes #7083.
BEGINRELEASENOTES
*WorkloadManagement
NEW: Introduce Agent and Optimizer for scouting jobs
ENDRELEASENOTES