import base64
import os
import platform
import re
import tempfile
import zipfile

from devsecops_engine_tools.engine_core.src.domain.model.level_compliance import (
    LevelCompliance,
)
from devsecops_engine_tools.engine_core.src.domain.model.level_vulnerability import (
    LevelVulnerability,
)
from devsecops_engine_tools.engine_core.src.domain.model.threshold import Threshold
from devsecops_engine_tools.engine_utilities import settings
from devsecops_engine_tools.engine_utilities.github.infrastructure.github_api import (
    GithubApi,
)
from devsecops_engine_tools.engine_utilities.ssh.managment_private_key import (
    create_ssh_private_file,
    add_ssh_private_key,
    decode_base64,
    config_knowns_hosts,
)
from devsecops_engine_tools.engine_utilities.utils.logger_info import MyLogger

logger = MyLogger.__call__(**settings.SETTING_LOGGER).get_logger()
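class Utils:
    """Shared engine-core helpers: archive extraction, external-checks setup
    from a secret, ``token:`` Base64 encoding and per-pipeline threshold
    overrides."""

    # Segment prefixes recognised inside the ``secret_external_checks`` string.
    _GITHUB_TOKEN_MARKER = "github_token:"
    _GITHUB_APPS_MARKER = "github_apps:"
    _SSH_MARKER = "ssh:"

    def unzip_file(self, zip_file_path, extract_path):
        """Extract every member of the archive at ``zip_file_path`` into ``extract_path``.

        ``ZipFile.extractall`` strips absolute paths and ``..`` components from
        member names, so members cannot escape ``extract_path``. It does not
        bound the decompressed size, so only extract archives from a source
        that is already trusted.
        """
        with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
            zip_ref.extractall(extract_path)

    @staticmethod
    def _segment_after(text, marker):
        """Return what follows the first ``marker`` in ``text``, or ``None`` if absent.

        The test is on the full marker (word plus colon). The previous code
        tested only the bare word and then indexed ``split("word:")[1]``, which
        raised IndexError whenever a value merely contained e.g. ``ssh``.
        """
        if marker not in text:
            return None
        return text.split(marker)[1]

    @classmethod
    def _parse_external_checks_secret(cls, secret_external_checks):
        """Split ``secret_external_checks`` into its named parts.

        Recognised segments are ``github_token:<token>``, ``github_apps:<key>``
        and ``ssh:<base64 private key>:<base64 passphrase>``. Only the parts
        that are present are returned, so callers can rely on ``dict.get``.
        """
        ssh_tail = cls._segment_after(secret_external_checks, cls._SSH_MARKER)
        ssh_fields = ssh_tail.split(":") if ssh_tail is not None else []
        parts = {
            "github_token": cls._segment_after(
                secret_external_checks, cls._GITHUB_TOKEN_MARKER
            ),
            "github_apps": cls._segment_after(
                secret_external_checks, cls._GITHUB_APPS_MARKER
            ),
            "repository_ssh_private_key": ssh_fields[0] if ssh_fields else None,
            # A missing passphrase field is reported as absent instead of raising.
            "repository_ssh_password": ssh_fields[1] if len(ssh_fields) > 1 else None,
        }
        return {key: value for key, value in parts.items() if value is not None}

    @staticmethod
    def _setup_git_ssh_access(tool_config, secret):
        """Register the external Git host (``config_knowns_hosts``) and load the
        repository's private key through ``add_ssh_private_key``.

        Returns whatever ``add_ssh_private_key`` returns (the agent environment).
        """
        config_knowns_hosts(
            tool_config["EXTERNAL_GIT_SSH_HOST"],
            tool_config["EXTERNAL_GIT_PUBLIC_KEY_FINGERPRINT"],
        )
        ssh_key_content = decode_base64(secret["repository_ssh_private_key"])
        # The key is written under a fresh mkdtemp() directory (created mode
        # 0700) rather than the former fixed, predictable /tmp/ssh_key_file:
        # in a shared /tmp a pre-existing file or link at a well-known name
        # could expose the key to other local users, and the private directory
        # protects it even if create_ssh_private_file() does not tighten the
        # file mode itself.
        ssh_key_file_path = os.path.join(
            tempfile.mkdtemp(prefix="external_checks_ssh_"), "ssh_key_file"
        )
        create_ssh_private_file(ssh_key_file_path, ssh_key_content)
        ssh_key_password = decode_base64(secret["repository_ssh_password"])
        return add_ssh_private_key(ssh_key_file_path, ssh_key_password)

    @staticmethod
    def _github_token_from_secret(tool_config, secret, github_api):
        """Resolve the GitHub token used for the release-assets download.

        A ``github_apps`` private key is exchanged for an installation access
        token; otherwise the raw ``github_token`` entry (possibly ``None``) is
        returned unchanged.
        """
        if secret.get("github_apps"):
            return github_api.get_installation_access_token(
                secret.get("github_apps"),
                tool_config["APP_ID_GITHUB"],
                tool_config["INSTALLATION_ID_GITHUB"],
            )
        return secret.get("github_token")

    def configurate_external_checks(
        self, tool, config_tool, secret_tool, secret_external_checks
    ):
        """Make the external checks for ``tool`` available and return the SSH agent env.

        Args:
            tool: key into ``config_tool`` for this tool's settings.
            config_tool: configuration mapping; ``config_tool[tool]`` supplies
                ``USE_EXTERNAL_CHECKS_GIT`` / ``USE_EXTERNAL_CHECKS_DIR`` and
                the ``EXTERNAL_*`` / ``*_GITHUB`` keys the chosen path reads.
            secret_tool: optional mapping whose ``github_token`` entry is
                exchanged for a GitHub App installation token.
            secret_external_checks: optional single string carrying
                ``github_token:`` / ``github_apps:`` / ``ssh:`` segments (see
                ``_parse_external_checks_secret``); consulted only when
                ``secret_tool`` is ``None``.

        Returns:
            The value of ``add_ssh_private_key`` when the Git-over-SSH path ran
            (Linux/Darwin only), otherwise ``None``. Any failure is logged and
            yields ``None``: external checks are best-effort by design.
        """
        agent_env = None
        try:
            secret = None
            github_token = None
            github_api = GithubApi()

            if secret_tool is not None:
                secret = secret_tool
                github_token = github_api.get_installation_access_token(
                    secret["github_token"],
                    config_tool[tool]["APP_ID_GITHUB"],
                    config_tool[tool]["INSTALLATION_ID_GITHUB"],
                )
            elif secret_external_checks is not None:
                secret = self._parse_external_checks_secret(secret_external_checks)

            if secret is None:
                logger.warning("The secret is not configured for external controls")
            else:
                tool_config = config_tool[tool]
                if tool_config["USE_EXTERNAL_CHECKS_GIT"] and platform.system() in (
                    "Linux",
                    "Darwin",
                ):
                    agent_env = self._setup_git_ssh_access(tool_config, secret)
                elif tool_config["USE_EXTERNAL_CHECKS_DIR"]:
                    if not github_token:
                        github_token = self._github_token_from_secret(
                            tool_config, secret, github_api
                        )
                    github_api.download_latest_release_assets(
                        tool_config["EXTERNAL_DIR_OWNER"],
                        tool_config["EXTERNAL_DIR_REPOSITORY"],
                        github_token,
                        "/tmp",
                    )
        except Exception as ex:
            # Deliberately broad: a broken external-checks setup must not abort
            # the scan. exc_info keeps the traceback (not the secret values)
            # for diagnosis.
            logger.error(
                f"An error occurred configuring external checks: {ex}", exc_info=True
            )
        return agent_env

    def encode_token_to_base64(self, token):
        """Return ``"<token>:"`` encoded as Base64 text.

        The trailing colon suggests the HTTP Basic ``user:password`` form with
        an empty password -- confirm against the caller before relying on it.
        """
        token_bytes = f"{token}:".encode("utf-8")
        return base64.b64encode(token_bytes).decode("utf-8")

    def update_threshold(self, threshold: "Threshold", exclusions_data, pipeline_name):
        """Apply a per-pipeline THRESHOLD override to ``threshold`` in place and return it.

        Lookup order: ``exclusions_data[pipeline_name]["THRESHOLD"]`` first,
        else the first ``BY_PATTERN_SEARCH`` entry whose regex key matches the
        start of ``pipeline_name`` case-insensitively. Within an override,
        VULNERABILITY, COMPLIANCE and CVE are each replaced only when supplied,
        so a partial override leaves the other fields untouched (previously a
        missing VULNERABILITY was passed as ``None`` to ``LevelVulnerability``).
        A matching pattern entry without THRESHOLD leaves ``threshold``
        unchanged, the same as an exact-name entry without one.
        """

        def apply_override(override):
            vulnerability = override.get("VULNERABILITY")
            if vulnerability is not None:
                threshold.vulnerability = LevelVulnerability(vulnerability)
            compliance = override.get("COMPLIANCE")
            if compliance:
                threshold.compliance = LevelCompliance(compliance)
            cve = override.get("CVE")
            if cve is not None:
                threshold.cve = cve
            return threshold

        by_name = exclusions_data.get(pipeline_name, {}).get("THRESHOLD", {})
        if by_name:
            return apply_override(by_name)

        # Pattern keys come from the exclusions configuration, which is treated
        # as trusted input to re.match.
        search_patterns = exclusions_data.get("BY_PATTERN_SEARCH", {})
        by_pattern = next(
            (
                entry.get("THRESHOLD")
                for pattern, entry in search_patterns.items()
                if re.match(pattern, pipeline_name, re.IGNORECASE)
            ),
            None,
        )
        return apply_override(by_pattern) if by_pattern else threshold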