forked from acsone/git-aggregator
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconfig.py
More file actions
176 lines (161 loc) · 6.65 KB
/
config.py
File metadata and controls
176 lines (161 loc) · 6.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# © 2015 ACSONE SA/NV
# License AGPLv3 (http://www.gnu.org/licenses/agpl-3.0-standalone.html)
import logging
import os
from string import Template
import yaml
from ._compat import string_types
from .exception import ConfigException
log = logging.getLogger(__name__)
def get_repos(config, force=False):
"""Return a :py:obj:`list` list of repos from config file.
:param config: the repos config in :py:class:`dict` format.
:param bool force: Force aggregate dirty repos or not.
:type config: dict
:rtype: list
"""
repo_list = []
for directory, repo_data in config.items():
if not os.path.isabs(directory):
directory = os.path.abspath(directory)
repo_dict = {
'cwd': directory,
'defaults': repo_data.get('defaults', dict()),
'force': force,
}
remote_names = set()
if 'remotes' in repo_data:
repo_dict['remotes'] = []
remotes_data = repo_data['remotes'] or {}
for remote_name, url in remotes_data.items():
if not url:
raise ConfigException(
'%s: No url defined for remote %s.' %
(directory, remote_name))
remote_dict = {
'name': remote_name,
'url': url
}
repo_dict['remotes'].append(remote_dict)
remote_names.add(remote_name)
if not remote_names:
raise ConfigException(
'%s: You should at least define one remote.' % directory)
else:
raise ConfigException('%s: remotes is not defined.' % directory)
if 'merges' in repo_data:
merges = []
merge_data = repo_data.get('merges') or []
for merge in merge_data:
try:
# Assume parts is a str
parts = merge.split(' ')
if len(parts) != 2:
raise ConfigException(
'%s: Merge must be formatted as '
'"remote_name ref".' % directory)
merge = {
"remote": parts[0],
"ref": parts[1],
}
except AttributeError:
# Parts is a dict
try:
merge["remote"] = str(merge["remote"])
merge["ref"] = str(merge["ref"])
except KeyError:
raise ConfigException(
'%s: Merge lacks mandatory '
'`remote` or `ref` keys.' % directory)
# Check remote is available
if merge["remote"] not in remote_names:
raise ConfigException(
'%s: Merge remote %s not defined in remotes.' %
(directory, merge["remote"]))
merges.append(merge)
repo_dict['merges'] = merges
if not merges:
raise ConfigException(
'%s: You should at least define one merge.' % directory)
else:
raise ConfigException(
'%s: merges is not defined.' % directory)
# Only fetch required remotes by default
repo_dict["fetch_all"] = repo_data.get("fetch_all", False)
if isinstance(repo_dict["fetch_all"], string_types):
repo_dict["fetch_all"] = frozenset((repo_dict["fetch_all"],))
elif isinstance(repo_dict["fetch_all"], list):
repo_dict["fetch_all"] = frozenset(repo_dict["fetch_all"])
# Explicitly cast to str because e.g. `8.0` will be parsed as float
# There are many cases this doesn't handle, but the float one is common
# because of Odoo conventions
parts = str(repo_data.get('target', "")).split()
remote_name = None
if len(parts) == 0:
branch = "_git_aggregated"
elif len(parts) == 1:
branch = parts[0]
elif len(parts) == 2:
remote_name, branch = parts
else:
raise ConfigException(
'%s: Target must be formatted as '
'"[remote_name] branch_name"' % directory)
if remote_name is not None and remote_name not in remote_names:
raise ConfigException(
'%s: Target remote %s not defined in remotes.' %
(directory, remote_name))
repo_dict['target'] = {
'remote': remote_name,
'branch': branch,
}
commands = []
if 'shell_command_after' in repo_data:
cmds = repo_data['shell_command_after']
# if str: turn to list
if cmds:
if isinstance(cmds, string_types):
cmds = [cmds]
commands = cmds
repo_dict['shell_command_after'] = commands
repo_list.append(repo_dict)
return repo_list
def load_config(config, expand_env=False, env_file=None, force=False):
"""Return repos from a directory and fnmatch. Not recursive.
:param config: paths to config file
:type config: str
:param expand_env: True to expand environment variables in the config.
:type expand_env: bool
:param env_file: path to file with variables to add to the environment.
:type env_file: str or None
:param bool force: True to aggregate even if repo is dirty.
:returns: expanded config dict item
:rtype: iter(dict)
"""
if not os.path.exists(config):
raise ConfigException('Unable to find configuration file: %s' % config)
file_extension = os.path.splitext(config)[1][1:]
if file_extension not in ("yaml", "yml"):
raise ConfigException(
"Only .yaml and .yml configuration files are supported "
"(got %s)" % file_extension
)
if expand_env:
environment = {}
if env_file is not None and os.path.isfile(env_file):
with open(env_file) as env_file_handler:
for line in env_file_handler:
line = line.strip()
if line.startswith('#'):
continue
if '=' in line:
key, value = line.split('=')
environment.update({key.strip(): value.strip()})
environment.update(os.environ)
with open(config) as file_handler:
config = Template(file_handler.read())
config = config.substitute(environment)
else:
config = open(config).read()
conf = yaml.load(config, Loader=yaml.SafeLoader)
return get_repos(conf or {}, force)