Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 62 additions & 32 deletions dojo/tools/gitleaks/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ def get_findings(self, filename, test):
dupes = dict()

for issue in issues:
if issue.get('rule'):
if issue.get("rule"):
self.get_finding_legacy(issue, test, dupes)
elif issue.get('Description'):
elif issue.get("Description"):
self.get_finding_current(issue, test, dupes)
else:
raise ValueError('Format is not recognized for Gitleaks')
raise ValueError("Format is not recognized for Gitleaks")

return list(dupes.values())

Expand All @@ -44,10 +44,19 @@ def get_finding_legacy(self, issue, test, dupes):
file_path = issue["file"]
reason = issue["rule"]
titleText = "Hard Coded " + reason
description = "**Commit:** " + issue["commitMessage"].rstrip("\n") + "\n"
description = (
"**Commit:** " + issue["commitMessage"].rstrip("\n") + "\n"
)
description += "**Commit Hash:** " + issue["commit"] + "\n"
description += "**Commit Date:** " + issue["date"] + "\n"
description += "**Author:** " + issue["author"] + " <" + issue["email"] + ">" + "\n"
description += (
"**Author:** "
+ issue["author"]
+ " <"
+ issue["email"]
+ ">"
+ "\n"
)
description += "**Reason:** " + reason + "\n"
description += "**Path:** " + file_path + "\n"
if "lineNumber" in issue:
Expand All @@ -56,8 +65,18 @@ def get_finding_legacy(self, issue, test, dupes):
if "operation" in issue:
description += "**Operation:** " + issue["operation"] + "\n"
if "leakURL" in issue:
description += "**Leak URL:** [" + issue["leakURL"] + "](" + issue["leakURL"] + ")\n"
description += "\n**String Found:**\n\n```\n" + issue["line"].replace(issue["offender"], "REDACTED") + "\n```"
description += (
"**Leak URL:** ["
+ issue["leakURL"]
+ "]("
+ issue["leakURL"]
+ ")\n"
)
description += (
"\n**String Found:**\n\n```\n"
+ issue["line"].replace(issue["offender"], "REDACTED")
+ "\n```"
)

severity = "High"
if "Github" in reason or "AWS" in reason or "Heroku" in reason:
Expand All @@ -75,60 +94,71 @@ def get_finding_legacy(self, issue, test, dupes):
static_finding=True,
)
# manage tags
finding.unsaved_tags = issue.get("tags", "").split(', ')
finding.unsaved_tags = issue.get("tags", "").split(", ")

dupe_key = hashlib.sha256((issue["offender"] + file_path + str(line)).encode("utf-8")).hexdigest()
dupe_key = hashlib.sha256(
(issue["offender"] + file_path + str(line)).encode("utf-8")
).hexdigest()

if dupe_key not in dupes:
dupes[dupe_key] = finding

def get_finding_current(self, issue, test, dupes):
reason = issue.get('Description')
line = issue.get('StartLine')
reason = issue.get("Description")
line = issue.get("StartLine")
if line:
line = int(line)
else:
line = 0
match = issue.get('Match')
secret = issue.get('Secret')
file_path = issue.get('File')
commit = issue.get('Commit')
match = issue.get("Match")
secret = issue.get("Secret")
file_path = issue.get("File")
commit = issue.get("Commit")
# Author and email will not be used because of GDPR
# author = issue.get('Author')
# email = issue.get('Email')
date = issue.get('Date')
message = issue.get('Message')
tags = issue.get('Tags')
ruleId = issue.get('RuleID')
date = issue.get("Date")
message = issue.get("Message")
tags = issue.get("Tags")
ruleId = issue.get("RuleID")

title = f'Hard coded {reason} found in {file_path}'
title = f"Hard coded {reason} found in {file_path}"

description = ''
description = ""
if secret:
description += f'**Secret:** {secret}\n'
description += f"**Secret:** {secret}\n"
if match:
description += f'**Match:** {match}\n'
description += f"**Match:** {match}\n"
if message:
if len(message.split("\n")) > 1:
description += "**Commit message:**" + "\n```\n" + message.replace('```', '\\`\\`\\`') + "\n```\n"
description += (
"**Commit message:**"
+ "\n```\n"
+ message.replace("```", "\\`\\`\\`")
+ "\n```\n"
)
else:
description += f'**Commit message:** {message}\n'
description += f"**Commit message:** {message}\n"
if commit:
description += f'**Commit hash:** {commit}\n'
description += f"**Commit hash:** {commit}\n"
if date:
description += f'**Commit date:** {date}\n'
description += f"**Commit date:** {date}\n"
if ruleId:
description += f'**Rule Id:** {ruleId}'
if description[-1] == '\n':
description += f"**Rule Id:** {ruleId}"
if description[-1] == "\n":
description = description[:-1]

severity = "High"

dupe_key = hashlib.md5((title + secret + str(line)).encode("utf-8")).hexdigest()
dupe_key = hashlib.md5(
(title + secret + str(line)).encode("utf-8")
).hexdigest()

if dupe_key in dupes:
finding = dupes[dupe_key]
finding.description = finding.description + '\n\n***\n\n' + description
finding.description = (
finding.description + "\n\n***\n\n" + description
)
finding.nb_occurences += 1
dupes[dupe_key] = finding
else:
Expand All @@ -142,7 +172,7 @@ def get_finding_current(self, issue, test, dupes):
line=line,
dynamic_finding=False,
static_finding=True,
nb_occurences=1,
nb_occurences=1
)
if tags:
finding.unsaved_tags = tags
Expand Down
52 changes: 29 additions & 23 deletions dojo/tools/gosec/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@


class GosecParser(object):

def get_scan_types(self):
return ["Gosec Scanner"]

Expand All @@ -17,32 +16,37 @@ def get_description_for_scan_types(self, scan_type):
def get_findings(self, filename, test):
tree = filename.read()
try:
data = json.loads(str(tree, 'utf-8'))
except:
data = json.loads(str(tree, "utf-8"))
except Exception:
data = json.loads(tree)
dupes = dict()

for item in data["Issues"]:
impact = ''
references = ''
findingdetail = ''
title = ''
impact = ""
references = ""
findingdetail = ""
title = ""
filename = item.get("file")
line = item.get("line")
scanner_confidence = item.get("confidence")

title = item["details"] + " - rule " + item["rule_id"]

# Finding details information
# Finding details information
findingdetail += "Filename: {}\n\n".format(filename)
findingdetail += "Line number: {}\n\n".format(str(line))
findingdetail += "Issue Confidence: {}\n\n".format(scanner_confidence)
findingdetail += "Issue Confidence: {}\n\n".format(
scanner_confidence
)
findingdetail += "Code:\n\n"
findingdetail += "```{}```".format(item["code"])

sev = item["severity"]
# Best attempt at ongoing documentation provided by gosec, based on rule id
references = "https://securego.io/docs/rules/{}.html".format(item['rule_id']).lower()
# Best attempt at ongoing documentation provided by gosec, based on
# rule id
references = "https://securego.io/docs/rules/{}.html".format(
item["rule_id"]
).lower()

if scanner_confidence:
# Assign integer value to confidence.
Expand All @@ -53,9 +57,9 @@ def get_findings(self, filename, test):
elif scanner_confidence == "LOW":
scanner_confidence = 7

if '-' in line:
if "-" in line:
# if this is a range, only point to the beginning.
line = line.split('-', 1)[0]
line = line.split("-", 1)[0]
if line.isdigit():
line = int(line)
else:
Expand All @@ -68,16 +72,18 @@ def get_findings(self, filename, test):
else:
dupes[dupe_key] = True

find = Finding(title=title,
test=test,
description=findingdetail,
severity=sev.title(),
impact=impact,
references=references,
file_path=filename,
line=line,
scanner_confidence=scanner_confidence,
static_finding=True)
find = Finding(
title=title,
test=test,
description=findingdetail,
severity=sev.title(),
impact=impact,
references=references,
file_path=filename,
line=line,
scanner_confidence=scanner_confidence,
static_finding=True
)

dupes[dupe_key] = find

Expand Down
78 changes: 53 additions & 25 deletions dojo/tools/govulncheck/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@

logger = logging.getLogger(__name__)

SEVERITY = 'Info'
SEVERITY = "Info"


class GovulncheckParser:

def get_scan_types(self):
return ["Govulncheck Scanner"]

Expand All @@ -22,44 +21,73 @@ def get_description_for_scan_types(self, scan_type):

@staticmethod
def get_location(data, node):
while data['Calls']['Functions'][str(node)]['CallSites'][0]['Parent'] != 1:
node = data['Calls']['Functions'][str(node)]['CallSites'][0]['Parent']
return [f"{x['Pos']['Filename']}:{x['Pos']['Line']}:{x['Pos']['Column']}" for x in
data['Calls']['Functions'][str(node)]['CallSites']]
while (
data["Calls"]["Functions"][str(node)]["CallSites"][0]["Parent"]
!= 1
):
node = data["Calls"]["Functions"][str(node)]["CallSites"][0][
"Parent"
]
return [
f"{x['Pos']['Filename']}:{x['Pos']['Line']}:{x['Pos']['Column']}"
for x in data["Calls"]["Functions"][str(node)]["CallSites"]
]

@staticmethod
def get_version(data, node):
return data['Requires']['Modules'][str(node)]['Version']
return data["Requires"]["Modules"][str(node)]["Version"]

def get_findings(self, scan_file, test):
findings = []
try:
data = json.load(scan_file)
except Exception as e:
except Exception:
raise ValueError("Invalid JSON format")
else:
if data['Vulns']:
list_vulns = data['Vulns']
for cve, elems in groupby(list_vulns, key=lambda vuln: vuln['OSV']['aliases'][0]):
if data["Vulns"]:
list_vulns = data["Vulns"]
for cve, elems in groupby(
list_vulns, key=lambda vuln: vuln["OSV"]["aliases"][0]
):
first_elem = list(islice(elems, 1))
d = {
'cve': cve,
'severity': SEVERITY,
'title': first_elem[0]['OSV']['id'],
'component_name': first_elem[0]['OSV']['affected'][0]['package']['name'],
'component_version': self.get_version(data, first_elem[0]['RequireSink']),
"cve": cve,
"severity": SEVERITY,
"title": first_elem[0]["OSV"]["id"],
"component_name": first_elem[0]["OSV"]["affected"][0][
"package"
]["name"],
"component_version": self.get_version(
data, first_elem[0]["RequireSink"]
),
}
d['references'] = first_elem[0]['OSV']['references'][0]['url']
d['url'] = first_elem[0]['OSV']['affected'][0]['database_specific']['url']
d['unique_id_from_tool'] = first_elem[0]['OSV']['id']
d["references"] = first_elem[0]["OSV"]["references"][0][
"url"
]
d["url"] = first_elem[0]["OSV"]["affected"][0][
"database_specific"
]["url"]
d["unique_id_from_tool"] = first_elem[0]["OSV"]["id"]
vuln_methods = set(
first_elem[0]['OSV']['affected'][0]['ecosystem_specific']['imports'][0]['symbols'])
impact = set(self.get_location(data, first_elem[0]['CallSink']))
first_elem[0]["OSV"]["affected"][0][
"ecosystem_specific"
]["imports"][0]["symbols"]
)
impact = set(
self.get_location(data, first_elem[0]["CallSink"])
)
for elem in elems:
impact.update(self.get_location(data, elem['CallSink']))
impact.update(
self.get_location(data, elem["CallSink"])
)
vuln_methods.update(
elem['OSV']['affected'][0]['ecosystem_specific']['imports'][0]['symbols'])
d['impact'] = '; '.join(impact) if impact else None
d['description'] = f"Vulnerable functions: {'; '.join(vuln_methods)}"
elem["OSV"]["affected"][0]["ecosystem_specific"][
"imports"
][0]["symbols"]
)
d["impact"] = "; ".join(impact) if impact else None
d[
"description"
] = f"Vulnerable functions: {'; '.join(vuln_methods)}"
findings.append(Finding(**d))
return findings
Loading