forked from Project-MONAI/MONAI
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy patharray.py
More file actions
1176 lines (941 loc) · 44.3 KB
/
array.py
File metadata and controls
1176 lines (941 loc) · 44.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright 2020 - 2021 MONAI Consortium
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
A collection of "vanilla" transforms for utility functions
https://github.com/Project-MONAI/MONAI/wiki/MONAI_Design
"""
import logging
import sys
import time
import warnings
from typing import TYPE_CHECKING, Callable, Dict, List, Mapping, Optional, Sequence, Tuple, Union
import numpy as np
import torch
from monai.config import DtypeLike
from monai.config.type_definitions import NdarrayOrTensor
from monai.transforms.transform import Randomizable, RandomizableTransform, Transform
from monai.transforms.utils import (
extreme_points_to_image,
get_extreme_points,
map_binary_to_indices,
map_classes_to_indices,
)
from monai.transforms.utils_pytorch_numpy_unification import in1d, moveaxis
from monai.utils import convert_to_numpy, convert_to_tensor, ensure_tuple, look_up_option, min_version, optional_import
from monai.utils.enums import TransformBackends
from monai.utils.misc import is_module_ver_at_least
from monai.utils.type_conversion import convert_data_type
PILImageImage, has_pil = optional_import("PIL.Image", name="Image")
pil_image_fromarray, _ = optional_import("PIL.Image", name="fromarray")
cp, has_cp = optional_import("cupy")
if TYPE_CHECKING:
from cupy import ndarray as cp_ndarray
else:
cp_ndarray, _ = optional_import("cupy", name="ndarray")
# Public API of this module. NOTE: "ToCupy" was previously missing even though
# the class is defined below, so `from ... import *` silently skipped it.
__all__ = [
    "Identity",
    "AsChannelFirst",
    "AsChannelLast",
    "AddChannel",
    "EnsureChannelFirst",
    "EnsureType",
    "RepeatChannel",
    "RemoveRepeatedChannel",
    "SplitChannel",
    "CastToType",
    "ToTensor",
    "ToNumpy",
    "ToCupy",
    "ToPIL",
    "Transpose",
    "SqueezeDim",
    "DataStats",
    "SimulateDelay",
    "Lambda",
    "RandLambda",
    "LabelToMask",
    "FgBgToIndices",
    "ClassesToIndices",
    "ConvertToMultiChannelBasedOnBratsClasses",
    "AddExtremePointsChannel",
    "TorchVision",
    "MapLabelValue",
    "IntensityStats",
    "ToDevice",
]
class Identity(Transform):
    """
    A no-op transform: the output is exactly the input.

    Because nothing is changed, this is handy as a placeholder when verifying a
    transform chain, a ``Compose`` pipeline, or a transform adaptor.
    """

    backend = [TransformBackends.TORCH, TransformBackends.NUMPY]

    def __call__(self, img: NdarrayOrTensor) -> NdarrayOrTensor:
        """Return `img` unchanged."""
        return img
class AsChannelFirst(Transform):
    """
    Move the channel dimension of the input image to the front.

    Most transforms in ``monai.transforms`` expect channel-first data of shape
    (num_channels, spatial_dim_1[, spatial_dim_2, ...]). Use this transform to
    convert e.g. a channel-last array of shape
    (spatial_dim_1[, spatial_dim_2, ...], num_channels) into that layout so
    downstream transforms interpret the array correctly.

    Args:
        channel_dim: which dimension of input image is the channel, default is the last dimension.
    """

    backend = [TransformBackends.TORCH, TransformBackends.NUMPY]

    def __init__(self, channel_dim: int = -1) -> None:
        # only integers >= -1 are accepted as the channel axis
        if not isinstance(channel_dim, int) or channel_dim < -1:
            raise AssertionError("invalid channel dimension.")
        self.channel_dim = channel_dim

    def __call__(self, img: NdarrayOrTensor) -> NdarrayOrTensor:
        """Move axis `self.channel_dim` of `img` to position 0."""
        return moveaxis(img, self.channel_dim, 0)
class AsChannelLast(Transform):
    """
    Move the channel dimension of the input image to the end.

    Some third-party transforms expect channel-last data of shape
    (spatial_dim_1[, spatial_dim_2, ...], num_channels). Use this transform to
    convert a channel-first array of shape
    (num_channels, spatial_dim_1[, spatial_dim_2, ...]) into that layout so
    MONAI transforms can be chained with such third-party transforms.

    Args:
        channel_dim: which dimension of input image is the channel, default is the first dimension.
    """

    backend = [TransformBackends.TORCH, TransformBackends.NUMPY]

    def __init__(self, channel_dim: int = 0) -> None:
        # only integers >= -1 are accepted as the channel axis
        if not isinstance(channel_dim, int) or channel_dim < -1:
            raise AssertionError("invalid channel dimension.")
        self.channel_dim = channel_dim

    def __call__(self, img: NdarrayOrTensor) -> NdarrayOrTensor:
        """Move axis `self.channel_dim` of `img` to the last position."""
        return moveaxis(img, self.channel_dim, -1)
class AddChannel(Transform):
    """
    Prepend a 1-length channel dimension to the input image.

    Most transforms in ``monai.transforms`` expect channel-first data of shape
    (num_channels, spatial_dim_1[, spatial_dim_2, ...]); this transform turns a
    plain spatial array (spatial_dim_1[, spatial_dim_2, ...]) into that layout
    so the other transforms interpret the array correctly.
    """

    backend = [TransformBackends.TORCH, TransformBackends.NUMPY]

    def __call__(self, img: NdarrayOrTensor) -> NdarrayOrTensor:
        """Return `img` with a new leading singleton axis."""
        # `np.newaxis` is None, so this works for both numpy arrays and tensors
        return img[np.newaxis]
class EnsureChannelFirst(Transform):
    """
    Adjust or add the channel dimension of input data to guarantee `channel_first` shape.

    The decision is driven by the `original_channel_dim` entry of the provided
    meta-data dictionary (typical values: "no_channel", 0, -1); the data is then
    converted to `channel_first` accordingly.
    """

    backend = [TransformBackends.TORCH, TransformBackends.NUMPY]

    def __init__(self, strict_check: bool = True):
        """
        Args:
            strict_check: whether to raise an error when the meta information is insufficient.
        """
        self.strict_check = strict_check

    def _insufficient_meta(self, msg: str) -> None:
        # Raise in strict mode; otherwise warn and let the caller pass the input through.
        if self.strict_check:
            raise ValueError(msg)
        warnings.warn(msg)

    def __call__(self, img: NdarrayOrTensor, meta_dict: Optional[Mapping] = None) -> NdarrayOrTensor:
        """
        Convert `img` to channel-first layout based on `meta_dict["original_channel_dim"]`.
        """
        if not isinstance(meta_dict, Mapping):
            self._insufficient_meta("meta_dict not available, EnsureChannelFirst is not in use.")
            return img
        channel_dim = meta_dict.get("original_channel_dim")
        if channel_dim is None:
            self._insufficient_meta("Unknown original_channel_dim in the meta_dict, EnsureChannelFirst is not in use.")
            return img
        if channel_dim == "no_channel":
            return AddChannel()(img)
        return AsChannelFirst(channel_dim=channel_dim)(img)
class RepeatChannel(Transform):
    """
    Repeat channel data to construct the input shape a model expects.

    The `repeats` count includes the original data, e.g.
    ``RepeatChannel(repeats=2)([[1, 2], [3, 4]])`` generates ``[[1, 2], [1, 2], [3, 4], [3, 4]]``.

    Args:
        repeats: the number of repetitions for each element.
    """

    backend = [TransformBackends.TORCH, TransformBackends.NUMPY]

    def __init__(self, repeats: int) -> None:
        if repeats <= 0:
            raise AssertionError("repeats count must be greater than 0.")
        self.repeats = repeats

    def __call__(self, img: NdarrayOrTensor) -> NdarrayOrTensor:
        """
        Apply the transform to `img`, assuming `img` is a "channel-first" array.
        """
        if isinstance(img, torch.Tensor):
            return torch.repeat_interleave(img, self.repeats, 0)
        return np.repeat(img, self.repeats, 0)
class RemoveRepeatedChannel(Transform):
    """
    Undo :py:class:`RepeatChannel` by discarding repeated channels.

    The `repeats` count specifies the deletion of the origin data, e.g.
    ``RemoveRepeatedChannel(repeats=2)([[1, 2], [1, 2], [3, 4], [3, 4]])`` generates ``[[1, 2], [3, 4]]``.

    Args:
        repeats: the number of repetitions to be deleted for each element.
    """

    backend = [TransformBackends.TORCH, TransformBackends.NUMPY]

    def __init__(self, repeats: int) -> None:
        if repeats <= 0:
            raise AssertionError("repeats count must be greater than 0.")
        self.repeats = repeats

    def __call__(self, img: NdarrayOrTensor) -> NdarrayOrTensor:
        """
        Apply the transform to `img`, assuming `img` is a "channel-first" array.
        """
        n_channels = img.shape[0]
        if n_channels < 2:
            raise AssertionError("Image must have more than one channel")
        # keep every `repeats`-th channel starting from channel 0
        return img[:: self.repeats, :]
class SplitChannel(Transform):
    """
    Split a numpy array or PyTorch Tensor along the channel dimension,
    returning one single-channel slice per channel. Useful when different
    follow-up transforms should be applied to different channels.

    Args:
        channel_dim: which dimension of input image is the channel, default to 0.
    """

    backend = [TransformBackends.TORCH, TransformBackends.NUMPY]

    def __init__(self, channel_dim: int = 0) -> None:
        self.channel_dim = channel_dim

    def __call__(self, img: NdarrayOrTensor) -> List[NdarrayOrTensor]:
        """Return a list of single-channel views of `img`."""
        n_channels = img.shape[self.channel_dim]
        if n_channels <= 1:
            raise RuntimeError("input image does not contain multiple channels.")
        outputs: List[NdarrayOrTensor] = []
        for idx in range(n_channels):
            # select slice(idx, idx + 1) on the channel axis, full slices elsewhere
            selector = [slice(None)] * img.ndim
            selector[self.channel_dim] = slice(idx, idx + 1)
            outputs.append(img[tuple(selector)])
        return outputs
class CastToType(Transform):
    """
    Cast the Numpy data to specified numpy data type, or cast the PyTorch Tensor to
    specified PyTorch data type. The container type (array vs tensor) is preserved.
    """

    backend = [TransformBackends.TORCH, TransformBackends.NUMPY]

    def __init__(self, dtype=np.float32) -> None:
        """
        Args:
            dtype: convert image to this data type, default is `np.float32`.
        """
        self.dtype = dtype

    def __call__(self, img: NdarrayOrTensor, dtype: Optional[Union[DtypeLike, torch.dtype]] = None) -> NdarrayOrTensor:
        """
        Apply the transform to `img`, assuming `img` is a numpy array or PyTorch Tensor.

        Args:
            dtype: convert image to this data type, default is `self.dtype`.

        Raises:
            TypeError: When ``img`` type is not in ``Union[numpy.ndarray, torch.Tensor]``.

        """
        if not isinstance(img, (torch.Tensor, np.ndarray)):
            raise TypeError(f"img must be one of (numpy.ndarray, torch.Tensor) but is {type(img).__name__}.")
        # Use an explicit `is None` check rather than `dtype or self.dtype`:
        # truthiness testing would silently discard a falsy-but-valid dtype argument.
        img_out, *_ = convert_data_type(img, output_type=type(img), dtype=self.dtype if dtype is None else dtype)
        return img_out
class ToTensor(Transform):
    """
    Convert the input image to a PyTorch Tensor; no other transformation is applied.
    """

    backend = [TransformBackends.TORCH, TransformBackends.NUMPY]

    def __init__(self, device: Optional[torch.device] = None) -> None:
        super().__init__()
        # target device for the output tensor; None keeps the default placement
        self.device = device

    def __call__(self, img: NdarrayOrTensor) -> torch.Tensor:
        """
        Apply the transform to `img` and make it contiguous.
        """
        return convert_to_tensor(img, wrap_sequence=True, device=self.device)  # type: ignore
class EnsureType(Transform):
    """
    Ensure the input data is a PyTorch Tensor or numpy array. Supported inputs:
    `numpy array`, `PyTorch Tensor`, `float`, `int`, `bool`; `string` and
    `object` values are kept as-is. Dictionaries, lists and tuples keep their
    container type while every item is converted recursively.

    Args:
        data_type: target data type to convert, should be "tensor" or "numpy".
    """

    backend = [TransformBackends.TORCH, TransformBackends.NUMPY]

    def __init__(self, data_type: str = "tensor") -> None:
        normalized = data_type.lower()
        if normalized not in ("tensor", "numpy"):
            raise ValueError("`data type` must be 'tensor' or 'numpy'.")
        self.data_type = normalized

    def __call__(self, data: NdarrayOrTensor) -> NdarrayOrTensor:
        """
        Args:
            data: input data can be PyTorch Tensor, numpy array, list, dictionary, int, float, bool, str, etc.
                Tensors, numpy arrays, floats, ints and bools become Tensors or numpy arrays; strings and
                objects keep the original. Dictionaries, lists and tuples convert every item where applicable.
        """
        if self.data_type == "tensor":
            return convert_to_tensor(data)  # type: ignore
        return convert_to_numpy(data)  # type: ignore
class ToNumpy(Transform):
    """
    Convert the input data to a numpy array; supports lists/tuples of numbers
    and PyTorch Tensors.
    """

    backend = [TransformBackends.TORCH, TransformBackends.NUMPY]

    def __call__(self, img: NdarrayOrTensor) -> np.ndarray:
        """
        Apply the transform to `img` and make it contiguous.
        """
        return convert_to_numpy(img)  # type: ignore
class ToCupy(Transform):
    """
    Convert the input data to a CuPy array; supports lists/tuples of numbers,
    NumPy arrays and PyTorch Tensors.
    """

    backend = [TransformBackends.TORCH, TransformBackends.NUMPY]

    def __call__(self, img: NdarrayOrTensor) -> NdarrayOrTensor:
        """
        Apply the transform to `img` and make it contiguous.
        """
        converted = cp.asarray(img)
        return cp.ascontiguousarray(converted)  # type: ignore
class ToPIL(Transform):
    """
    Convert the input image (NumPy array or PyTorch Tensor) to a PIL image;
    PIL inputs pass through unchanged.
    """

    backend = [TransformBackends.TORCH, TransformBackends.NUMPY]

    def __call__(self, img):
        """
        Apply the transform to `img`.
        """
        if isinstance(img, PILImageImage):
            return img
        # tensors are detached and moved to host memory before conversion
        data = img.detach().cpu().numpy() if isinstance(img, torch.Tensor) else img
        return pil_image_fromarray(data)
class Transpose(Transform):
    """
    Transpose the input image according to the given `indices` dimension ordering;
    `None` reverses the axis order.
    """

    backend = [TransformBackends.TORCH, TransformBackends.NUMPY]

    def __init__(self, indices: Optional[Sequence[int]]) -> None:
        self.indices = None if indices is None else tuple(indices)

    def __call__(self, img: NdarrayOrTensor) -> NdarrayOrTensor:
        """
        Apply the transform to `img`.
        """
        if isinstance(img, torch.Tensor):
            # torch has no axis-reversing default, so build the reversed order explicitly
            axes = self.indices if self.indices else tuple(reversed(range(img.ndim)))
            return img.permute(axes)
        return img.transpose(self.indices)  # type: ignore
class SqueezeDim(Transform):
    """
    Remove a unitary (size-1) dimension from the input.
    """

    backend = [TransformBackends.TORCH, TransformBackends.NUMPY]

    def __init__(self, dim: Optional[int] = 0) -> None:
        """
        Args:
            dim: dimension to be squeezed. Default = 0
                "None" works when the input is numpy array.

        Raises:
            TypeError: When ``dim`` is not an ``Optional[int]``.

        """
        if dim is not None and not isinstance(dim, int):
            raise TypeError(f"dim must be None or a int but is {type(dim).__name__}.")
        self.dim = dim

    def __call__(self, img: NdarrayOrTensor) -> NdarrayOrTensor:
        """
        Args:
            img: numpy array or tensor with the required dimension `dim` removed.
        """
        if self.dim is not None:
            # enforce numpy-like strictness for both backends
            # (torch would silently ignore a non-singleton dim)
            if img.shape[self.dim] != 1:
                raise ValueError("Can only squeeze singleton dimension")
            return img.squeeze(self.dim)
        # dim=None: squeeze every singleton axis
        return img.squeeze()
class DataStats(Transform):
    """
    Utility transform to show the statistics of data for debug or analysis.
    It can be inserted into any place of a transform chain and check results of previous transforms.
    It support both `numpy.ndarray` and `torch.tensor` as input data,
    so it can be used in pre-processing and post-processing.
    """

    backend = [TransformBackends.TORCH, TransformBackends.NUMPY]

    def __init__(
        self,
        prefix: str = "Data",
        data_type: bool = True,
        data_shape: bool = True,
        value_range: bool = True,
        data_value: bool = False,
        additional_info: Optional[Callable] = None,
        logger_handler: Optional[logging.Handler] = None,
    ) -> None:
        """
        Args:
            prefix: will be printed in format: "{prefix} statistics".
            data_type: whether to show the type of input data.
            data_shape: whether to show the shape of input data.
            value_range: whether to show the value range of input data.
            data_value: whether to show the raw value of input data.
                a typical example is to print some properties of Nifti image: affine, pixdim, etc.
            additional_info: user can define callable function to extract additional info from input data.
            logger_handler: add additional handler to output data: save to file, etc.
                add existing python logging handlers: https://docs.python.org/3/library/logging.handlers.html
                the handler should have a logging level of at least `INFO`.

        Raises:
            TypeError: When ``additional_info`` is not an ``Optional[Callable]``.

        """
        if not isinstance(prefix, str):
            raise AssertionError("prefix must be a string.")
        self.prefix = prefix
        self.data_type = data_type
        self.data_shape = data_shape
        self.value_range = value_range
        self.data_value = data_value
        if additional_info is not None and not callable(additional_info):
            raise TypeError(f"additional_info must be None or callable but is {type(additional_info).__name__}.")
        self.additional_info = additional_info
        # NOTE(review): the "DataStats" logger is shared by all instances, and every
        # instantiation adds a fresh StreamHandler to it, so creating several
        # DataStats transforms duplicates each log line — confirm this is intended.
        self._logger_name = "DataStats"
        _logger = logging.getLogger(self._logger_name)
        _logger.setLevel(logging.INFO)
        console = logging.StreamHandler(sys.stdout)  # always stdout
        console.setLevel(logging.INFO)
        _logger.addHandler(console)
        if logger_handler is not None:
            _logger.addHandler(logger_handler)

    def __call__(
        self,
        img: NdarrayOrTensor,
        prefix: Optional[str] = None,
        data_type: Optional[bool] = None,
        data_shape: Optional[bool] = None,
        value_range: Optional[bool] = None,
        data_value: Optional[bool] = None,
        additional_info: Optional[Callable] = None,
    ) -> NdarrayOrTensor:
        """
        Apply the transform to `img`, optionally take arguments similar to the class constructor.
        Each call-time flag, when not None, overrides the corresponding constructor default.
        Returns `img` unchanged; the statistics are only logged.
        """
        lines = [f"{prefix or self.prefix} statistics:"]
        # per-flag pattern: use the constructor default when the call-time value is None
        if self.data_type if data_type is None else data_type:
            lines.append(f"Type: {type(img)}")
        if self.data_shape if data_shape is None else data_shape:
            lines.append(f"Shape: {img.shape}")
        if self.value_range if value_range is None else value_range:
            if isinstance(img, np.ndarray):
                lines.append(f"Value range: ({np.min(img)}, {np.max(img)})")
            elif isinstance(img, torch.Tensor):
                lines.append(f"Value range: ({torch.min(img)}, {torch.max(img)})")
            else:
                # unexpected data type: still report rather than fail
                lines.append(f"Value range: (not a PyTorch or Numpy array, type: {type(img)})")
        if self.data_value if data_value is None else data_value:
            lines.append(f"Value: {img}")
        additional_info = self.additional_info if additional_info is None else additional_info
        if additional_info is not None:
            lines.append(f"Additional info: {additional_info(img)}")
        separator = "\n"
        output = f"{separator.join(lines)}"
        logging.getLogger(self._logger_name).info(output)
        return img
class SimulateDelay(Transform):
    """
    Pass-through transform that injects an artificial time delay, for testing.

    It makes it possible to simulate how large datasets behave (e.g. slow NFS
    data transfers or slow network transfers) without needing large test data.
    Testing only on small, fast data can hide real-world issues and lead to
    sub-optimal design choices.
    """

    backend = [TransformBackends.TORCH, TransformBackends.NUMPY]

    def __init__(self, delay_time: float = 0.0) -> None:
        """
        Args:
            delay_time: The minimum amount of time, in fractions of seconds,
                to accomplish this delay task.
        """
        super().__init__()
        self.delay_time: float = delay_time

    def __call__(self, img: NdarrayOrTensor, delay_time: Optional[float] = None) -> NdarrayOrTensor:
        """
        Sleep for the configured duration, then return `img` untouched.

        Args:
            img: data remain unchanged throughout this transform.
            delay_time: optional override of the delay configured at construction,
                in fractions of seconds.
        """
        duration = self.delay_time if delay_time is None else delay_time
        time.sleep(duration)
        return img
class Lambda(Transform):
    """
    Apply a user-defined lambda as a transform.

    For example:

    .. code-block:: python
        :emphasize-lines: 2

        image = np.ones((10, 2, 2))
        lambd = Lambda(func=lambda x: x[:4, :, :])
        print(lambd(image).shape)
        (4, 2, 2)

    Args:
        func: Lambda/function to be applied.

    Raises:
        TypeError: When ``func`` is not an ``Optional[Callable]``.

    """

    backend = [TransformBackends.TORCH, TransformBackends.NUMPY]

    def __init__(self, func: Optional[Callable] = None) -> None:
        if func is not None and not callable(func):
            raise TypeError(f"func must be None or callable but is {type(func).__name__}.")
        self.func = func

    def __call__(self, img: NdarrayOrTensor, func: Optional[Callable] = None):
        """
        Apply `func` (falling back to `self.func`) to `img`.

        Args:
            func: Lambda/function to be applied. Defaults to `self.func`.

        Raises:
            TypeError: When ``func`` is not an ``Optional[Callable]``.
            ValueError: When ``func=None`` and ``self.func=None``. Incompatible values.

        """
        # prefer the call-time function; `self.func` was already validated in __init__
        fn = self.func if func is None else func
        if fn is None:
            raise ValueError("Incompatible values: func=None and self.func=None.")
        if not callable(fn):
            raise TypeError(f"func must be None or callable but is {type(fn).__name__}.")
        return fn(img)
class RandLambda(Lambda, RandomizableTransform):
    """
    Randomizable version :py:class:`monai.transforms.Lambda`: the supplied `func`
    may contain random logic, and the function is executed at random based on `prob`.

    Args:
        func: Lambda/function to be applied.
        prob: probability of executing the random function, default to 1.0, with 100% probability to execute.

    For more details, please check :py:class:`monai.transforms.Lambda`.
    """

    backend = Lambda.backend

    def __init__(self, func: Optional[Callable] = None, prob: float = 1.0) -> None:
        Lambda.__init__(self=self, func=func)
        RandomizableTransform.__init__(self=self, prob=prob)

    def __call__(self, img: NdarrayOrTensor, func: Optional[Callable] = None):
        # draw the random decision first, then either delegate or pass through
        self.randomize(img)
        if not self._do_transform:
            return img
        return super().__call__(img=img, func=func)
class LabelToMask(Transform):
    """
    Convert labels to mask for other tasks. A typical usage is to convert segmentation labels
    to mask data to pre-process images and then feed the images into classification network.
    It can support single channel labels or One-Hot labels with specified `select_labels`.
    For example, users can select `label value = [2, 3]` to construct mask data, or select the
    second and the third channels of labels to construct mask data.
    The output mask data can be a multiple channels binary data or a single channel binary
    data that merges all the channels.

    Args:
        select_labels: labels to generate mask from. for 1 channel label, the `select_labels`
            is the expected label values, like: [1, 2, 3]. for One-Hot format label, the
            `select_labels` is the expected channel indices.
        merge_channels: whether to use `np.any()` to merge the result on channel dim. if yes,
            will return a single channel mask with binary data.

    """

    backend = [TransformBackends.TORCH, TransformBackends.NUMPY]

    def __init__(  # pytype: disable=annotation-type-mismatch
        self,
        select_labels: Union[Sequence[int], int],
        merge_channels: bool = False,
    ) -> None:  # pytype: disable=annotation-type-mismatch
        self.select_labels = ensure_tuple(select_labels)
        self.merge_channels = merge_channels

    def __call__(
        self,
        img: NdarrayOrTensor,
        select_labels: Optional[Union[Sequence[int], int]] = None,
        merge_channels: bool = False,
    ) -> NdarrayOrTensor:
        """
        Args:
            select_labels: labels to generate mask from. for 1 channel label, the `select_labels`
                is the expected label values, like: [1, 2, 3]. for One-Hot format label, the
                `select_labels` is the expected channel indices.
            merge_channels: whether to use `np.any()` to merge the result on channel dim. if yes,
                will return a single channel mask with binary data.
                NOTE(review): this call-time flag is OR-ed with the constructor value below,
                so it can only enable merging, never disable it — confirm this is intended.
        """
        if select_labels is None:
            select_labels = self.select_labels
        else:
            select_labels = ensure_tuple(select_labels)

        # multi-channel input is treated as One-Hot: pick the selected channels
        if img.shape[0] > 1:
            data = img[[*select_labels]]
        else:
            # single-channel input: build a boolean mask of voxels whose value is in select_labels
            where = np.where if isinstance(img, np.ndarray) else torch.where
            if isinstance(img, np.ndarray) or is_module_ver_at_least(torch, (1, 8, 0)):
                data = where(in1d(img, select_labels), True, False).reshape(img.shape)
            # pre pytorch 1.8.0, need to use 1/0 instead of True/False
            else:
                data = where(
                    in1d(img, select_labels), torch.tensor(1, device=img.device), torch.tensor(0, device=img.device)
                ).reshape(img.shape)

        if merge_channels or self.merge_channels:
            # collapse the channel axis into a single binary channel
            if isinstance(img, np.ndarray) or is_module_ver_at_least(torch, (1, 8, 0)):
                return data.any(0)[None]
            # pre pytorch 1.8.0 compatibility: `any` requires an integer dtype there
            return data.to(torch.uint8).any(0)[None].to(bool)  # type: ignore

        return data
class FgBgToIndices(Transform):
    """
    Compute foreground and background of the input label data, return the indices.
    If no output_shape specified, output data will be 1 dim indices after flattening.
    This transform can help pre-compute foreground and background regions for other transforms.
    A typical usage is to randomly select foreground and background to crop.
    The main logic is based on :py:class:`monai.transforms.utils.map_binary_to_indices`.

    Args:
        image_threshold: if enabled `image` at runtime, use ``image > image_threshold`` to
            determine the valid image content area and select background only in this area.
        output_shape: expected shape of output indices. if not None, unravel indices to specified shape.

    """

    def __init__(self, image_threshold: float = 0.0, output_shape: Optional[Sequence[int]] = None) -> None:
        self.image_threshold = image_threshold
        self.output_shape = output_shape

    def __call__(
        self,
        label: np.ndarray,
        image: Optional[np.ndarray] = None,
        output_shape: Optional[Sequence[int]] = None,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Args:
            label: input data to compute foreground and background indices.
            image: if image is not None, use ``label = 0 & image > image_threshold``
                to define background. so the output items will not map to all the voxels in the label.
            output_shape: expected shape of output indices. if None, use `self.output_shape` instead.

        """
        shape = self.output_shape if output_shape is None else output_shape
        fg_indices, bg_indices = map_binary_to_indices(label, image, self.image_threshold)
        if shape is not None:
            # convert flat indices to coordinate tuples of the requested shape
            fg_indices = np.stack([np.unravel_index(i, shape) for i in fg_indices])
            bg_indices = np.stack([np.unravel_index(i, shape) for i in bg_indices])
        return fg_indices, bg_indices
class ClassesToIndices(Transform):
    def __init__(
        self,
        num_classes: Optional[int] = None,
        image_threshold: float = 0.0,
        output_shape: Optional[Sequence[int]] = None,
    ) -> None:
        """
        Compute indices of every class of the input label data, return a list of indices.
        If no output_shape specified, output data will be 1 dim indices after flattening.
        This transform can help pre-compute indices of the class regions for other transforms.
        A typical usage is to randomly select indices of classes to crop.
        The main logic is based on :py:class:`monai.transforms.utils.map_classes_to_indices`.

        Args:
            num_classes: number of classes for argmax label, not necessary for One-Hot label.
            image_threshold: if enabled `image` at runtime, use ``image > image_threshold`` to
                determine the valid image content area and select only the indices of classes in this area.
            output_shape: expected shape of output indices. if not None, unravel indices to specified shape.

        """
        self.num_classes = num_classes
        self.image_threshold = image_threshold
        self.output_shape = output_shape

    def __call__(
        self,
        label: np.ndarray,
        image: Optional[np.ndarray] = None,
        output_shape: Optional[Sequence[int]] = None,
    ) -> List[np.ndarray]:
        """
        Args:
            label: input data to compute the indices of every class.
            image: if image is not None, use ``image > image_threshold`` to define valid region, and only select
                the indices within the valid region.
            output_shape: expected shape of output indices. if None, use `self.output_shape` instead.

        """
        shape = self.output_shape if output_shape is None else output_shape
        indices = map_classes_to_indices(label, self.num_classes, image, self.image_threshold)
        if shape is None:
            return indices
        # unravel the flat per-class indices into coordinate tuples of the requested shape
        return [np.stack([np.unravel_index(i, shape) for i in per_class]) for per_class in indices]
class ConvertToMultiChannelBasedOnBratsClasses(Transform):
    """
    Convert labels to multi channels based on brats18 classes:
    label 1 is the necrotic and non-enhancing tumor core
    label 2 is the the peritumoral edema
    label 4 is the GD-enhancing tumor
    The possible classes are TC (Tumor core), WT (Whole tumor)
    and ET (Enhancing tumor).
    """

    def __call__(self, img: np.ndarray) -> np.ndarray:
        # drop a singleton channel axis if present
        if img.ndim == 4 and img.shape[0] == 1:
            img = np.squeeze(img, axis=0)
        # TC: tumor core = labels 1 (non-enhancing) and 4 (enhancing)
        tc = np.logical_or(img == 1, img == 4)
        # WT: whole tumor = tumor core plus label 2 (peritumoral edema)
        wt = np.logical_or(tc, img == 2)
        # ET: enhancing tumor = label 4 only
        et = img == 4
        return np.stack([tc, wt, et], axis=0)
class AddExtremePointsChannel(Randomizable, Transform):
    """
    Add extreme points of label to the image as a new channel. This transform generates extreme
    points from the label, renders them as a gaussian-filtered points image, rescales that image
    to [rescale_min, rescale_max], and concatenates it to the input image as an extra channel.
    The algorithm is described in Roth et al., Going to Extremes: Weakly Supervised Medical
    Image Segmentation, https://arxiv.org/abs/2009.11988.
    This transform only supports single channel labels (1, spatial_dim1, [spatial_dim2, ...]).
    The background ``index`` is ignored when calculating extreme points.

    Args:
        background: Class index of background label, defaults to 0.
        pert: Random perturbation amount to add to the points, defaults to 0.0.

    Raises:
        ValueError: When no label image provided.
        ValueError: When label image is not single channel.
    """

    def __init__(self, background: int = 0, pert: float = 0.0) -> None:
        self._background = background
        self._pert = pert
        self._points: List[Tuple[int, ...]] = []

    def randomize(self, label: np.ndarray) -> None:
        # draw (possibly perturbed) extreme points from the label using this transform's RNG
        self._points = get_extreme_points(label, rand_state=self.R, background=self._background, pert=self._pert)

    def __call__(
        self,
        img: np.ndarray,
        label: Optional[np.ndarray] = None,
        sigma: Union[Sequence[float], float, Sequence[torch.Tensor], torch.Tensor] = 3.0,
        rescale_min: float = -1.0,
        rescale_max: float = 1.0,
    ):
        """
        Args:
            img: the image that we want to add new channel to.
            label: label image to get extreme points from. Shape must be
                (1, spatial_dim1, [, spatial_dim2, ...]). Doesn't support one-hot labels.
            sigma: if a list of values, must match the count of spatial dimensions of input data,
                and apply every value in the list to 1 spatial dimension. if only 1 value provided,
                use it for all spatial dimensions.
            rescale_min: minimum value of output data.
            rescale_max: maximum value of output data.
        """
        # guard clauses: a single-channel label is mandatory
        if label is None:
            raise ValueError("This transform requires a label array!")
        if label.shape[0] != 1:
            raise ValueError("Only supports single channel labels!")
        self.randomize(label[0, :])
        heatmap = extreme_points_to_image(
            points=self._points, label=label, sigma=sigma, rescale_min=rescale_min, rescale_max=rescale_max
        )
        return np.concatenate([img, heatmap], axis=0)
class TorchVision:
    """
    This is a wrapper transform for PyTorch TorchVision transform based on the specified transform name and args.
    As most of the TorchVision transforms only work for PIL image and PyTorch Tensor, this transform expects input
    data to be PyTorch Tensor, users can easily call `ToTensor` transform to convert a Numpy array to Tensor.
    """

    def __init__(self, name: str, *args, **kwargs) -> None:
        """
        Args:
            name: The transform name in TorchVision package.
            args: parameters for the TorchVision transform.
            kwargs: parameters for the TorchVision transform.
        """
        super().__init__()
        # lazily resolve the named transform from torchvision (>= 0.8.0) and instantiate it
        transform_cls, _ = optional_import("torchvision.transforms", "0.8.0", min_version, name=name)
        self.trans = transform_cls(*args, **kwargs)

    def __call__(self, img: torch.Tensor):
        """
        Args:
            img: PyTorch Tensor data for the TorchVision transform.
        """
        return self.trans(img)
class MapLabelValue:
"""
Utility to map label values to another set of values.
For example, map [3, 2, 1] to [0, 1, 2], [1, 2, 3] -> [0.5, 1.5, 2.5], ["label3", "label2", "label1"] -> [0, 1, 2],
[3.5, 2.5, 1.5] -> ["label0", "label1", "label2"], etc.
The label data must be numpy array or array-like data and the output data will be numpy array.
"""
def __init__(self, orig_labels: Sequence, target_labels: Sequence, dtype: DtypeLike = np.float32) -> None:
    """
    Args:
        orig_labels: original labels that map to others.
        target_labels: expected label values, 1: 1 map to the `orig_labels`.
        dtype: convert the output data to dtype, default to float32.
    """
    # both validation failures are reported before any state is stored
    if len(orig_labels) != len(target_labels):
        raise ValueError("orig_labels and target_labels must have the same length.")
    # a mapping where every pair is identical is a no-op, which is considered a user error
    if not any(o != t for o, t in zip(orig_labels, target_labels)):
        raise ValueError("orig_labels and target_labels are exactly the same, should be different to map.")
    self.orig_labels = orig_labels
    self.target_labels = target_labels
    self.dtype = dtype
def __call__(self, img: np.ndarray):
img = np.asarray(img)
img_flat = img.flatten()
try:
out_flat = np.copy(img_flat).astype(self.dtype)
except ValueError:
# can't copy unchanged labels as the expected dtype is not supported, must map all the label values
out_flat = np.zeros(shape=img_flat.shape, dtype=self.dtype)
for o, t in zip(self.orig_labels, self.target_labels):
if o == t:
continue
np.place(out_flat, img_flat == o, t)