From fe3302e455f5ec913653bf38683327c31be7053d Mon Sep 17 00:00:00 2001
From: shivasurya
Date: Tue, 9 Dec 2025 09:28:40 +0530
Subject: [PATCH] feat(docker): Add Python DSL advanced features

Adds advanced DSL features for complex container rules:
- all_of(), any_of(), none_of() logic combinators
- instruction_after(), instruction_before() sequence validation
- stage(), final_stage_has() for multi-stage builds
- custom_check() for programmatic validation
- DockerfileAccess and ComposeAccess wrapper classes

All components have 97-100% test coverage (44 new tests).

Files added:
- python-dsl/rules/container_combinators.py
- python-dsl/rules/container_programmatic.py
- python-dsl/tests/test_container_combinators.py
- python-dsl/tests/test_container_programmatic.py

Files modified:
- python-dsl/rules/__init__.py (added new exports)

Part of: Dockerfile & Docker Compose Support
Depends on: PR #5 (Python DSL Core)
Next PR: #7 Integration & Rule Library
---
Reviewer notes (below the "---" separator, so not part of the commit message):
- The line structure of this patch was restored; hunk headers and diffstat
  counts are unchanged (all code edits below keep the line counts).
- container_combinators.py: final_stage_has() now passes a dict `instruction`
  through instead of silently dropping it, matching how SequenceMatcher and
  CombinatorMatcher already serialize dict conditions.
- container_combinators.py: None-defaulted parameters of stage() and
  final_stage_has() are annotated Optional[...], and instruction_after() /
  instruction_before() advertise the Dict form that the SequenceMatcher
  fields and test_with_dict already accept.
- The "index" blob id of container_combinators.py predates these edits.

 python-dsl/rules/__init__.py                  |  20 ++
 python-dsl/rules/container_combinators.py     | 221 ++++++++++++++++++
 python-dsl/rules/container_programmatic.py    | 120 ++++++++++
 .../tests/test_container_combinators.py       | 209 +++++++++++++++++
 .../tests/test_container_programmatic.py      | 211 +++++++++++++++++
 5 files changed, 781 insertions(+)
 create mode 100644 python-dsl/rules/container_combinators.py
 create mode 100644 python-dsl/rules/container_programmatic.py
 create mode 100644 python-dsl/tests/test_container_combinators.py
 create mode 100644 python-dsl/tests/test_container_programmatic.py

diff --git a/python-dsl/rules/__init__.py b/python-dsl/rules/__init__.py
index 10eba0fa..8d87d20d 100644
--- a/python-dsl/rules/__init__.py
+++ b/python-dsl/rules/__init__.py
@@ -3,6 +3,16 @@
 from .container_decorators import dockerfile_rule, compose_rule
 from .container_matchers import instruction, missing, service_has, service_missing
 from .container_ir import compile_all_rules, compile_to_json
+from .container_combinators import (
+    all_of,
+    any_of,
+    none_of,
+    instruction_after,
+    instruction_before,
+    stage,
+    final_stage_has,
+)
+from .container_programmatic import custom_check, DockerfileAccess, ComposeAccess
 
 __all__ = [
     "dockerfile_rule",
@@ -13,4 +23,14 @@
     "service_missing",
     "compile_all_rules",
     "compile_to_json",
+    "all_of",
+    "any_of",
+    "none_of",
+    "instruction_after",
+    "instruction_before",
+    "stage",
+    "final_stage_has",
+    "custom_check",
+    "DockerfileAccess",
+    "ComposeAccess",
 ]
diff --git a/python-dsl/rules/container_combinators.py b/python-dsl/rules/container_combinators.py
new file mode 100644
index 00000000..c442b4c3
--- /dev/null
+++ b/python-dsl/rules/container_combinators.py
@@ -0,0 +1,221 @@
+"""
+Logic combinators for container rules.
+"""
+
+from typing import List, Dict, Any, Optional, Union, Callable
+from dataclasses import dataclass, field
+from .container_matchers import Matcher
+
+
+@dataclass
+class CombinatorMatcher:
+    """Represents a logic combinator (AND, OR, NOT)."""
+    combinator_type: str  # "all_of", "any_of", "none_of"
+    conditions: List[Union[Matcher, 'CombinatorMatcher', Dict, Callable]]
+
+    def to_dict(self) -> Dict[str, Any]:
+        """Convert to JSON IR."""
+        serialized_conditions = []
+        for cond in self.conditions:
+            if hasattr(cond, 'to_dict'):
+                serialized_conditions.append(cond.to_dict())
+            elif isinstance(cond, dict):
+                serialized_conditions.append(cond)
+            elif callable(cond):
+                serialized_conditions.append({
+                    "type": "custom_function",
+                    "has_callable": True
+                })
+            else:
+                serialized_conditions.append(cond)
+
+        return {
+            "type": self.combinator_type,
+            "conditions": serialized_conditions
+        }
+
+
+def all_of(*conditions: Union[Matcher, Dict, Callable]) -> CombinatorMatcher:
+    """
+    Combine matchers with AND logic.
+    All conditions must match for the rule to trigger.
+
+    Example:
+        all_of(
+            instruction(type="FROM", image_tag="latest"),
+            missing(instruction="USER"),
+            instruction(type="RUN", contains="sudo")
+        )
+    """
+    return CombinatorMatcher(
+        combinator_type="all_of",
+        conditions=list(conditions)
+    )
+
+
+def any_of(*conditions: Union[Matcher, Dict, Callable]) -> CombinatorMatcher:
+    """
+    Combine matchers with OR logic.
+    Any condition can match for the rule to trigger.
+
+    Example:
+        any_of(
+            instruction(type="USER", user_name="root"),
+            missing(instruction="USER"),
+            instruction(type="FROM", base_image="scratch")
+        )
+    """
+    return CombinatorMatcher(
+        combinator_type="any_of",
+        conditions=list(conditions)
+    )
+
+
+def none_of(*conditions: Union[Matcher, Dict, Callable]) -> CombinatorMatcher:
+    """
+    Combine matchers with NOT logic.
+    None of the conditions should match for the rule to pass.
+    (Inverse: if any matches, rule triggers as violation)
+
+    Example:
+        none_of(
+            instruction(type="HEALTHCHECK"),
+            instruction(type="USER", user_name_not="root")
+        )
+    """
+    return CombinatorMatcher(
+        combinator_type="none_of",
+        conditions=list(conditions)
+    )
+
+
+@dataclass
+class SequenceMatcher:
+    """Represents instruction sequence validation."""
+    sequence_type: str  # "after" or "before"
+    instruction: Union[str, Matcher, Dict]
+    reference: Union[str, Matcher, Dict]
+    not_followed_by: bool = False
+
+    def to_dict(self) -> Dict[str, Any]:
+        """Convert to JSON IR."""
+        def serialize_ref(ref):
+            if isinstance(ref, str):
+                return {"instruction": ref}
+            elif hasattr(ref, 'to_dict'):
+                return ref.to_dict()
+            elif isinstance(ref, dict):
+                return ref
+            return ref
+
+        return {
+            "type": f"instruction_{self.sequence_type}",
+            "instruction": serialize_ref(self.instruction),
+            "reference": serialize_ref(self.reference),
+            "not_followed_by": self.not_followed_by
+        }
+
+
+def instruction_after(
+    instruction: Union[str, Matcher, Dict],
+    after: Union[str, Matcher, Dict],
+    not_followed_by: bool = False
+) -> SequenceMatcher:
+    """
+    Check that an instruction appears after another.
+
+    Example:
+        # Ensure CMD comes after USER
+        instruction_after(instruction="CMD", after="USER")
+
+        # Ensure apt-get install follows apt-get update
+        instruction_after(
+            instruction=instruction(type="RUN", contains="apt-get install"),
+            after=instruction(type="RUN", contains="apt-get update")
+        )
+    """
+    return SequenceMatcher(
+        sequence_type="after",
+        instruction=instruction,
+        reference=after,
+        not_followed_by=not_followed_by
+    )
+
+
+def instruction_before(
+    instruction: Union[str, Matcher, Dict],
+    before: Union[str, Matcher, Dict],
+    not_followed_by: bool = False
+) -> SequenceMatcher:
+    """
+    Check that an instruction appears before another.
+
+    Example:
+        instruction_before(instruction="USER", before="CMD")
+    """
+    return SequenceMatcher(
+        sequence_type="before",
+        instruction=instruction,
+        reference=before,
+        not_followed_by=not_followed_by
+    )
+
+
+@dataclass
+class StageMatcher:
+    """Matcher for multi-stage build stage queries."""
+    stage_type: str
+    params: Dict[str, Any] = field(default_factory=dict)
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {
+            "type": f"stage_{self.stage_type}",
+            **self.params
+        }
+
+
+def stage(
+    alias: Optional[str] = None,
+    base_image: Optional[str] = None,
+    is_final: Optional[bool] = None,
+) -> StageMatcher:
+    """
+    Query a specific build stage.
+
+    Example:
+        stage(alias="builder")
+        stage(is_final=True)
+        stage(base_image="alpine")
+    """
+    params = {}
+    if alias is not None:
+        params["alias"] = alias
+    if base_image is not None:
+        params["base_image"] = base_image
+    if is_final is not None:
+        params["is_final"] = is_final
+
+    return StageMatcher(stage_type="query", params=params)
+
+
+def final_stage_has(
+    instruction: Optional[Union[str, Matcher, Dict]] = None,
+    missing_instruction: Optional[str] = None,
+) -> StageMatcher:
+    """
+    Check properties of the final build stage.
+
+    Example:
+        final_stage_has(missing_instruction="USER")
+        final_stage_has(instruction=instruction(type="USER", user_name="root"))
+    """
+    params = {}
+    if instruction is not None:
+        if hasattr(instruction, 'to_dict'):
+            params["instruction"] = instruction.to_dict()
+        else:
+            params["instruction"] = instruction
+    if missing_instruction is not None:
+        params["missing_instruction"] = missing_instruction
+
+    return StageMatcher(stage_type="final_has", params=params)
diff --git a/python-dsl/rules/container_programmatic.py b/python-dsl/rules/container_programmatic.py
new file mode 100644
index 00000000..943c8b1b
--- /dev/null
+++ b/python-dsl/rules/container_programmatic.py
@@ -0,0 +1,120 @@
+"""
+Programmatic access to Dockerfile and docker-compose objects.
+"""
+
+from typing import Callable, Dict, Any
+from dataclasses import dataclass
+
+
+@dataclass
+class ProgrammaticMatcher:
+    """Wraps a custom validation function."""
+    check_function: Callable
+    description: str = ""
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {
+            "type": "programmatic",
+            "has_callable": True,
+            "description": self.description,
+        }
+
+
+def custom_check(
+    check: Callable,
+    description: str = ""
+) -> ProgrammaticMatcher:
+    """
+    Create a custom validation function.
+
+    The check function receives the parsed dockerfile or compose object
+    and should return True if the rule matches (vulnerability found).
+
+    Example:
+        @dockerfile_rule(id="DOCKER-CUSTOM-001")
+        def last_user_is_root():
+            def check(dockerfile):
+                final_user = dockerfile.get_final_user()
+                return final_user is None or final_user.user_name == "root"
+            return custom_check(check, "Check if last USER is root")
+    """
+    return ProgrammaticMatcher(
+        check_function=check,
+        description=description
+    )
+
+
+class DockerfileAccess:
+    """
+    Provides programmatic access to Dockerfile structure.
+    Used in custom validation functions.
+    """
+
+    def __init__(self, dockerfile_graph):
+        self._graph = dockerfile_graph
+
+    def get_instructions(self, instruction_type: str):
+        """Get all instructions of a type."""
+        return self._graph.GetInstructions(instruction_type)
+
+    def has_instruction(self, instruction_type: str) -> bool:
+        """Check if instruction type exists."""
+        return self._graph.HasInstruction(instruction_type)
+
+    def get_final_user(self):
+        """Get the last USER instruction."""
+        return self._graph.GetFinalUser()
+
+    def is_running_as_root(self) -> bool:
+        """Check if container runs as root."""
+        return self._graph.IsRunningAsRoot()
+
+    def get_stages(self):
+        """Get all build stages."""
+        return self._graph.GetStages()
+
+    def is_multi_stage(self) -> bool:
+        """Check if Dockerfile uses multi-stage build."""
+        return self._graph.IsMultiStage()
+
+    def get_stage_by_alias(self, alias: str):
+        """Get a stage by its AS alias."""
+        return self._graph.GetStageByAlias(alias)
+
+    def get_final_stage(self):
+        """Get the final build stage."""
+        return self._graph.GetFinalStage()
+
+
+class ComposeAccess:
+    """
+    Provides programmatic access to docker-compose structure.
+    Used in custom validation functions.
+    """
+
+    def __init__(self, compose_graph):
+        self._graph = compose_graph
+
+    def get_services(self):
+        """Get all service names."""
+        return self._graph.GetServices()
+
+    def service_has(self, service_name: str, key: str, value) -> bool:
+        """Check if service has property with value."""
+        return self._graph.ServiceHas(service_name, key, value)
+
+    def service_get(self, service_name: str, key: str):
+        """Get service property value."""
+        return self._graph.ServiceGet(service_name, key)
+
+    def get_privileged_services(self):
+        """Get services with privileged: true."""
+        return self._graph.GetPrivilegedServices()
+
+    def services_with_docker_socket(self):
+        """Get services mounting Docker socket."""
+        return self._graph.ServicesWithDockerSocket()
+
+    def services_with_host_network(self):
+        """Get services using host network mode."""
+        return self._graph.ServicesWithHostNetwork()
diff --git a/python-dsl/tests/test_container_combinators.py b/python-dsl/tests/test_container_combinators.py
new file mode 100644
index 00000000..d04c7c9a
--- /dev/null
+++ b/python-dsl/tests/test_container_combinators.py
@@ -0,0 +1,209 @@
+"""Tests for logic combinators."""
+
+from rules.container_matchers import instruction, missing
+from rules.container_combinators import (
+    all_of,
+    any_of,
+    none_of,
+    instruction_after,
+    instruction_before,
+    stage,
+    final_stage_has,
+)
+
+
+class TestAllOf:
+    def test_basic(self):
+        m = all_of(
+            instruction(type="FROM", image_tag="latest"),
+            missing(instruction="USER")
+        )
+        d = m.to_dict()
+        assert d["type"] == "all_of"
+        assert len(d["conditions"]) == 2
+
+    def test_nested(self):
+        m = all_of(
+            any_of(
+                instruction(type="USER", user_name="root"),
+                missing(instruction="USER")
+            ),
+            instruction(type="FROM")
+        )
+        d = m.to_dict()
+        assert d["conditions"][0]["type"] == "any_of"
+
+    def test_multiple_conditions(self):
+        m = all_of(
+            instruction(type="FROM", image_tag="latest"),
+            missing(instruction="USER"),
+            instruction(type="RUN", contains="sudo")
+        )
+        d = m.to_dict()
+        assert len(d["conditions"]) == 3
+
+    def test_with_dict(self):
+        m = all_of(
+            {"type": "custom", "value": "test"},
+            instruction(type="FROM")
+        )
+        d = m.to_dict()
+        assert d["conditions"][0]["type"] == "custom"
+
+    def test_with_callable(self):
+        def custom_func():
+            return True
+
+        m = all_of(
+            custom_func,
+            instruction(type="FROM")
+        )
+        d = m.to_dict()
+        assert d["conditions"][0]["type"] == "custom_function"
+        assert d["conditions"][0]["has_callable"] is True
+
+
+class TestAnyOf:
+    def test_basic(self):
+        m = any_of(
+            instruction(type="FROM", image_tag="latest"),
+            instruction(type="FROM", base_image="scratch")
+        )
+        d = m.to_dict()
+        assert d["type"] == "any_of"
+        assert len(d["conditions"]) == 2
+
+    def test_single_condition(self):
+        m = any_of(
+            instruction(type="USER", user_name="root")
+        )
+        d = m.to_dict()
+        assert len(d["conditions"]) == 1
+
+    def test_many_conditions(self):
+        m = any_of(
+            instruction(type="USER", user_name="root"),
+            missing(instruction="USER"),
+            instruction(type="FROM", base_image="scratch"),
+            instruction(type="RUN", contains="sudo")
+        )
+        d = m.to_dict()
+        assert len(d["conditions"]) == 4
+
+
+class TestNoneOf:
+    def test_basic(self):
+        m = none_of(
+            instruction(type="HEALTHCHECK")
+        )
+        d = m.to_dict()
+        assert d["type"] == "none_of"
+
+    def test_multiple_conditions(self):
+        m = none_of(
+            instruction(type="HEALTHCHECK"),
+            instruction(type="USER")
+        )
+        d = m.to_dict()
+        assert len(d["conditions"]) == 2
+
+
+class TestInstructionSequence:
+    def test_after_string(self):
+        m = instruction_after(instruction="CMD", after="USER")
+        d = m.to_dict()
+        assert d["type"] == "instruction_after"
+        assert d["instruction"]["instruction"] == "CMD"
+        assert d["reference"]["instruction"] == "USER"
+
+    def test_before_string(self):
+        m = instruction_before(instruction="USER", before="CMD")
+        d = m.to_dict()
+        assert d["type"] == "instruction_before"
+
+    def test_with_matcher(self):
+        m = instruction_after(
+            instruction=instruction(type="RUN", contains="apt-get install"),
+            after=instruction(type="RUN", contains="apt-get update")
+        )
+        d = m.to_dict()
+        assert "contains" in d["instruction"]
+        assert "contains" in d["reference"]
+
+    def test_not_followed_by(self):
+        m = instruction_after(
+            instruction="RUN",
+            after="FROM",
+            not_followed_by=True
+        )
+        d = m.to_dict()
+        assert d["not_followed_by"] is True
+
+    def test_before_with_matcher(self):
+        m = instruction_before(
+            instruction=instruction(type="USER", user_name="root"),
+            before="CMD"
+        )
+        d = m.to_dict()
+        assert d["instruction"]["user_name"] == "root"
+
+    def test_with_dict(self):
+        m = instruction_after(
+            instruction={"type": "custom"},
+            after={"type": "other"}
+        )
+        d = m.to_dict()
+        assert d["instruction"]["type"] == "custom"
+        assert d["reference"]["type"] == "other"
+
+
+class TestStageMatcher:
+    def test_stage_alias(self):
+        m = stage(alias="builder")
+        d = m.to_dict()
+        assert d["type"] == "stage_query"
+        assert d["alias"] == "builder"
+
+    def test_final_stage(self):
+        m = stage(is_final=True)
+        d = m.to_dict()
+        assert d["is_final"] is True
+
+    def test_stage_base_image(self):
+        m = stage(base_image="alpine")
+        d = m.to_dict()
+        assert d["base_image"] == "alpine"
+
+    def test_all_params(self):
+        m = stage(alias="builder", base_image="alpine", is_final=False)
+        d = m.to_dict()
+        assert d["alias"] == "builder"
+        assert d["base_image"] == "alpine"
+        assert d["is_final"] is False
+
+    def test_final_stage_has_missing(self):
+        m = final_stage_has(missing_instruction="USER")
+        d = m.to_dict()
+        assert d["type"] == "stage_final_has"
+        assert d["missing_instruction"] == "USER"
+
+    def test_final_stage_has_instruction_string(self):
+        m = final_stage_has(instruction="USER")
+        d = m.to_dict()
+        assert d["instruction"] == "USER"
+
+    def test_final_stage_has_instruction_matcher(self):
+        m = final_stage_has(
+            instruction=instruction(type="USER", user_name="root")
+        )
+        d = m.to_dict()
+        assert d["instruction"]["user_name"] == "root"
+
+    def test_final_stage_has_both_params(self):
+        m = final_stage_has(
+            instruction=instruction(type="RUN"),
+            missing_instruction="HEALTHCHECK"
+        )
+        d = m.to_dict()
+        assert "instruction" in d
+        assert d["missing_instruction"] == "HEALTHCHECK"
diff --git a/python-dsl/tests/test_container_programmatic.py b/python-dsl/tests/test_container_programmatic.py
new file mode 100644
index 00000000..f434c5ab
--- /dev/null
+++ b/python-dsl/tests/test_container_programmatic.py
@@ -0,0 +1,211 @@
+"""Tests for programmatic access."""
+
+from rules.container_programmatic import (
+    custom_check,
+    ProgrammaticMatcher,
+    DockerfileAccess,
+    ComposeAccess,
+)
+
+
+class TestCustomCheck:
+    def test_basic(self):
+        def my_check(dockerfile):
+            return dockerfile.is_running_as_root()
+
+        m = custom_check(my_check, "Check for root user")
+        d = m.to_dict()
+        assert d["type"] == "programmatic"
+        assert d["has_callable"] is True
+        assert d["description"] == "Check for root user"
+
+    def test_callable_stored(self):
+        def my_check(dockerfile):
+            return True
+
+        m = custom_check(my_check)
+        assert callable(m.check_function)
+
+    def test_without_description(self):
+        def my_check(compose):
+            return len(compose.get_privileged_services()) > 0
+
+        m = custom_check(my_check)
+        d = m.to_dict()
+        assert d["description"] == ""
+
+    def test_complex_function(self):
+        def check_multi_stage(dockerfile):
+            if not dockerfile.is_multi_stage():
+                return False
+            final = dockerfile.get_final_stage()
+            return final.user_name == "root"
+
+        m = custom_check(check_multi_stage, "Multi-stage runs as root")
+        assert m.check_function is check_multi_stage
+        assert m.description == "Multi-stage runs as root"
+
+
+class TestProgrammaticMatcher:
+    def test_creation(self):
+        def func():
+            return True
+
+        pm = ProgrammaticMatcher(check_function=func, description="Test")
+        assert pm.check_function is func
+        assert pm.description == "Test"
+
+    def test_to_dict(self):
+        pm = ProgrammaticMatcher(
+            check_function=lambda x: True,
+            description="Lambda test"
+        )
+        d = pm.to_dict()
+        assert d["type"] == "programmatic"
+        assert d["has_callable"] is True
+        assert d["description"] == "Lambda test"
+
+
+class MockDockerfileGraph:
+    """Mock for testing DockerfileAccess."""
+
+    def __init__(self):
+        self.instructions = {"USER": [{"user_name": "root"}]}
+
+    def GetInstructions(self, instruction_type):
+        return self.instructions.get(instruction_type, [])
+
+    def HasInstruction(self, instruction_type):
+        return instruction_type in self.instructions
+
+    def GetFinalUser(self):
+        return {"user_name": "root"}
+
+    def IsRunningAsRoot(self):
+        return True
+
+    def GetStages(self):
+        return [{"alias": "builder"}]
+
+    def IsMultiStage(self):
+        return True
+
+    def GetStageByAlias(self, alias):
+        return {"alias": alias}
+
+    def GetFinalStage(self):
+        return {"alias": "final"}
+
+
+class TestDockerfileAccess:
+    def test_get_instructions(self):
+        graph = MockDockerfileGraph()
+        access = DockerfileAccess(graph)
+        result = access.get_instructions("USER")
+        assert len(result) == 1
+
+    def test_has_instruction(self):
+        graph = MockDockerfileGraph()
+        access = DockerfileAccess(graph)
+        assert access.has_instruction("USER") is True
+        assert access.has_instruction("HEALTHCHECK") is False
+
+    def test_get_final_user(self):
+        graph = MockDockerfileGraph()
+        access = DockerfileAccess(graph)
+        user = access.get_final_user()
+        assert user["user_name"] == "root"
+
+    def test_is_running_as_root(self):
+        graph = MockDockerfileGraph()
+        access = DockerfileAccess(graph)
+        assert access.is_running_as_root() is True
+
+    def test_get_stages(self):
+        graph = MockDockerfileGraph()
+        access = DockerfileAccess(graph)
+        stages = access.get_stages()
+        assert len(stages) == 1
+
+    def test_is_multi_stage(self):
+        graph = MockDockerfileGraph()
+        access = DockerfileAccess(graph)
+        assert access.is_multi_stage() is True
+
+    def test_get_stage_by_alias(self):
+        graph = MockDockerfileGraph()
+        access = DockerfileAccess(graph)
+        stage = access.get_stage_by_alias("builder")
+        assert stage["alias"] == "builder"
+
+    def test_get_final_stage(self):
+        graph = MockDockerfileGraph()
+        access = DockerfileAccess(graph)
+        stage = access.get_final_stage()
+        assert stage["alias"] == "final"
+
+
+class MockComposeGraph:
+    """Mock for testing ComposeAccess."""
+
+    def __init__(self):
+        self.services = ["web", "db"]
+
+    def GetServices(self):
+        return self.services
+
+    def ServiceHas(self, service_name, key, value):
+        return service_name == "web" and key == "privileged" and value is True
+
+    def ServiceGet(self, service_name, key):
+        if service_name == "web" and key == "image":
+            return "nginx"
+        return None
+
+    def GetPrivilegedServices(self):
+        return ["web"]
+
+    def ServicesWithDockerSocket(self):
+        return ["db"]
+
+    def ServicesWithHostNetwork(self):
+        return []
+
+
+class TestComposeAccess:
+    def test_get_services(self):
+        graph = MockComposeGraph()
+        access = ComposeAccess(graph)
+        services = access.get_services()
+        assert len(services) == 2
+        assert "web" in services
+
+    def test_service_has(self):
+        graph = MockComposeGraph()
+        access = ComposeAccess(graph)
+        assert access.service_has("web", "privileged", True) is True
+        assert access.service_has("db", "privileged", True) is False
+
+    def test_service_get(self):
+        graph = MockComposeGraph()
+        access = ComposeAccess(graph)
+        image = access.service_get("web", "image")
+        assert image == "nginx"
+
+    def test_get_privileged_services(self):
+        graph = MockComposeGraph()
+        access = ComposeAccess(graph)
+        privileged = access.get_privileged_services()
+        assert privileged == ["web"]
+
+    def test_services_with_docker_socket(self):
+        graph = MockComposeGraph()
+        access = ComposeAccess(graph)
+        with_socket = access.services_with_docker_socket()
+        assert with_socket == ["db"]
+
+    def test_services_with_host_network(self):
+        graph = MockComposeGraph()
+        access = ComposeAccess(graph)
+        host_network = access.services_with_host_network()
+        assert len(host_network) == 0