Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions python-dsl/rules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@
from .container_decorators import dockerfile_rule, compose_rule
from .container_matchers import instruction, missing, service_has, service_missing
from .container_ir import compile_all_rules, compile_to_json
from .container_combinators import (
all_of,
any_of,
none_of,
instruction_after,
instruction_before,
stage,
final_stage_has,
)
from .container_programmatic import custom_check, DockerfileAccess, ComposeAccess

__all__ = [
"dockerfile_rule",
Expand All @@ -13,4 +23,14 @@
"service_missing",
"compile_all_rules",
"compile_to_json",
"all_of",
"any_of",
"none_of",
"instruction_after",
"instruction_before",
"stage",
"final_stage_has",
"custom_check",
"DockerfileAccess",
"ComposeAccess",
]
221 changes: 221 additions & 0 deletions python-dsl/rules/container_combinators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
"""
Logic combinators for container rules.
"""

from typing import List, Dict, Any, Union, Callable
from dataclasses import dataclass, field
from .container_matchers import Matcher


@dataclass
class CombinatorMatcher:
"""Represents a logic combinator (AND, OR, NOT)."""
combinator_type: str # "all_of", "any_of", "none_of"
conditions: List[Union[Matcher, 'CombinatorMatcher', Dict, Callable]]

def to_dict(self) -> Dict[str, Any]:
"""Convert to JSON IR."""
serialized_conditions = []
for cond in self.conditions:
if hasattr(cond, 'to_dict'):
serialized_conditions.append(cond.to_dict())
elif isinstance(cond, dict):
serialized_conditions.append(cond)
elif callable(cond):
serialized_conditions.append({
"type": "custom_function",
"has_callable": True
})
else:
serialized_conditions.append(cond)

return {
"type": self.combinator_type,
"conditions": serialized_conditions
}


def all_of(*conditions: Union[Matcher, Dict, Callable]) -> CombinatorMatcher:
"""
Combine matchers with AND logic.
All conditions must match for the rule to trigger.

Example:
all_of(
instruction(type="FROM", image_tag="latest"),
missing(instruction="USER"),
instruction(type="RUN", contains="sudo")
)
"""
return CombinatorMatcher(
combinator_type="all_of",
conditions=list(conditions)
)


def any_of(*conditions: Union[Matcher, Dict, Callable]) -> CombinatorMatcher:
"""
Combine matchers with OR logic.
Any condition can match for the rule to trigger.

Example:
any_of(
instruction(type="USER", user_name="root"),
missing(instruction="USER"),
instruction(type="FROM", base_image="scratch")
)
"""
return CombinatorMatcher(
combinator_type="any_of",
conditions=list(conditions)
)


def none_of(*conditions: Union[Matcher, Dict, Callable]) -> CombinatorMatcher:
"""
Combine matchers with NOT logic.
None of the conditions should match for the rule to pass.
(Inverse: if any matches, rule triggers as violation)

Example:
none_of(
instruction(type="HEALTHCHECK"),
instruction(type="USER", user_name_not="root")
)
"""
return CombinatorMatcher(
combinator_type="none_of",
conditions=list(conditions)
)


@dataclass
class SequenceMatcher:
"""Represents instruction sequence validation."""
sequence_type: str # "after" or "before"
instruction: Union[str, Matcher, Dict]
reference: Union[str, Matcher, Dict]
not_followed_by: bool = False

def to_dict(self) -> Dict[str, Any]:
"""Convert to JSON IR."""
def serialize_ref(ref):
if isinstance(ref, str):
return {"instruction": ref}
elif hasattr(ref, 'to_dict'):
return ref.to_dict()
elif isinstance(ref, dict):
return ref
return ref

return {
"type": f"instruction_{self.sequence_type}",
"instruction": serialize_ref(self.instruction),
"reference": serialize_ref(self.reference),
"not_followed_by": self.not_followed_by
}


def instruction_after(
instruction: Union[str, Matcher],
after: Union[str, Matcher],
not_followed_by: bool = False
) -> SequenceMatcher:
"""
Check that an instruction appears after another.

Example:
# Ensure CMD comes after USER
instruction_after(instruction="CMD", after="USER")

# Ensure apt-get install follows apt-get update
instruction_after(
instruction=instruction(type="RUN", contains="apt-get install"),
after=instruction(type="RUN", contains="apt-get update")
)
"""
return SequenceMatcher(
sequence_type="after",
instruction=instruction,
reference=after,
not_followed_by=not_followed_by
)


def instruction_before(
instruction: Union[str, Matcher],
before: Union[str, Matcher],
not_followed_by: bool = False
) -> SequenceMatcher:
"""
Check that an instruction appears before another.

Example:
instruction_before(instruction="USER", before="CMD")
"""
return SequenceMatcher(
sequence_type="before",
instruction=instruction,
reference=before,
not_followed_by=not_followed_by
)


@dataclass
class StageMatcher:
"""Matcher for multi-stage build stage queries."""
stage_type: str
params: Dict[str, Any] = field(default_factory=dict)

def to_dict(self) -> Dict[str, Any]:
return {
"type": f"stage_{self.stage_type}",
**self.params
}


def stage(
alias: str = None,
base_image: str = None,
is_final: bool = None,
) -> StageMatcher:
"""
Query a specific build stage.

Example:
stage(alias="builder")
stage(is_final=True)
stage(base_image="alpine")
"""
params = {}
if alias is not None:
params["alias"] = alias
if base_image is not None:
params["base_image"] = base_image
if is_final is not None:
params["is_final"] = is_final

return StageMatcher(stage_type="query", params=params)


def final_stage_has(
instruction: Union[str, Matcher] = None,
missing_instruction: str = None,
) -> StageMatcher:
"""
Check properties of the final build stage.

Example:
final_stage_has(missing_instruction="USER")
final_stage_has(instruction=instruction(type="USER", user_name="root"))
"""
params = {}
if instruction is not None:
if isinstance(instruction, str):
params["instruction"] = instruction
elif hasattr(instruction, 'to_dict'):
params["instruction"] = instruction.to_dict()
if missing_instruction is not None:
params["missing_instruction"] = missing_instruction

return StageMatcher(stage_type="final_has", params=params)
120 changes: 120 additions & 0 deletions python-dsl/rules/container_programmatic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""
Programmatic access to Dockerfile and docker-compose objects.
"""

from typing import Callable, Dict, Any
from dataclasses import dataclass


@dataclass
class ProgrammaticMatcher:
"""Wraps a custom validation function."""
check_function: Callable
description: str = ""

def to_dict(self) -> Dict[str, Any]:
return {
"type": "programmatic",
"has_callable": True,
"description": self.description,
}


def custom_check(
check: Callable,
description: str = ""
) -> ProgrammaticMatcher:
"""
Create a custom validation function.

The check function receives the parsed dockerfile or compose object
and should return True if the rule matches (vulnerability found).

Example:
@dockerfile_rule(id="DOCKER-CUSTOM-001")
def last_user_is_root():
def check(dockerfile):
final_user = dockerfile.get_final_user()
return final_user is None or final_user.user_name == "root"
return custom_check(check, "Check if last USER is root")
"""
return ProgrammaticMatcher(
check_function=check,
description=description
)


class DockerfileAccess:
"""
Provides programmatic access to Dockerfile structure.
Used in custom validation functions.
"""

def __init__(self, dockerfile_graph):
self._graph = dockerfile_graph

def get_instructions(self, instruction_type: str):
"""Get all instructions of a type."""
return self._graph.GetInstructions(instruction_type)

def has_instruction(self, instruction_type: str) -> bool:
"""Check if instruction type exists."""
return self._graph.HasInstruction(instruction_type)

def get_final_user(self):
"""Get the last USER instruction."""
return self._graph.GetFinalUser()

def is_running_as_root(self) -> bool:
"""Check if container runs as root."""
return self._graph.IsRunningAsRoot()

def get_stages(self):
"""Get all build stages."""
return self._graph.GetStages()

def is_multi_stage(self) -> bool:
"""Check if Dockerfile uses multi-stage build."""
return self._graph.IsMultiStage()

def get_stage_by_alias(self, alias: str):
"""Get a stage by its AS alias."""
return self._graph.GetStageByAlias(alias)

def get_final_stage(self):
"""Get the final build stage."""
return self._graph.GetFinalStage()


class ComposeAccess:
"""
Provides programmatic access to docker-compose structure.
Used in custom validation functions.
"""

def __init__(self, compose_graph):
self._graph = compose_graph

def get_services(self):
"""Get all service names."""
return self._graph.GetServices()

def service_has(self, service_name: str, key: str, value) -> bool:
"""Check if service has property with value."""
return self._graph.ServiceHas(service_name, key, value)

def service_get(self, service_name: str, key: str):
"""Get service property value."""
return self._graph.ServiceGet(service_name, key)

def get_privileged_services(self):
"""Get services with privileged: true."""
return self._graph.GetPrivilegedServices()

def services_with_docker_socket(self):
"""Get services mounting Docker socket."""
return self._graph.ServicesWithDockerSocket()

def services_with_host_network(self):
"""Get services using host network mode."""
return self._graph.ServicesWithHostNetwork()
Loading