support template agents#918
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces built-in template agents (general, plan, explore, build, research) and a set of reusable harness callbacks (such as loop guard, plan check, stop gate, and sub-agent limits) to manage agent execution and workflows. The code review highlights several robustness issues across these new callbacks, specifically pointing out potential AttributeError crashes. The feedback recommends replacing isinstance(..., dict) checks with hasattr(..., 'get') to ensure compatibility with OmegaConf DictConfig objects, and adding explicit type validation for parsed JSON structures and configuration dictionaries before accessing their keys.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| todos = data.get('todos') if isinstance(data, dict) else data | ||
| if not todos: | ||
| return '' | ||
| lines = [] | ||
| for t in todos: | ||
| t = t or {} | ||
| lines.append( | ||
| f"- [{t.get('status', '?')}] {t.get('content', '')}") |
There was a problem hiding this comment.
If todos is not a list (e.g., if it is a dictionary or a string), or if any element t in todos is not a dictionary, this code will raise an AttributeError when calling t.get(...). Since this callback runs during agent execution, a crash here will block the agent. We should explicitly verify that todos is a list and that each element t is a dictionary before accessing its keys.
| todos = data.get('todos') if isinstance(data, dict) else data | |
| if not todos: | |
| return '' | |
| lines = [] | |
| for t in todos: | |
| t = t or {} | |
| lines.append( | |
| f"- [{t.get('status', '?')}] {t.get('content', '')}") | |
| todos = data.get('todos') if isinstance(data, dict) else data | |
| if not isinstance(todos, list): | |
| return '' | |
| lines = [] | |
| for t in todos: | |
| if not isinstance(t, dict): | |
| continue | |
| lines.append( | |
| f"- [{t.get('status', '?')}] {t.get('content', '')}") |
| if isinstance(data, dict): | ||
| todos = data.get('todos') or [] | ||
| elif isinstance(data, list): | ||
| todos = data | ||
| else: | ||
| todos = [] | ||
| return [ | ||
| t for t in todos | ||
| if str((t or {}).get('status', '')).strip().lower() not in _DONE | ||
| ] |
There was a problem hiding this comment.
If todos is not a list (e.g., if data.get('todos') is a dictionary or a string), or if any element t in todos is not a dictionary, calling (t or {}).get(...) will raise an AttributeError. We should explicitly verify that todos is a list and that each element t is a dictionary before attempting to get the 'status' key.
| if isinstance(data, dict): | |
| todos = data.get('todos') or [] | |
| elif isinstance(data, list): | |
| todos = data | |
| else: | |
| todos = [] | |
| return [ | |
| t for t in todos | |
| if str((t or {}).get('status', '')).strip().lower() not in _DONE | |
| ] | |
| if isinstance(data, dict): | |
| todos = data.get('todos') | |
| elif isinstance(data, list): | |
| todos = data | |
| else: | |
| todos = [] | |
| if not isinstance(todos, list): | |
| todos = [] | |
| return [ | |
| t for t in todos | |
| if isinstance(t, dict) and str(t.get('status', '')).strip().lower() not in _DONE | |
| ] |
| ctype = getattr(check, 'type', None) | ||
| path = self._resolve(getattr(check, 'path', None)) | ||
|
|
||
| if ctype == 'artifact_exists': | ||
| if not path or not os.path.isfile(path): | ||
| return (getattr(check, 'message', None) | ||
| or f'expected artifact not found: ' | ||
| f'{getattr(check, "path", path)}') | ||
| return None | ||
|
|
||
| if ctype == 'min_size_ratio': | ||
| baseline = self._resolve(getattr(check, 'baseline', None)) | ||
| min_ratio = float(getattr(check, 'min_ratio', 0.5)) | ||
| if not path or not os.path.isfile(path): | ||
| return getattr(check, 'message', | ||
| None) or 'expected artifact not found' | ||
| cur = os.path.getsize(path) | ||
| base = os.path.getsize(baseline) if ( | ||
| baseline and os.path.isfile(baseline)) else 0 | ||
| if base and (cur / base) < min_ratio: | ||
| return getattr( | ||
| check, 'message', | ||
| None) or 'the final artifact looks over-compressed' | ||
| return None | ||
|
|
||
| if ctype == 'llm_quality': | ||
| if path and os.path.isfile(path): | ||
| with open(path, 'r', encoding='utf-8') as f: | ||
| content = f.read() | ||
| else: | ||
| content = self._last_assistant_text(messages) | ||
| if not content.strip(): | ||
| return None | ||
| model = str( | ||
| getattr(check, 'model', None) | ||
| or getattr(self._llm, 'model', 'qwen3.5-plus')) | ||
| api_key = getattr(check, 'openai_api_key', None) or getattr( | ||
| self._llm, 'openai_api_key', None) | ||
| base_url = getattr(check, 'openai_base_url', None) or getattr( | ||
| self._llm, 'openai_base_url', None) | ||
| checker = LLMQualityChecker(model, api_key, base_url, | ||
| getattr(check, 'system_prompt', None)) | ||
| return checker.check(content) |
There was a problem hiding this comment.
Using getattr(check, ...) on check will fail to look up keys if check is a plain Python dict (which is common when configs are parsed or converted). Since DictConfig and plain dict both support .get(), we can safely use .get() for all lookups by ensuring check is treated as a dictionary-like object. This makes the check logic robust and compatible with both types.
c = check if hasattr(check, 'get') else {}
ctype = c.get('type')
path = self._resolve(c.get('path'))
if ctype == 'artifact_exists':
if not path or not os.path.isfile(path):
return (c.get('message')
or f'expected artifact not found: '
f'{c.get("path", path)}')
return None
if ctype == 'min_size_ratio':
baseline = self._resolve(c.get('baseline'))
min_ratio = float(c.get('min_ratio', 0.5))
if not path or not os.path.isfile(path):
return c.get('message') or 'expected artifact not found'
cur = os.path.getsize(path)
base = os.path.getsize(baseline) if (
baseline and os.path.isfile(baseline)) else 0
if base and (cur / base) < min_ratio:
return c.get('message') or 'the final artifact looks over-compressed'
return None
if ctype == 'llm_quality':
if path and os.path.isfile(path):
with open(path, 'r', encoding='utf-8') as f:
content = f.read()
else:
content = self._last_assistant_text(messages)
if not content.strip():
return None
model = str(
c.get('model')
or getattr(self._llm, 'model', 'qwen3.5-plus'))
api_key = c.get('openai_api_key') or getattr(
self._llm, 'openai_api_key', None)
base_url = c.get('openai_base_url') or getattr(
self._llm, 'openai_base_url', None)
checker = LLMQualityChecker(model, api_key, base_url,
c.get('system_prompt'))
return checker.check(content)| verdict = json.loads(raw) | ||
| if verdict.get('pass', True): | ||
| return None | ||
| return verdict.get('reason', 'quality_check_failed') |
There was a problem hiding this comment.
If the LLM returns a non-dictionary JSON response (such as a list, string, or boolean), json.loads(raw) will succeed but calling verdict.get(...) will raise an AttributeError. We should explicitly check that verdict is a dictionary before calling .get().
| verdict = json.loads(raw) | |
| if verdict.get('pass', True): | |
| return None | |
| return verdict.get('reason', 'quality_check_failed') | |
| verdict = json.loads(raw) | |
| if not isinstance(verdict, dict): | |
| return None | |
| if verdict.get('pass', True): | |
| return None | |
| return verdict.get('reason', 'quality_check_failed') |
| def _tc_field(tc, key): | ||
| return tc.get(key, '') if isinstance(tc, dict) else getattr( | ||
| tc, key, '') |
There was a problem hiding this comment.
Using isinstance(tc, dict) will evaluate to False if tc is an OmegaConf DictConfig (since DictConfig does not inherit from dict). It is safer and more Pythonic to check hasattr(tc, 'get') to support any dictionary-like mapping.
| def _tc_field(tc, key): | |
| return tc.get(key, '') if isinstance(tc, dict) else getattr( | |
| tc, key, '') | |
| @staticmethod | |
| def _tc_field(tc, key): | |
| return tc.get(key, '') if hasattr(tc, 'get') else getattr( | |
| tc, key, '') |
| def _name(tc) -> str: | ||
| return str(tc.get('tool_name', '') if isinstance(tc, dict) else getattr( | ||
| tc, 'tool_name', '')) |
There was a problem hiding this comment.
Using isinstance(tc, dict) will evaluate to False if tc is an OmegaConf DictConfig. It is safer and more Pythonic to check hasattr(tc, 'get') to support any dictionary-like mapping.
| def _name(tc) -> str: | |
| return str(tc.get('tool_name', '') if isinstance(tc, dict) else getattr( | |
| tc, 'tool_name', '')) | |
| @staticmethod | |
| def _name(tc) -> str: | |
| return str(tc.get('tool_name', '') if hasattr(tc, 'get') else getattr( | |
| tc, 'tool_name', '')) |
| wrote_plan = any( | ||
| str((tc.get('tool_name', '') if isinstance(tc, dict) else getattr( | ||
| tc, 'tool_name', ''))).endswith('todo_write') | ||
| for tc in assistant.tool_calls) |
There was a problem hiding this comment.
Using isinstance(tc, dict) will evaluate to False if tc is an OmegaConf DictConfig. It is safer and more Pythonic to check hasattr(tc, 'get') to support any dictionary-like mapping.
| wrote_plan = any( | |
| str((tc.get('tool_name', '') if isinstance(tc, dict) else getattr( | |
| tc, 'tool_name', ''))).endswith('todo_write') | |
| for tc in assistant.tool_calls) | |
| wrote_plan = any( | |
| str((tc.get('tool_name', '') if hasattr(tc, 'get') else getattr( | |
| tc, 'tool_name', ''))).endswith('todo_write') | |
| for tc in assistant.tool_calls) |
Change Summary
Related issue number
Checklist
pre-commit installandpre-commit run --all-filesbefore git commit, and passed lint check.