Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions nightfall/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
:license: MIT, see LICENSE for more details.
"""
from .api import Nightfall
from .alerts import SlackAlert, EmailAlert, WebhookAlert, AlertConfig
from .detection_rules import (Regex, WordList, Confidence, ContextRule, MatchType, ExclusionRule, MaskConfig,
RedactionConfig, Detector, LogicalOp, DetectionRule)
from .findings import Finding, Range

__all__ = ["Nightfall", "Regex", "WordList", "Confidence", "ContextRule", "MatchType", "ExclusionRule", "MaskConfig",
"RedactionConfig", "Detector", "LogicalOp", "DetectionRule", "Finding", "Range"]
__all__ = ["Nightfall", "SlackAlert", "EmailAlert", "WebhookAlert", "AlertConfig", "Regex", "WordList", "Confidence",
"ContextRule", "MatchType", "ExclusionRule", "MaskConfig", "RedactionConfig", "Detector", "LogicalOp",
"DetectionRule", "Finding", "Range"]
72 changes: 72 additions & 0 deletions nightfall/alerts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from dataclasses import dataclass

from typing import List, Tuple, Optional

@dataclass
class SlackAlert:
"""SlackAlert contains the configuration required to allow clients to send asynchronous alerts to a Slack
workspace when findings are detected. Note that in order for Slack alerts to be delivered to your workspace,
you must use authenticate Nightfall to your Slack workspace under the Settings menu on the Nightfall Dashboard.

Currently, Nightfall supports delivering alerts to public channels, formatted like "#general".
Alerts are only sent if findings are detected.
Attributes:
target (str): the channel name, formatted like "#general".
"""
target: str

def as_dict(self):
return {"target": self.target}

@dataclass
class EmailAlert:
"""EmailAlert contains the configuration required to allow clients to send an asynchronous email message
when findings are detected. The findings themselves will be delivered as a file attachment on the email.
Alerts are only sent if findings are detected.
Attributes:
address (str): the email address to which alerts should be sent.
"""
address: str

def as_dict(self):
return {"address": self.address}

@dataclass
class WebhookAlert:
"""WebhookAlert contains the configuration required to allow clients to send a webhook event to an
external URL when findings are detected. The URL provided must (1) use the HTTPS scheme, (2) have a
route defined on the HTTP POST method, and (3) return a 200 status code upon receipt of the event.

In contrast to other platforms, when using the file scanning APIs, an alert is also sent to this webhook
*even when there are no findings*.
Attributes:
address (str): the URL to which alerts should be sent.
"""
address: str

def as_dict(self):
return {"address": self.address}

@dataclass
class AlertConfig:
"""AlertConfig allows clients to specify where alerts should be delivered when findings are discovered as
part of a scan. These alerts are delivered asynchronously to all destinations specified in the object instance.
Attributes:
slack (SlackAlert): Send alerts to a Slack workspace when findings are detected.
email (EmailAlert): Send alerts to an email address when findings are detected.
url (WebhookAlert): Send an HTTP webhook event to a URL when findings are detected.
"""
slack: Optional[SlackAlert] = None
email: Optional[EmailAlert] = None
url: Optional[WebhookAlert] = None

def as_dict(self):
result = {}
if self.slack:
result["slack"] = self.slack.as_dict()
if self.email:
result["email"] = self.email.as_dict()
if self.url:
result["url"] = self.url.as_dict()
return result

24 changes: 19 additions & 5 deletions nightfall/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from requests.adapters import HTTPAdapter
from urllib3 import Retry

from nightfall.alerts import AlertConfig
from nightfall.detection_rules import DetectionRule, RedactionConfig
from nightfall.exceptions import NightfallUserError, NightfallSystemError
from nightfall.findings import Finding
Expand Down Expand Up @@ -57,7 +58,7 @@ def __init__(self, key: Optional[str] = None, signing_secret: Optional[str] = No

def scan_text(self, texts: List[str], policy_uuids: List[str] = None, detection_rules: Optional[List[DetectionRule]] = None,
detection_rule_uuids: Optional[List[str]] = None, context_bytes: Optional[int] = None,
default_redaction_config: Optional[RedactionConfig] = None) ->\
default_redaction_config: Optional[RedactionConfig] = None, alert_config: Optional[AlertConfig] = None) ->\
Tuple[List[List[Finding]], List[str]]:
"""Scan text with Nightfall.

Expand All @@ -83,6 +84,8 @@ def scan_text(self, texts: List[str], policy_uuids: List[str] = None, detection_
:param default_redaction_config: The default redaction configuration to apply to all detection rules, unless
there is a more specific config within a detector.
:type default_redaction_config: RedactionConfig or None
:param alert_config: Configures external destinations to fan out alerts to in the event that findings are detected.
:type alert_config: AlertConfig or None
:returns: list of findings, list of redacted input texts
"""

Expand All @@ -98,6 +101,8 @@ def scan_text(self, texts: List[str], policy_uuids: List[str] = None, detection_
policy["contextBytes"] = context_bytes
if default_redaction_config:
policy["defaultRedactionConfig"] = default_redaction_config.as_dict()
if alert_config:
policy["alertConfig"] = alert_config.as_dict()

request_body = {
"payload": texts
Expand Down Expand Up @@ -132,7 +137,8 @@ def _scan_text_v3(self, data: dict):
def scan_file(self, location: str, webhook_url: Optional[str] = None, policy_uuid: Optional[str] = None,
detection_rules: Optional[List[DetectionRule]] = None,
detection_rule_uuids: Optional[List[str]] = None,
request_metadata: Optional[str] = None) -> Tuple[str, str]:
request_metadata: Optional[str] = None,
alert_config: Optional[AlertConfig] = None) -> Tuple[str, str]:
"""Scan file with Nightfall.
At least one of policy_uuid, detection_rule_uuids or detection_rules is required.

Expand All @@ -146,6 +152,8 @@ def scan_file(self, location: str, webhook_url: Optional[str] = None, policy_uui
:type detection_rule_uuids: List[str] or None
:param request_metadata: additional metadata that will be returned with the webhook response
:type request_metadata: str or None
:param alert_config: Configures external destinations to fan out alerts to in the event that findings are detected.
:type alert_config: AlertConfig or None
:returns: (scan_id, message)
"""

Expand All @@ -169,7 +177,8 @@ def scan_file(self, location: str, webhook_url: Optional[str] = None, policy_uui
detection_rules=detection_rules,
detection_rule_uuids=detection_rule_uuids,
webhook_url=webhook_url, policy_uuid=policy_uuid,
request_metadata=request_metadata)
request_metadata=request_metadata,
alert_config=alert_config)
_validate_response(response, 200)
parsed_response = response.json()

Expand Down Expand Up @@ -216,15 +225,20 @@ def _file_scan_finalize(self, session_id: str):

def _file_scan_scan(self, session_id: str, detection_rules: Optional[List[DetectionRule]] = None,
detection_rule_uuids: Optional[List[str]] = None, webhook_url: Optional[str] = None,
policy_uuid: Optional[str] = None, request_metadata: Optional[str] = None) -> requests.Response:
policy_uuid: Optional[str] = None, request_metadata: Optional[str] = None,
alert_config: Optional[AlertConfig] = None) -> requests.Response:
if policy_uuid:
data = {"policyUUID": policy_uuid}
else:
data = {"policy": {"webhookURL": webhook_url}}
data = {"policy": {}}
if webhook_url:
data["policy"]["webhookURL"] = webhook_url
if detection_rule_uuids:
data["policy"]["detectionRuleUUIDs"] = detection_rule_uuids
if detection_rules:
data["policy"]["detectionRules"] = [d.as_dict() for d in detection_rules]
if alert_config:
data["policy"]["alertConfig"] = alert_config.as_dict()

if request_metadata:
data["requestMetadata"] = request_metadata
Expand Down