Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ dependencies = [
"trimesh",
"embreex; platform_machine=='x86_64' or platform_machine=='AMD64'",
"onnxruntime==1.18.0",
"pydicom",
]

[project.optional-dependencies]
Expand Down Expand Up @@ -250,6 +251,7 @@ messages_control.disable = [
"duplicate-code",
"cyclic-import",
"unreachable", # This is to allow marking of TODO items with a NotImplementedError
"c-extension-no-member",
]

[tool.setuptools.package-data]
Expand Down
88 changes: 58 additions & 30 deletions src/openlifu/db/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,22 @@
import logging
import os
import shutil
import tempfile
from enum import Enum
from pathlib import Path
from typing import Dict, List

import h5py
import nibabel as nib

from openlifu.nav.photoscan import Photoscan, load_data_from_photoscan
from openlifu.plan import Protocol, Run, Solution
from openlifu.util.json import PYFUSEncoder
from openlifu.util.types import PathLike
from openlifu.util.volume_conversion import (
convert_dicom_to_nifti,
is_dicom_file_or_directory,
)
from openlifu.xdc import Transducer, TransducerArray
from openlifu.xdc.util import load_transducer_from_file

Expand All @@ -24,6 +30,7 @@

OnConflictOpts = Enum('OnConflictOpts', ['ERROR', 'OVERWRITE', 'SKIP'])


class Database:
def __init__(self, path: str | None = None):
if path is None:
Expand Down Expand Up @@ -378,35 +385,57 @@ def write_volume(self, subject_id, volume_id, volume_name, volume_data_filepath,
if not Path(volume_data_filepath).exists():
raise ValueError(f'Volume data filepath does not exist: {volume_data_filepath}')

volume_ids = self.get_volume_ids(subject_id)
if volume_id in volume_ids:
if on_conflict == OnConflictOpts.ERROR:
raise ValueError(f"Volume with ID {volume_id} already exists for subject {subject_id}.")
elif on_conflict == OnConflictOpts.OVERWRITE:
self.logger.info(f"Overwriting volume with ID {volume_id} for subject {subject_id}.")
elif on_conflict == OnConflictOpts.SKIP:
self.logger.info(f"Skipping volume with ID {volume_id} for subject {subject_id} as it already exists.")
return
else:
raise ValueError("Invalid 'on_conflict' option. Use 'error', 'overwrite', or 'skip'.")

# Create volume metadata
volume_metadata_dict = {"id": volume_id, "name": volume_name, "data_filename": Path(volume_data_filepath).name}
volume_metadata_json = json.dumps(volume_metadata_dict, separators=(',', ':'), cls=PYFUSEncoder)

# Save the volume metadata to a JSON file and copy volume data file to database
volume_metadata_filepath = self.get_volume_metadata_filepath(subject_id, volume_id) #subject_id/volume/volume_id/volume_id.json
Path(volume_metadata_filepath).parent.parent.mkdir(exist_ok=True) # volume directory
Path(volume_metadata_filepath).parent.mkdir(exist_ok=True)
with open(volume_metadata_filepath, 'w') as file:
file.write(volume_metadata_json)
shutil.copy(Path(volume_data_filepath), Path(volume_metadata_filepath).parent)

if volume_id not in volume_ids:
volume_ids.append(volume_id)
self.write_volume_ids(subject_id, volume_ids)

self.logger.info(f"Added volume with ID {volume_id} for subject {subject_id} to the database.")
path = Path(volume_data_filepath)
if path.is_dir() and not is_dicom_file_or_directory(volume_data_filepath):
raise ValueError(f'Volume data filepath is a directory without DICOM files: {volume_data_filepath}')

# convert dicom to nifti if needed
temp_nifti_path = None
if is_dicom_file_or_directory(volume_data_filepath):
self.logger.info(f"Detected DICOM input for volume {volume_id}, converting to NIfTI format")
with tempfile.NamedTemporaryFile(suffix='.nii.gz', delete=False) as temp_file:
temp_nifti_path = Path(temp_file.name)

try:
convert_dicom_to_nifti(volume_data_filepath, temp_nifti_path)
volume_data_filepath = temp_nifti_path
except Exception as e:
if temp_nifti_path.exists():
temp_nifti_path.unlink()
raise RuntimeError(f"Failed to convert DICOM to NIfTI: {e}") from e

try:
volume_ids = self.get_volume_ids(subject_id)
if volume_id in volume_ids:
if on_conflict == OnConflictOpts.ERROR:
raise ValueError(f"Volume with ID {volume_id} already exists for subject {subject_id}.")
elif on_conflict == OnConflictOpts.OVERWRITE:
self.logger.info(f"Overwriting volume with ID {volume_id} for subject {subject_id}.")
elif on_conflict == OnConflictOpts.SKIP:
self.logger.info(f"Skipping volume with ID {volume_id} for subject {subject_id} as it already exists.")
return
else:
raise ValueError("Invalid 'on_conflict' option. Use 'error', 'overwrite', or 'skip'.")

volume_metadata_dict = {"id": volume_id, "name": volume_name, "data_filename": Path(volume_data_filepath).name}
volume_metadata_json = json.dumps(volume_metadata_dict, separators=(',', ':'), cls=PYFUSEncoder)

volume_metadata_filepath = self.get_volume_metadata_filepath(subject_id, volume_id)
Path(volume_metadata_filepath).parent.parent.mkdir(exist_ok=True)
Path(volume_metadata_filepath).parent.mkdir(exist_ok=True)
with open(volume_metadata_filepath, 'w') as file:
file.write(volume_metadata_json)
shutil.copy(Path(volume_data_filepath), Path(volume_metadata_filepath).parent)

if volume_id not in volume_ids:
volume_ids.append(volume_id)
self.write_volume_ids(subject_id, volume_ids)

self.logger.info(f"Added volume with ID {volume_id} for subject {subject_id} to the database.")
finally:
# cleanup temp nifti file
if temp_nifti_path is not None and temp_nifti_path.exists():
temp_nifti_path.unlink()

def write_photocollection(self, subject_id, session_id, reference_number: str, photo_paths: List[PathLike], on_conflict=OnConflictOpts.ERROR):
""" Writes a photocollection to database and copies the associated
Expand Down Expand Up @@ -980,7 +1009,6 @@ def load_volume(self, subject, volume_id):
if not volume_data_filepath.exists() or not volume_data_filepath.is_file():
self.logger.error(f"Volume data file not found for volume {volume_id}, subject {subject.id}")
raise FileNotFoundError(f"Volume data file not found for volume {volume_id}, subject {subject.id}")
import nibabel as nib
# Load the volume data using nibabel
volume_data = nib.load(volume_data_filepath)
self.logger.info(f"Loaded volume {volume_id} for subject {subject.id}")
Expand Down
160 changes: 160 additions & 0 deletions src/openlifu/util/volume_conversion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
"""Utilities for converting between different medical imaging formats."""

from __future__ import annotations

from pathlib import Path

import nibabel as nib
import numpy as np
import pydicom

from openlifu.util.types import PathLike


def is_dicom_file_or_directory(path: PathLike) -> bool:
"""
Check if a path is a DICOM file or directory containing DICOM files.

Args:
path: Path to check

Returns:
True if path is a DICOM file or directory with DICOM files, False otherwise
"""
path = Path(path)

if path.is_file():
# check for 'DICM' magic bytes at offset 128
try:
with open(path, 'rb') as f:
f.seek(128)
return f.read(4) == b'DICM'
except OSError:
return False

elif path.is_dir():
for file in path.iterdir():
if file.is_file():
try:
with open(file, 'rb') as f:
f.seek(128)
if f.read(4) == b'DICM':
return True
except OSError:
continue

return False


def extract_affine_from_dicom(
dicom_slices: list[tuple[int, np.ndarray, pydicom.Dataset]]
) -> np.ndarray:
"""
Extract the affine transformation matrix from DICOM header information.
Converts from DICOM LPS (Left-Posterior-Superior) to NIfTI RAS.

Args:
dicom_slices: List of tuples (instance_number, pixel_array, header) where
header is a pydicom.Dataset containing DICOM metadata tags

Returns:
4x4 affine transformation matrix mapping voxel coordinates to RAS world coordinates

Raises:
RuntimeError: If required DICOM tags are missing
"""
# use the first slice to extract most parameters
first_header = dicom_slices[0][2]

try:
# ImageOrientationPatient (0020,0037): direction cosines for row and column
orientation = np.array(first_header.ImageOrientationPatient, dtype=float)
row_cosine = orientation[:3] # direction cosines for rows
col_cosine = orientation[3:] # direction cosines for columns

# ImagePositionPatient (0020,0032): position of the upper-left voxel
position = np.array(first_header.ImagePositionPatient, dtype=float)

# PixelSpacing is [row_spacing, col_spacing], so map to dy, dx
dy, dx = np.array(first_header.PixelSpacing, dtype=float)

except AttributeError as e:
raise RuntimeError(
f"Missing required DICOM tag for affine calculation: {e}"
) from e

# Compute Z direction and spacing (handling potential gantry tilt)
slice_cosine = np.cross(row_cosine, col_cosine)

# calculate slice spacing
if len(dicom_slices) > 1:
# calculate from the distance between first two slices
first_pos = np.array(dicom_slices[0][2].ImagePositionPatient, dtype=float)
second_pos = np.array(dicom_slices[1][2].ImagePositionPatient, dtype=float)
dz = np.dot(second_pos - first_pos, slice_cosine)
else:
# single slice - try to get from SliceThickness or default to 1.0
dz = float(getattr(first_header, 'SliceThickness', 1.0))

# Construct affine in DICOM LPS space
affine = np.eye(4)
affine[:3, 0] = row_cosine * dx
affine[:3, 1] = col_cosine * dy
affine[:3, 2] = slice_cosine * dz
affine[:3, 3] = position

# Convert LPS to RAS by flipping X and Y axes
return np.diag([-1, -1, 1, 1]) @ affine


def convert_dicom_to_nifti(input_path: PathLike, output_filepath: PathLike) -> None:
"""
Convert DICOM file(s) to NIfTI format using pydicom and nibabel.

Args:
input_path: Path to either a DICOM file or directory containing DICOM files
output_filepath: Path where the output NIfTI file should be saved

Raises:
RuntimeError: If the conversion fails
"""
input_path = Path(input_path)
output_filepath = Path(output_filepath)

try:
if input_path.is_file():
dicom_files = [input_path]
else:
# dicom files may not have .dcm extension
dicom_files = [f for f in input_path.iterdir() if f.is_file()]

if not dicom_files:
raise RuntimeError("No DICOM files found")

slices = []
for dcm_file in dicom_files:
try:
header = pydicom.dcmread(dcm_file)
# Transpose to swap (Row, Col) -> (X, Y) for NIfTI
slices.append((header.get('InstanceNumber', 0), header.pixel_array.T, header))
except Exception:
# skip files that aren't valid dicom
continue

if not slices:
raise RuntimeError("No valid DICOM files found")

# sort by instance number - this is the slice order in the series
# so we reconstruct the 3D volume in the right order
slices.sort(key=lambda x: x[0])

# stack into 3D volume (handles both single and multiple slices)
volume = np.stack([s[1] for s in slices], axis=-1)

# extract affine from DICOM headers
affine = extract_affine_from_dicom(slices)

nib.save(nib.Nifti1Image(volume, affine), str(output_filepath))

except Exception as e:
raise RuntimeError(f"DICOM to NIfTI conversion failed: {e}") from e
Binary file added tests/resources/CT_small.dcm
Binary file not shown.
Binary file added tests/resources/dicom_series/6293
Binary file not shown.
Binary file added tests/resources/dicom_series/6924
Binary file not shown.
64 changes: 64 additions & 0 deletions tests/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from openlifu.geo import ArrayTransform, Point
from openlifu.nav.photoscan import Photoscan
from openlifu.plan import Protocol, Run
from openlifu.util.volume_conversion import is_dicom_file_or_directory
from openlifu.xdc import Transducer


Expand Down Expand Up @@ -743,3 +744,66 @@ def test_get_transducer_absolute_filepaths(example_database, tmp_path: Path, reg
assert reconstructed_path.name == transducer_body.name
else:
assert absolute_file_paths["transducer_body_abspath"] is None

def test_write_volume_dicom(example_database: Database):
"""Test writing a volume from DICOM file - conversion to NIfTI and storage"""
subject_id = "example_subject"
volume_id = "test_dicom_volume"
volume_name = "TEST_DICOM_VOLUME"

test_dicom_file = Path(__file__).parent / "resources" / "CT_small.dcm"
assert test_dicom_file.exists(), "CT_small.dcm test file should exist"

example_database.write_volume(subject_id, volume_id, volume_name, test_dicom_file)

volume_ids = example_database.get_volume_ids(subject_id)
assert volume_id in volume_ids

volume_metadata_filepath = example_database.get_volume_metadata_filepath(subject_id, volume_id)
assert volume_metadata_filepath.exists()
assert volume_metadata_filepath.name == f"{volume_id}.json"

# verify stored as nifti not dicom
volume_info = example_database.get_volume_info(subject_id, volume_id)
stored_file = Path(volume_info["data_abspath"])
assert stored_file.exists()
assert stored_file.suffix in [".nii", ".gz"] # .nii.gz suffix
assert not is_dicom_file_or_directory(stored_file)

def test_write_volume_dicom_directory(example_database: Database):
"""Test writing a volume from DICOM directory (multi-slice series) - conversion to NIfTI and storage"""
subject_id = "example_subject"
volume_id = "test_dicom_series_volume"
volume_name = "TEST_DICOM_SERIES_VOLUME"

test_dicom_dir = Path(__file__).parent / "resources" / "dicom_series"
assert test_dicom_dir.exists(), "dicom_series directory should exist"
assert test_dicom_dir.is_dir()
assert len(list(test_dicom_dir.iterdir())) > 0, "dicom_series should contain files"

example_database.write_volume(subject_id, volume_id, volume_name, test_dicom_dir)

volume_ids = example_database.get_volume_ids(subject_id)
assert volume_id in volume_ids

volume_metadata_filepath = example_database.get_volume_metadata_filepath(subject_id, volume_id)
assert volume_metadata_filepath.exists()
assert volume_metadata_filepath.name == f"{volume_id}.json"

# verify stored as nifti not dicom
volume_info = example_database.get_volume_info(subject_id, volume_id)
stored_file = Path(volume_info["data_abspath"])
assert stored_file.exists()
assert stored_file.suffix in [".nii", ".gz"] # .nii.gz suffix
assert not is_dicom_file_or_directory(stored_file)

def test_write_volume_empty_directory(example_database: Database, tmp_path: Path):
subject_id = "example_subject"
volume_id = "test_empty_dir_volume"
volume_name = "TEST_EMPTY_DIR_VOLUME"

empty_dir = tmp_path / "empty_dir"
empty_dir.mkdir()

with pytest.raises(ValueError, match="directory without DICOM files"):
example_database.write_volume(subject_id, volume_id, volume_name, empty_dir)
Loading