Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions datajunction-server/datajunction_server/database/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
from typing import TYPE_CHECKING, List, Optional, Tuple

from sqlalchemy import BigInteger, ForeignKey, Index, Integer, String
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column, relationship

from datajunction_server.database.attributetype import ColumnAttribute
from datajunction_server.database.base import Base
from datajunction_server.models.attribute import ColumnAttributes
from datajunction_server.models.base import labelize
from datajunction_server.models.column import ColumnTypeDecorator
from datajunction_server.models.unit import Unit, UnitTypeDecorator
from datajunction_server.sql.parsing.types import ColumnType

if TYPE_CHECKING:
Expand Down Expand Up @@ -46,9 +46,13 @@ class Column(Base): # type: ignore
type: Mapped[Optional[ColumnType]] = mapped_column(ColumnTypeDecorator)
description: Mapped[Optional[str]] = mapped_column()

# Structured unit metadata. JSONB to accommodate atomic and compound shapes.
# Validated at the model layer via datajunction_server.models.unit.Unit.
unit: Mapped[Optional[dict]] = mapped_column(JSONB, nullable=True)
# Structured unit metadata. The TypeDecorator validates JSONB into a
# typed `Unit` (AtomicUnit | CompoundUnit) on read and canonicalizes
# Unit-or-dict input back to JSONB on write.
unit: Mapped[Optional[Unit]] = mapped_column(
UnitTypeDecorator,
nullable=True,
)

dimension_id: Mapped[Optional[int]] = mapped_column(
ForeignKey("node.id", ondelete="SET NULL", name="fk_column_dimension_id_node"),
Expand Down
67 changes: 61 additions & 6 deletions datajunction-server/datajunction_server/database/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
from datajunction_server.models.node import (
DEFAULT_DRAFT_VERSION,
BuildCriteria,
MetricUnit,
NodeCursor,
NodeMode,
NodeStatus,
Expand Down Expand Up @@ -174,6 +175,44 @@ def _build_search_score(
return relevance * branch_boost * popularity


def _resolve_metric_unit_for_spec(
col_unit: "Any | None",
legacy_from_md: "Any | None",
) -> "Tuple[Any | None, dict | None]":
"""
Decide which of (legacy enum, structured dict) to populate on a MetricSpec
when round-tripping a metric back from the DB.

Accepts `col_unit` as either a `Unit` Pydantic instance (the typed shape
returned by `UnitTypeDecorator`) or a plain dict (in-memory test
fixtures, or untyped paths). Internally normalizes via `unit_to_dict`.

Rules (preserves authoring intent on round-trip):
- No structured `column.unit` → emit only the legacy field (whatever was
in metric_metadata.unit), structured stays None.
- Structured `column.unit` is legacy-expressible (USD, percentage, time
codes, etc.) → keep the legacy field as authoritative so `unit: dollar`
round-trips as `unit: dollar`, not `unit: {kind: currency, code: USD}`.
Structured stays None.
- Structured `column.unit` is NOT legacy-expressible (EUR, compound,
count-with-code, data_size) → populate structured, null the legacy so
nothing tries to dual-emit.

Returns (legacy_for_spec, structured_for_spec_dict).
"""
from datajunction_server.models.unit import (
structured_to_legacy_unit_name,
unit_to_dict,
)

col_unit_dict = unit_to_dict(col_unit)
if col_unit_dict is None:
return legacy_from_md, None
if structured_to_legacy_unit_name(col_unit_dict) is None:
return None, col_unit_dict
return legacy_from_md, None


class NodeRelationship(Base):
"""
Join table for self-referential many-to-many relationships between nodes.
Expand Down Expand Up @@ -545,19 +584,35 @@ async def to_spec(self, session: AsyncSession) -> NodeSpec:

# Metric-specific
if self.type == NodeType.METRIC:
col_unit = (
self.current.columns[0].unit
if self.current.columns and self.current.columns[0].unit
else None
)
# `MetricUnit.UNKNOWN` is the DB-column default for any metric
# that never had a unit authored. Treat it as "no unit" so it
# doesn't leak as `unit: unknown` on YAML re-export.
legacy_from_md = (
self.current.metric_metadata.unit
if self.current.metric_metadata
and self.current.metric_metadata.unit
and self.current.metric_metadata.unit != MetricUnit.UNKNOWN
else None
)
legacy_spec, structured_spec = _resolve_metric_unit_for_spec(
col_unit,
legacy_from_md,
)

extra_kwargs.update(
required_dimensions=sorted(
col.name for col in self.current.required_dimensions
),
direction=self.current.metric_metadata.direction
if self.current.metric_metadata
else None,
unit_enum=(
self.current.metric_metadata.unit
if self.current.metric_metadata
and self.current.metric_metadata.unit
else None
),
unit_enum=legacy_spec,
unit_structured=structured_spec,
significant_digits=self.current.metric_metadata.significant_digits
if self.current.metric_metadata
else None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,16 @@
from datajunction_server.models.node import (
DEFAULT_DRAFT_VERSION,
DEFAULT_PUBLISHED_VERSION,
MetricUnit,
NodeMode,
NodeStatus,
NodeType,
)
from datajunction_server.models.unit import (
legacy_unit_to_structured,
structured_to_legacy_unit_name,
unit_to_dict,
)
from datajunction_server.utils import (
SEPARATOR,
Version,
Expand Down Expand Up @@ -3518,15 +3524,31 @@ async def _create_node_revision(
metric_spec = cast(MetricSpec, result.spec)
if new_revision.columns: # pragma: no branch
new_revision.columns[0].display_name = new_revision.display_name

# Bridge legacy metric_metadata.unit and structured
# columns[0].unit so users on either input shape end up with
# both fields populated where expressible. See
# `_resolve_metric_unit` for the full precedence rules.
output_col = new_revision.columns[0] if new_revision.columns else None
if output_col is not None:
output_col.unit = self._resolve_metric_unit(
metric_spec,
output_col.unit,
)

legacy_unit = self._derive_legacy_unit_for_storage(
metric_spec,
output_col,
)
if (
metric_spec.unit_enum
legacy_unit
or metric_spec.direction
or metric_spec.significant_digits
or metric_spec.max_decimal_exponent
or metric_spec.min_decimal_exponent
):
new_revision.metric_metadata = MetricMetadata(
unit=metric_spec.unit_enum,
unit=legacy_unit,
direction=metric_spec.direction,
significant_digits=metric_spec.significant_digits,
max_decimal_exponent=metric_spec.max_decimal_exponent,
Expand All @@ -3546,6 +3568,101 @@ async def _create_node_revision(
new_revision.required_dimensions = required_dimensions
return new_revision

def _resolve_metric_unit(
self,
metric_spec: MetricSpec,
column_unit: dict | None,
) -> dict | None:
"""
Compute the canonical structured unit dict for a metric's output
column, reconciling the three possible input surfaces. Pure:
returns the value the caller should assign; no mutation.

Input surfaces (highest priority first):
1. `metric_spec.unit_structured` — top-level structured `unit:`
at the metric spec level. Authored shape for the new model.
2. `column_unit` — structured value set via the explicit
`columns[<output>].unit` form. Supported for uniformity with
non-metric nodes, but unusual on metrics (where the column
name is auto-derived).
3. `metric_spec.unit_enum` — legacy flat-string `unit: dollar`
form. Translated via the legacy → structured table.

Conflict handling: if (1) is set together with (2) or (3) and the
values disagree, a warning is logged naming the node.

Returns the canonical dict shape (None if no unit is set on any
surface).
"""
legacy = metric_spec.unit_enum
spec_structured = unit_to_dict(metric_spec.unit_structured)
# column.unit comes from JSONB as a plain dict; canonicalize so a
# hand-rolled or legacy-translated value compares equal to the
# spec_structured shape regardless of code: None presence.
column_structured = unit_to_dict(column_unit)

# (1) Metric-level structured input wins absolutely.
if spec_structured is not None:
if column_structured is not None and column_structured != spec_structured:
logger.warning(
"Metric %s sets a structured unit at both the metric "
"spec level (%r) and on columns[].unit (%r); the "
"metric-level value wins. Remove one to silence this.",
metric_spec.rendered_name,
spec_structured,
column_structured,
)
if legacy is not None and legacy != MetricUnit.UNKNOWN:
logger.warning(
"Metric %s sets both a structured unit (%r) and the "
"legacy unit_enum (%s); structured value wins.",
metric_spec.rendered_name,
spec_structured,
legacy.name,
)
return spec_structured

# (2) columns[].unit fallback.
if column_structured is not None:
if legacy is not None and legacy != MetricUnit.UNKNOWN:
logger.warning(
"Metric %s sets both metric_metadata.unit (%s) and "
"columns[].unit (%r); structured value wins.",
metric_spec.rendered_name,
legacy.name,
column_structured,
)
return column_structured

# (3) Legacy translation.
return legacy_unit_to_structured(legacy)

def _derive_legacy_unit_for_storage(
self,
metric_spec: MetricSpec,
output_col: Column | None,
) -> MetricUnit | None:
"""
Compute the value to write to the legacy `metricmetadata.unit` DB
column given the canonical structured `output_col.unit`. Dual-writing
the legacy column keeps it in sync with `column.unit` so:
- API consumers reading `metric_metadata.unit` keep seeing values.
- A code rollback to a release that reads only the legacy column
still finds the right data for everything the enum can express.

Returns None when the structured value has no legacy equivalent
(non-USD currencies, compound units, data sizes, count with code) —
the caller writes NULL to `metricmetadata.unit` in that case.
"""
structured = output_col.unit if output_col is not None else None
if structured is not None:
name = structured_to_legacy_unit_name(structured)
return MetricUnit[name] if name is not None else None
# No structured unit — fall back to whatever the legacy spec field
# had (typically None at this point because the caller will have
# already assigned the resolved column.unit from _resolve_metric_unit).
return metric_spec.unit_enum

def _create_column_from_spec(
self,
col: ColumnSpec,
Expand All @@ -3558,7 +3675,7 @@ def _create_column_from_spec(
display_name=col.display_name,
description=col.description,
order=order,
unit=col.unit.model_dump() if col.unit is not None else None,
unit=unit_to_dict(col.unit),
attributes=[
ColumnAttribute(
attribute_type=self.registry.attributes.get(attr),
Expand Down
13 changes: 12 additions & 1 deletion datajunction-server/datajunction_server/internal/namespaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -1445,7 +1445,14 @@ def _has_column_customizations(col: dict) -> bool:
"attributes",
[],
)
return has_custom_display or has_attributes or has_description or has_partition
has_unit = bool(col.get("unit"))
return (
has_custom_display
or has_attributes
or has_description
or has_partition
or has_unit
)


def _merge_columns_preserving_comments(existing_list, new_list, is_cube=False):
Expand Down Expand Up @@ -1737,6 +1744,10 @@ def _node_spec_to_yaml_dict(node_spec, include_all_columns=False) -> dict:

# Filter columns to only include meaningful customizations
# Special case for cubes: ALWAYS only export columns with partitions
# Metric specs exclude `columns` from model_dump (`columns: ... exclude=True`),
# so metric YAML never carries per-column entries — the metric's unit is
# emitted at the spec top level via the `unit:` field. No suppression
# needed here for metrics.
# For other nodes: respect include_all_columns flag to preserve comments
if "columns" in data and data["columns"] is not None:
is_cube = data.get("node_type") == "cube"
Expand Down
21 changes: 21 additions & 0 deletions datajunction-server/datajunction_server/internal/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,26 @@ async def create_node_revision(
if node_revision.type == NodeType.METRIC:
if node_revision.columns:
node_revision.columns[0].display_name = node_revision.display_name
# Bridge legacy metric_metadata.unit onto columns[0].unit so this
# endpoint produces the same DB state as the deployment
# orchestrator. Without this, metrics created here have no
# column.unit while the same node copied via branch fast-path
# (which routes through the orchestrator) does — causing spec
# drift that surfaces as cube column inheritance mismatches.
if (
node_revision.metric_metadata
and node_revision.metric_metadata.unit
and node_revision.columns[0].unit is None
):
from datajunction_server.models.unit import (
legacy_unit_to_structured,
)

structured = legacy_unit_to_structured(
node_revision.metric_metadata.unit,
)
if structured is not None:
node_revision.columns[0].unit = structured
node_revision.catalog_id = catalog_id
return node_revision

Expand Down Expand Up @@ -634,6 +654,7 @@ async def create_cube_node_revision(
if referenced_node.type == NodeType.METRIC
else col.display_name,
type=col.type,
unit=col.unit,
attributes=[
ColumnAttribute(attribute_type_id=attr.attribute_type_id)
for attr in col.attributes
Expand Down
Loading
Loading