Coverage for src/bioimageio/spec/model/v0_5.py: 75%
1364 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 13:10 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 13:10 +0000
1from __future__ import annotations
3import collections.abc
4import re
5import string
6import warnings
7from abc import ABC
8from copy import deepcopy
9from itertools import chain
10from math import ceil
11from pathlib import Path, PurePosixPath
12from tempfile import mkdtemp
13from typing import (
14 TYPE_CHECKING,
15 Any,
16 Callable,
17 ClassVar,
18 Dict,
19 Generic,
20 List,
21 Literal,
22 Mapping,
23 NamedTuple,
24 Optional,
25 Sequence,
26 Set,
27 Tuple,
28 Type,
29 TypeVar,
30 Union,
31 cast,
32)
34import numpy as np
35from annotated_types import Ge, Gt, Interval, MaxLen, MinLen, Predicate
36from imageio.v3 import imread, imwrite # pyright: ignore[reportUnknownVariableType]
37from loguru import logger
38from numpy.typing import NDArray
39from pydantic import (
40 AfterValidator,
41 Discriminator,
42 Field,
43 RootModel,
44 SerializationInfo,
45 SerializerFunctionWrapHandler,
46 StrictInt,
47 Tag,
48 ValidationInfo,
49 WrapSerializer,
50 field_validator,
51 model_serializer,
52 model_validator,
53)
54from typing_extensions import Annotated, Self, assert_never, get_args
56from .._internal.common_nodes import (
57 InvalidDescr,
58 Node,
59 NodeWithExplicitlySetFields,
60)
61from .._internal.constants import DTYPE_LIMITS
62from .._internal.field_warning import issue_warning, warn
63from .._internal.io import BioimageioYamlContent as BioimageioYamlContent
64from .._internal.io import FileDescr as FileDescr
65from .._internal.io import (
66 FileSource,
67 WithSuffix,
68 YamlValue,
69 extract_file_name,
70 get_reader,
71 wo_special_file_name,
72)
73from .._internal.io_basics import Sha256 as Sha256
74from .._internal.io_packaging import (
75 FileDescr_,
76 FileSource_,
77 package_file_descr_serializer,
78)
79from .._internal.io_utils import load_array
80from .._internal.node_converter import Converter
81from .._internal.type_guards import is_dict, is_sequence
82from .._internal.types import (
83 FAIR,
84 AbsoluteTolerance,
85 LowerCaseIdentifier,
86 LowerCaseIdentifierAnno,
87 MismatchedElementsPerMillion,
88 RelativeTolerance,
89)
90from .._internal.types import Datetime as Datetime
91from .._internal.types import Identifier as Identifier
92from .._internal.types import NotEmpty as NotEmpty
93from .._internal.types import SiUnit as SiUnit
94from .._internal.url import HttpUrl as HttpUrl
95from .._internal.validation_context import get_validation_context
96from .._internal.validator_annotations import RestrictCharacters
97from .._internal.version_type import Version as Version
98from .._internal.warning_levels import INFO
99from ..dataset.v0_2 import DatasetDescr as DatasetDescr02
100from ..dataset.v0_2 import LinkedDataset as LinkedDataset02
101from ..dataset.v0_3 import DatasetDescr as DatasetDescr
102from ..dataset.v0_3 import DatasetId as DatasetId
103from ..dataset.v0_3 import LinkedDataset as LinkedDataset
104from ..dataset.v0_3 import Uploader as Uploader
105from ..generic.v0_3 import (
106 VALID_COVER_IMAGE_EXTENSIONS as VALID_COVER_IMAGE_EXTENSIONS,
107)
108from ..generic.v0_3 import Author as Author
109from ..generic.v0_3 import BadgeDescr as BadgeDescr
110from ..generic.v0_3 import CiteEntry as CiteEntry
111from ..generic.v0_3 import DeprecatedLicenseId as DeprecatedLicenseId
112from ..generic.v0_3 import Doi as Doi
113from ..generic.v0_3 import (
114 FileSource_documentation,
115 GenericModelDescrBase,
116 LinkedResourceBase,
117 _author_conv, # pyright: ignore[reportPrivateUsage]
118 _maintainer_conv, # pyright: ignore[reportPrivateUsage]
119)
120from ..generic.v0_3 import LicenseId as LicenseId
121from ..generic.v0_3 import LinkedResource as LinkedResource
122from ..generic.v0_3 import Maintainer as Maintainer
123from ..generic.v0_3 import OrcidId as OrcidId
124from ..generic.v0_3 import RelativeFilePath as RelativeFilePath
125from ..generic.v0_3 import ResourceId as ResourceId
126from .v0_4 import Author as _Author_v0_4
127from .v0_4 import BinarizeDescr as _BinarizeDescr_v0_4
128from .v0_4 import CallableFromDepencency as CallableFromDepencency
129from .v0_4 import CallableFromDepencency as _CallableFromDepencency_v0_4
130from .v0_4 import CallableFromFile as _CallableFromFile_v0_4
131from .v0_4 import ClipDescr as _ClipDescr_v0_4
132from .v0_4 import ClipKwargs as ClipKwargs
133from .v0_4 import ImplicitOutputShape as _ImplicitOutputShape_v0_4
134from .v0_4 import InputTensorDescr as _InputTensorDescr_v0_4
135from .v0_4 import KnownRunMode as KnownRunMode
136from .v0_4 import ModelDescr as _ModelDescr_v0_4
137from .v0_4 import OutputTensorDescr as _OutputTensorDescr_v0_4
138from .v0_4 import ParameterizedInputShape as _ParameterizedInputShape_v0_4
139from .v0_4 import PostprocessingDescr as _PostprocessingDescr_v0_4
140from .v0_4 import PreprocessingDescr as _PreprocessingDescr_v0_4
141from .v0_4 import ProcessingKwargs as ProcessingKwargs
142from .v0_4 import RunMode as RunMode
143from .v0_4 import ScaleLinearDescr as _ScaleLinearDescr_v0_4
144from .v0_4 import ScaleMeanVarianceDescr as _ScaleMeanVarianceDescr_v0_4
145from .v0_4 import ScaleRangeDescr as _ScaleRangeDescr_v0_4
146from .v0_4 import SigmoidDescr as _SigmoidDescr_v0_4
147from .v0_4 import TensorName as _TensorName_v0_4
148from .v0_4 import WeightsFormat as WeightsFormat
149from .v0_4 import ZeroMeanUnitVarianceDescr as _ZeroMeanUnitVarianceDescr_v0_4
150from .v0_4 import package_weights
# Closed set of length-unit names; used as the type of `SpaceAxisBase.unit`.
SpaceUnit = Literal[
    "attometer",
    "angstrom",
    "centimeter",
    "decimeter",
    "exameter",
    "femtometer",
    "foot",
    "gigameter",
    "hectometer",
    "inch",
    "kilometer",
    "megameter",
    "meter",
    "micrometer",
    "mile",
    "millimeter",
    "nanometer",
    "parsec",
    "petameter",
    "picometer",
    "terameter",
    "yard",
    "yoctometer",
    "yottameter",
    "zeptometer",
    "zettameter",
]
"""Space unit compatible to the [OME-Zarr axes specification 0.5](https://ngff.openmicroscopy.org/0.5/#axes-md)"""
# Closed set of time-unit names; used as the type of `TimeAxisBase.unit`.
TimeUnit = Literal[
    "attosecond",
    "centisecond",
    "day",
    "decisecond",
    "exasecond",
    "femtosecond",
    "gigasecond",
    "hectosecond",
    "hour",
    "kilosecond",
    "megasecond",
    "microsecond",
    "millisecond",
    "minute",
    "nanosecond",
    "petasecond",
    "picosecond",
    "second",
    "terasecond",
    "yoctosecond",
    "yottasecond",
    "zeptosecond",
    "zettasecond",
]
"""Time unit compatible to the [OME-Zarr axes specification 0.5](https://ngff.openmicroscopy.org/0.5/#axes-md)"""
# The five axis categories; each axis class in this module pins one of them
# as its `type` literal.
AxisType = Literal["batch", "channel", "index", "time", "space"]

# Single-letter axis name -> axis type. All of x, y and z map to "space".
_AXIS_TYPE_MAP: Mapping[str, AxisType] = {
    "b": "batch",
    "t": "time",
    "i": "index",
    "c": "channel",
    "x": "space",
    "y": "space",
    "z": "space",
}

# Single-letter axis id -> long-form axis id; looked up by `_normalize_axis_id`.
# The spatial letters x, y, z are not listed, so they pass through unchanged.
_AXIS_ID_MAP = {
    "b": "batch",
    "t": "time",
    "i": "index",
    "c": "channel",
}
# Tensor identifier: a `LowerCaseIdentifier` whose validation model additionally
# limits the length to 32 characters.
class TensorId(LowerCaseIdentifier):
    root_model: ClassVar[Type[RootModel[Any]]] = RootModel[
        Annotated[LowerCaseIdentifierAnno, MaxLen(32)]
    ]
def _normalize_axis_id(a: str):
    """Replace a short axis id listed in `_AXIS_ID_MAP` by its long form.

    Ids that are not listed are returned unchanged (as `str`).
    A warning is logged whenever the returned id differs from the given one.
    """
    axis_id = str(a)
    try:
        long_form = _AXIS_ID_MAP[axis_id]
    except KeyError:
        # not a known abbreviation: nothing to normalize
        return axis_id

    if long_form != axis_id:
        logger.opt(depth=3).warning(
            "Normalized axis id from '{}' to '{}'.", axis_id, long_form
        )

    return long_form
# Axis identifier: a `LowerCaseIdentifier` of at most 16 characters.
# After validation the value is passed through `_normalize_axis_id`.
class AxisId(LowerCaseIdentifier):
    root_model: ClassVar[Type[RootModel[Any]]] = RootModel[
        Annotated[
            LowerCaseIdentifierAnno,
            MaxLen(16),
            AfterValidator(_normalize_axis_id),
        ]
    ]
def _is_batch(a: str) -> bool:
    """Return True if `a`, converted to `str`, is exactly "batch"."""
    axis_id = str(a)
    return axis_id == "batch"
def _is_not_batch(a: str) -> bool:
    """Return True unless `a`, converted to `str`, is exactly "batch"."""
    return str(a) != "batch"
# An `AxisId` that additionally has to satisfy `_is_not_batch`.
NonBatchAxisId = Annotated[AxisId, Predicate(_is_not_batch)]

# Literal of preprocessing operation ids.
PreprocessingId = Literal[
    "binarize",
    "clip",
    "ensure_dtype",
    "fixed_zero_mean_unit_variance",
    "scale_linear",
    "scale_range",
    "sigmoid",
    "softmax",
]
# Literal of postprocessing operation ids: every `PreprocessingId` plus
# "scale_mean_variance" and "zero_mean_unit_variance".
PostprocessingId = Literal[
    "binarize",
    "clip",
    "ensure_dtype",
    "fixed_zero_mean_unit_variance",
    "scale_linear",
    "scale_mean_variance",
    "scale_range",
    "sigmoid",
    "softmax",
    "zero_mean_unit_variance",
]


# Placeholder string; its use is not visible in this part of the file.
SAME_AS_TYPE = "<same as type>"


ParameterizedSize_N = int
"""
Annotates an integer to calculate a concrete axis size from a `ParameterizedSize`.
"""
class ParameterizedSize(Node):
    """Describes a range of valid tensor axis sizes as `size = min + n*step`.

    - **min** and **step** are given by the model description.
    - All blocksize parameters n = 0,1,2,... yield a valid `size`.
    - A greater blocksize parameter n = 0,1,2,... results in a greater **size**.
      This allows to adjust the axis size more generically.
    """

    N: ClassVar[Type[int]] = ParameterizedSize_N
    """Positive integer to parameterize this axis"""

    min: Annotated[int, Gt(0)]
    step: Annotated[int, Gt(0)]

    def validate_size(self, size: int) -> int:
        """Return `size` unchanged if it can be written as `min + n*step`.

        Raises:
            ValueError: if `size` is smaller than `min` or does not lie on the
                `min + n*step` grid.
        """
        if size < self.min:
            raise ValueError(f"size {size} < {self.min}")
        if (size - self.min) % self.step != 0:
            raise ValueError(
                f"axis of size {size} is not parameterized by `min + n*step` ="
                + f" `{self.min} + n*{self.step}`"
            )

        return size

    def get_size(self, n: ParameterizedSize_N) -> int:
        """Return the concrete size `min + n*step` for blocksize parameter `n`."""
        return self.min + self.step * n

    def get_n(self, s: int) -> ParameterizedSize_N:
        """return smallest n parameterizing a size greater or equal than `s`

        The result is never negative: for any `s` not exceeding `min` the
        smallest valid parameter is 0 (which yields `min`).
        """
        # Without the clamp a request below `min` would give a negative n and
        # `get_size(n)` would drop below `min`, i.e. an invalid size.
        return max(0, ceil((s - self.min) / self.step))
# Size bounds for an axis whose extent is only known after inference:
# `min` (default 1) and an optional `max` that has to exceed `min`.
class DataDependentSize(Node):
    min: Annotated[int, Gt(0)] = 1
    max: Annotated[Optional[int], Gt(1)] = None

    @model_validator(mode="after")
    def _validate_max_gt_min(self):
        """Reject descriptions whose `max` (if given) is not greater than `min`."""
        upper = self.max
        if upper is None or self.min < upper:
            return self

        raise ValueError(f"expected `min` < `max`, but got {self.min}, {self.max}")

    def validate_size(self, size: int) -> int:
        """Return `size` unchanged if it lies within `[min, max]`.

        Raises:
            ValueError: if `size` is below `min` or above a given `max`.
        """
        lower, upper = self.min, self.max
        if size < lower:
            raise ValueError(f"size {size} < {self.min}")

        if upper is not None and upper < size:
            raise ValueError(f"size {size} > {self.max}")

        return size
class SizeReference(Node):
    """A tensor axis size (extent in pixels/frames) defined in relation to a reference axis.

    `axis.size = reference.size * reference.scale / axis.scale + offset`

    Note:
    1. The axis and the referenced axis need to have the same unit (or no unit).
    2. Batch axes may not be referenced.
    3. Fractions are rounded down.
    4. If the reference axis is `concatenable` the referencing axis is assumed to be
        `concatenable` as well with the same block order.

    Example:
    An anisotropic input image of w*h=100*49 pixels depicts a physical space of 200*196mm².
    Let's assume that we want to express the image height h in relation to its width w
    instead of only accepting input images of exactly 100*49 pixels
    (for example to express a range of valid image shapes by parametrizing w, see `ParameterizedSize`).

    >>> w = SpaceInputAxis(id=AxisId("w"), size=100, unit="millimeter", scale=2)
    >>> h = SpaceInputAxis(
    ...     id=AxisId("h"),
    ...     size=SizeReference(tensor_id=TensorId("input"), axis_id=AxisId("w"), offset=-1),
    ...     unit="millimeter",
    ...     scale=4,
    ... )
    >>> print(h.size.get_size(h, w))
    49

    ⇒ h = w * w.scale / h.scale + offset = 100 * 2mm / 4mm - 1 = 49
    """

    tensor_id: TensorId
    """tensor id of the reference axis"""

    axis_id: AxisId
    """axis id of the reference axis"""

    offset: StrictInt = 0

    def get_size(
        self,
        axis: Union[
            ChannelAxis,
            IndexInputAxis,
            IndexOutputAxis,
            TimeInputAxis,
            SpaceInputAxis,
            TimeOutputAxis,
            TimeOutputAxisWithHalo,
            SpaceOutputAxis,
            SpaceOutputAxisWithHalo,
        ],
        ref_axis: Union[
            ChannelAxis,
            IndexInputAxis,
            IndexOutputAxis,
            TimeInputAxis,
            SpaceInputAxis,
            TimeOutputAxis,
            TimeOutputAxisWithHalo,
            SpaceOutputAxis,
            SpaceOutputAxisWithHalo,
        ],
        n: ParameterizedSize_N = 0,
        ref_size: Optional[int] = None,
    ):
        """Compute the concrete size for a given axis and its reference axis.

        Args:
            axis: The axis this `SizeReference` is the size of.
            ref_axis: The reference axis to compute the size from.
            n: If the **ref_axis** is parameterized (of type `ParameterizedSize`)
                and no fixed **ref_size** is given,
                **n** is used to compute the size of the parameterized **ref_axis**.
            ref_size: Overwrite the reference size instead of deriving it from
                **ref_axis**
                (**ref_axis.scale** is still used; any given **n** is ignored).

        Raises:
            ValueError: if no **ref_size** is given and **ref_axis** is sized by a
                `DataDependentSize` or by another `SizeReference`.
        """
        assert axis.size == self, (
            "Given `axis.size` is not defined by this `SizeReference`"
        )

        assert ref_axis.id == self.axis_id, (
            f"Expected `ref_axis.id` to be {self.axis_id}, but got {ref_axis.id}."
        )

        assert axis.unit == ref_axis.unit, (
            "`SizeReference` requires `axis` and `ref_axis` to have the same `unit`,"
            f" but {axis.unit}!={ref_axis.unit}"
        )

        if ref_size is None:
            ref_spec = ref_axis.size
            # sizes that cannot be resolved to a number are rejected first
            if isinstance(ref_spec, DataDependentSize):
                raise ValueError(
                    "Reference axis referenced in `SizeReference` may not be a `DataDependentSize`."
                )
            if isinstance(ref_spec, SizeReference):
                raise ValueError(
                    "Reference axis referenced in `SizeReference` may not be sized by a"
                    + " `SizeReference` itself."
                )

            if isinstance(ref_spec, ParameterizedSize):
                ref_size = ref_spec.get_size(n)
            elif isinstance(ref_spec, (int, float)):
                ref_size = ref_spec
            else:
                assert_never(ref_spec)

        scaled_ref_size = ref_size * ref_axis.scale / axis.scale
        return int(scaled_ref_size + self.offset)

    @staticmethod
    def _get_unit(
        axis: Union[
            ChannelAxis,
            IndexInputAxis,
            IndexOutputAxis,
            TimeInputAxis,
            SpaceInputAxis,
            TimeOutputAxis,
            TimeOutputAxisWithHalo,
            SpaceOutputAxis,
            SpaceOutputAxisWithHalo,
        ],
    ):
        """Return the `unit` attribute of the given axis."""
        return axis.unit
# Fields shared by all axis descriptions in this module: an id and an
# optional short description (at most 128 characters).
class AxisBase(NodeWithExplicitlySetFields):
    id: AxisId
    """An axis id unique across all axes of one tensor."""

    description: Annotated[str, MaxLen(128)] = ""
    """A short description of this axis beyond its type and id."""
# Mixin for output axes with a halo: requires `halo >= 1` and a `size` given
# as a `SizeReference`.
class WithHalo(Node):
    halo: Annotated[int, Ge(1)]
    """The halo should be cropped from the output tensor to avoid boundary effects.
    It is to be cropped from both sides, i.e. `size_after_crop = size - 2 * halo`.
    To document a halo that is already cropped by the model use `size.offset` instead."""

    # NOTE(review): the first example (`10`) is a plain int, while the annotation
    # only admits `SizeReference` -- confirm whether this example is intended.
    size: Annotated[
        SizeReference,
        Field(
            examples=[
                10,
                SizeReference(
                    tensor_id=TensorId("t"), axis_id=AxisId("a"), offset=5
                ).model_dump(mode="json"),
            ]
        ),
    ]
    """reference to another axis with an optional offset (see `SizeReference`)"""
# The axis id "batch"; default `id` of `BatchAxis`.
BATCH_AXIS_ID = AxisId("batch")
# Batch axis description: its id has to satisfy `_is_batch`, its size is either
# fixed to 1 or left open (None).
class BatchAxis(AxisBase):
    implemented_type: ClassVar[Literal["batch"]] = "batch"
    # Type checkers see `type` with a default; at runtime it is declared without
    # one. NOTE(review): presumably the base class supplies the value from
    # `implemented_type` -- confirm.
    if TYPE_CHECKING:
        type: Literal["batch"] = "batch"
    else:
        type: Literal["batch"]

    id: Annotated[AxisId, Predicate(_is_batch)] = BATCH_AXIS_ID
    size: Optional[Literal[1]] = None
    """The batch size may be fixed to 1,
    otherwise (the default) it may be chosen arbitrarily depending on available memory"""

    @property
    def scale(self):
        """Constant 1.0."""
        return 1.0

    @property
    def concatenable(self):
        """Constant True."""
        return True

    @property
    def unit(self):
        """Constant None (a batch axis has no unit)."""
        return None
# Channel axis description: its size is derived from the (non-empty) list of
# channel names.
class ChannelAxis(AxisBase):
    implemented_type: ClassVar[Literal["channel"]] = "channel"
    # Type checkers see `type` with a default; at runtime it is declared without
    # one. NOTE(review): presumably the base class supplies the value from
    # `implemented_type` -- confirm.
    if TYPE_CHECKING:
        type: Literal["channel"] = "channel"
    else:
        type: Literal["channel"]

    id: NonBatchAxisId = AxisId("channel")

    channel_names: NotEmpty[List[Identifier]]

    @property
    def size(self) -> int:
        """Number of entries in `channel_names`."""
        return len(self.channel_names)

    @property
    def concatenable(self):
        """Constant False."""
        return False

    @property
    def scale(self) -> float:
        """Constant 1.0."""
        return 1.0

    @property
    def unit(self):
        """Constant None (a channel axis has no unit)."""
        return None
# Common part of index input/output axes: fixed scale 1.0 and no unit.
class IndexAxisBase(AxisBase):
    implemented_type: ClassVar[Literal["index"]] = "index"
    # Type checkers see `type` with a default; at runtime it is declared without
    # one. NOTE(review): presumably the base class supplies the value from
    # `implemented_type` -- confirm.
    if TYPE_CHECKING:
        type: Literal["index"] = "index"
    else:
        type: Literal["index"]

    id: NonBatchAxisId = AxisId("index")

    @property
    def scale(self) -> float:
        """Constant 1.0."""
        return 1.0

    @property
    def unit(self):
        """Constant None (an index axis has no unit)."""
        return None
# Mixin contributing the `size` field of input axes:
# a positive int, a `ParameterizedSize` or a `SizeReference`.
class _WithInputAxisSize(Node):
    size: Annotated[
        Union[Annotated[int, Gt(0)], ParameterizedSize, SizeReference],
        Field(
            examples=[
                10,
                ParameterizedSize(min=32, step=16).model_dump(mode="json"),
                SizeReference(
                    tensor_id=TensorId("t"), axis_id=AxisId("a"), offset=5
                ).model_dump(mode="json"),
            ]
        ),
    ]
    """The size/length of this axis can be specified as
    - fixed integer
    - parameterized series of valid sizes (`ParameterizedSize`)
    - reference to another axis with an optional offset (`SizeReference`)
    """
# Index axis of an input tensor: `IndexAxisBase` plus the input `size` field
# and a `concatenable` flag (default False).
class IndexInputAxis(IndexAxisBase, _WithInputAxisSize):
    concatenable: bool = False
    """If a model has a `concatenable` input axis, it can be processed blockwise,
    splitting a longer sample axis into blocks matching its input tensor description.
    Output axes are concatenable if they have a `SizeReference` to a concatenable
    input axis.
    """
# Index axis of an output tensor; unlike the other output axes its size may
# also be a `DataDependentSize`.
class IndexOutputAxis(IndexAxisBase):
    # NOTE(review): the examples cover the int and `SizeReference` variants only;
    # there is no `DataDependentSize` example.
    size: Annotated[
        Union[Annotated[int, Gt(0)], SizeReference, DataDependentSize],
        Field(
            examples=[
                10,
                SizeReference(
                    tensor_id=TensorId("t"), axis_id=AxisId("a"), offset=5
                ).model_dump(mode="json"),
            ]
        ),
    ]
    """The size/length of this axis can be specified as
    - fixed integer
    - reference to another axis with an optional offset (`SizeReference`)
    - data dependent size using `DataDependentSize` (size is only known after model inference)
    """
# Common part of time input/output axes: optional `TimeUnit` and a positive
# `scale` (default 1.0).
class TimeAxisBase(AxisBase):
    implemented_type: ClassVar[Literal["time"]] = "time"
    # Type checkers see `type` with a default; at runtime it is declared without
    # one. NOTE(review): presumably the base class supplies the value from
    # `implemented_type` -- confirm.
    if TYPE_CHECKING:
        type: Literal["time"] = "time"
    else:
        type: Literal["time"]

    id: NonBatchAxisId = AxisId("time")
    unit: Optional[TimeUnit] = None
    scale: Annotated[float, Gt(0)] = 1.0
# Time axis of an input tensor: `TimeAxisBase` plus the input `size` field
# and a `concatenable` flag (default False).
class TimeInputAxis(TimeAxisBase, _WithInputAxisSize):
    concatenable: bool = False
    """If a model has a `concatenable` input axis, it can be processed blockwise,
    splitting a longer sample axis into blocks matching its input tensor description.
    Output axes are concatenable if they have a `SizeReference` to a concatenable
    input axis.
    """
# Common part of space input/output axes: optional `SpaceUnit` and a positive
# `scale` (default 1.0). The default id is "x".
class SpaceAxisBase(AxisBase):
    implemented_type: ClassVar[Literal["space"]] = "space"
    # Type checkers see `type` with a default; at runtime it is declared without
    # one. NOTE(review): presumably the base class supplies the value from
    # `implemented_type` -- confirm.
    if TYPE_CHECKING:
        type: Literal["space"] = "space"
    else:
        type: Literal["space"]

    id: Annotated[NonBatchAxisId, Field(examples=["x", "y", "z"])] = AxisId("x")
    unit: Optional[SpaceUnit] = None
    scale: Annotated[float, Gt(0)] = 1.0
# Space axis of an input tensor: `SpaceAxisBase` plus the input `size` field
# and a `concatenable` flag (default False).
class SpaceInputAxis(SpaceAxisBase, _WithInputAxisSize):
    concatenable: bool = False
    """If a model has a `concatenable` input axis, it can be processed blockwise,
    splitting a longer sample axis into blocks matching its input tensor description.
    Output axes are concatenable if they have a `SizeReference` to a concatenable
    input axis.
    """
# Tuple of all input axis classes (same members as `_InputAxisUnion` below).
INPUT_AXIS_TYPES = (
    BatchAxis,
    ChannelAxis,
    IndexInputAxis,
    TimeInputAxis,
    SpaceInputAxis,
)
"""intended for isinstance comparisons in py<3.10"""

_InputAxisUnion = Union[
    BatchAxis, ChannelAxis, IndexInputAxis, TimeInputAxis, SpaceInputAxis
]
# Tagged union: the member class is selected by the value of the `type` field.
InputAxis = Annotated[_InputAxisUnion, Discriminator("type")]
# Mixin contributing the `size` field of output axes without halo:
# a positive int or a `SizeReference`.
class _WithOutputAxisSize(Node):
    size: Annotated[
        Union[Annotated[int, Gt(0)], SizeReference],
        Field(
            examples=[
                10,
                SizeReference(
                    tensor_id=TensorId("t"), axis_id=AxisId("a"), offset=5
                ).model_dump(mode="json"),
            ]
        ),
    ]
    """The size/length of this axis can be specified as
    - fixed integer
    - reference to another axis with an optional offset (see `SizeReference`)
    """
# Time axis of an output tensor without halo; combines `TimeAxisBase` with the
# output `size` field and adds nothing of its own.
class TimeOutputAxis(TimeAxisBase, _WithOutputAxisSize):
    pass
# Time axis of an output tensor with a halo; combines `TimeAxisBase` with the
# `halo`/`size` fields of `WithHalo` and adds nothing of its own.
class TimeOutputAxisWithHalo(TimeAxisBase, WithHalo):
    pass
def _get_halo_axis_discriminator_value(v: Any) -> Literal["with_halo", "wo_halo"]:
    """Return the union tag for an output axis given as dict or as object.

    The tag is "with_halo" if a dict has a "halo" key or, for any other value,
    if it has a `halo` attribute; otherwise it is "wo_halo".
    """
    has_halo = ("halo" in v) if isinstance(v, dict) else hasattr(v, "halo")
    return "with_halo" if has_halo else "wo_halo"
# Time output axis with or without halo; the member is picked by the tag that
# `_get_halo_axis_discriminator_value` computes from the raw value.
_TimeOutputAxisUnion = Annotated[
    Union[
        Annotated[TimeOutputAxis, Tag("wo_halo")],
        Annotated[TimeOutputAxisWithHalo, Tag("with_halo")],
    ],
    Discriminator(_get_halo_axis_discriminator_value),
]
# Space axis of an output tensor without halo; combines `SpaceAxisBase` with
# the output `size` field and adds nothing of its own.
class SpaceOutputAxis(SpaceAxisBase, _WithOutputAxisSize):
    pass
# Space axis of an output tensor with a halo; combines `SpaceAxisBase` with the
# `halo`/`size` fields of `WithHalo` and adds nothing of its own.
class SpaceOutputAxisWithHalo(SpaceAxisBase, WithHalo):
    pass
# Space output axis with or without halo; the member is picked by the tag that
# `_get_halo_axis_discriminator_value` computes from the raw value.
_SpaceOutputAxisUnion = Annotated[
    Union[
        Annotated[SpaceOutputAxis, Tag("wo_halo")],
        Annotated[SpaceOutputAxisWithHalo, Tag("with_halo")],
    ],
    Discriminator(_get_halo_axis_discriminator_value),
]
# All output axis variants; the time and space members are themselves tagged
# unions (with/without halo).
_OutputAxisUnion = Union[
    BatchAxis, ChannelAxis, IndexOutputAxis, _TimeOutputAxisUnion, _SpaceOutputAxisUnion
]
# Tagged union: the member is selected by the value of the `type` field.
OutputAxis = Annotated[_OutputAxisUnion, Discriminator("type")]

# Tuple of all concrete output axis classes (halo variants listed explicitly).
OUTPUT_AXIS_TYPES = (
    BatchAxis,
    ChannelAxis,
    IndexOutputAxis,
    TimeOutputAxis,
    TimeOutputAxisWithHalo,
    SpaceOutputAxis,
    SpaceOutputAxisWithHalo,
)
"""intended for isinstance comparisons in py<3.10"""


AnyAxis = Union[InputAxis, OutputAxis]

# Plain tuple concatenation: `BatchAxis` and `ChannelAxis` therefore occur twice.
ANY_AXIS_TYPES = INPUT_AXIS_TYPES + OUTPUT_AXIS_TYPES
"""intended for isinstance comparisons in py<3.10"""

# Non-empty list of int, float, bool or str values;
# used as the type of `NominalOrOrdinalDataDescr.values`.
TVs = Union[
    NotEmpty[List[int]],
    NotEmpty[List[float]],
    NotEmpty[List[bool]],
    NotEmpty[List[str]],
]
# Data type names allowed for `NominalOrOrdinalDataDescr.type`.
NominalOrOrdinalDType = Literal[
    "float32",
    "float64",
    "uint8",
    "int8",
    "uint16",
    "int16",
    "uint32",
    "int32",
    "uint64",
    "int64",
    "bool",
]
class NominalOrOrdinalDataDescr(Node):
    values: TVs
    """A fixed set of nominal or an ascending sequence of ordinal values.
    In this case `data.type` is required to be an unsigend integer type, e.g. 'uint8'.
    String `values` are interpreted as labels for tensor values 0, ..., N.
    Note: as YAML 1.2 does not natively support a "set" datatype,
    nominal values should be given as a sequence (aka list/array) as well.
    """

    type: Annotated[
        NominalOrOrdinalDType,
        Field(
            examples=[
                "float32",
                "uint8",
                "uint16",
                "int64",
                "bool",
            ],
        ),
    ] = "uint8"

    unit: Optional[Union[Literal["arbitrary unit"], SiUnit]] = None

    @model_validator(mode="after")
    def _validate_values_match_type(
        self,
    ) -> Self:
        """Check that every entry of `values` is compatible with `type`.

        Raises:
            ValueError: listing the incompatible values (at most the first five
                followed by "...").
        """
        dtype = self.type
        mismatches: List[Any] = []
        for value in self.values:
            if dtype == "bool":
                is_mismatch = not isinstance(value, bool)
            elif dtype in DTYPE_LIMITS:
                limits = DTYPE_LIMITS[dtype]
                # number outside of what the data type can represent
                out_of_range = isinstance(value, (int, float)) and (
                    value < limits.min or value > limits.max
                )
                # string labels are only accepted for unsigned integer types
                label_for_non_uint = isinstance(value, str) and "uint" not in dtype
                # float values are not accepted for integer types
                float_for_int = isinstance(value, float) and "int" in dtype
                is_mismatch = out_of_range or label_for_non_uint or float_for_int
            else:
                is_mismatch = True

            if is_mismatch:
                mismatches.append(value)

            # keep the error message short
            if len(mismatches) == 5:
                mismatches.append("...")
                break

        if mismatches:
            raise ValueError(
                f"data type '{self.type}' incompatible with values {mismatches}"
            )

        return self

    @property
    def range(self):
        """`(min, max)` of `values`; for string labels `(0, len(values) - 1)`."""
        entries = self.values
        if isinstance(entries[0], str):
            return 0, len(entries) - 1

        return min(entries), max(entries)
# Data type names allowed for `IntervalOrRatioDataDescr.type`
# (the entries of `NominalOrOrdinalDType` without "bool").
IntervalOrRatioDType = Literal[
    "float32",
    "float64",
    "uint8",
    "int8",
    "uint16",
    "int16",
    "uint32",
    "int32",
    "uint64",
    "int64",
]
class IntervalOrRatioDataDescr(Node):
    type: Annotated[  # TODO: rename to dtype
        IntervalOrRatioDType,
        Field(
            examples=["float32", "float64", "uint8", "uint16"],
        ),
    ] = "float32"
    range: Tuple[Optional[float], Optional[float]] = (
        None,
        None,
    )
    """Tuple `(minimum, maximum)` specifying the allowed range of the data in this tensor.
    `None` corresponds to min/max of what can be expressed by **type**."""
    unit: Union[Literal["arbitrary unit"], SiUnit] = "arbitrary unit"
    scale: float = 1.0
    """Scale for data on an interval (or ratio) scale."""
    offset: Optional[float] = None
    """Offset for data on a ratio scale."""

    @model_validator(mode="before")
    @classmethod
    def _replace_inf(cls, data: Any):
        """Replace infinite `range` bounds in the raw input by `None`.

        Infinite bounds may be given as float or as one of the strings
        'inf', '-inf', '.inf', '-.inf'. A warning is issued if any is found.
        The given `data` is left untouched; if `range` is a sequence a shallow
        copy with the cleaned `range` (as tuple) is returned.
        """
        if is_dict(data) and "range" in data and is_sequence(data["range"]):
            forbidden = (
                "inf",
                "-inf",
                ".inf",
                "-.inf",
                float("inf"),
                float("-inf"),
            )
            given_range = data["range"]
            if any(v in forbidden for v in given_range):
                issue_warning("replaced 'inf' value", value=given_range)

            # Do not write into the caller's dict: the same raw input may be
            # validated again, e.g. against another member of a union.
            data = {
                **data,
                "range": tuple(
                    (None if v in forbidden else v) for v in given_range
                ),
            }

        return data
# Either kind of tensor data description defined above.
TensorDataDescr = Union[NominalOrOrdinalDataDescr, IntervalOrRatioDataDescr]
# Common (abstract) parent of the pre-/postprocessing descriptions below.
class ProcessingDescrBase(NodeWithExplicitlySetFields, ABC):
    """processing base class"""
class BinarizeKwargs(ProcessingKwargs):
    """key word arguments for `BinarizeDescr`"""

    # a single threshold (no axis)
    threshold: float
    """The fixed threshold"""
class BinarizeAlongAxisKwargs(ProcessingKwargs):
    """key word arguments for `BinarizeDescr`"""

    # one threshold per entry along `axis`; the list must not be empty
    threshold: NotEmpty[List[float]]
    """The fixed threshold values along `axis`"""

    # any axis id except "batch"
    axis: Annotated[NonBatchAxisId, Field(examples=["channel"])]
    """The `threshold` axis"""
class BinarizeDescr(ProcessingDescrBase):
    """Binarize the tensor with a fixed threshold.

    Values above `BinarizeKwargs.threshold`/`BinarizeAlongAxisKwargs.threshold`
    will be set to one, values below the threshold to zero.

    Examples:
    - in YAML
        ```yaml
        postprocessing:
          - id: binarize
            kwargs:
              axis: 'channel'
              threshold: [0.25, 0.5, 0.75]
        ```
    - in Python:
        >>> postprocessing = [BinarizeDescr(
        ...   kwargs=BinarizeAlongAxisKwargs(
        ...       axis=AxisId('channel'),
        ...       threshold=[0.25, 0.5, 0.75],
        ...   )
        ... )]
    """

    implemented_id: ClassVar[Literal["binarize"]] = "binarize"
    # Type checkers see `id` with a default; at runtime it is declared without
    # one. NOTE(review): presumably the base class supplies the value from
    # `implemented_id` -- confirm.
    if TYPE_CHECKING:
        id: Literal["binarize"] = "binarize"
    else:
        id: Literal["binarize"]
    # scalar threshold or per-axis-entry thresholds
    kwargs: Union[BinarizeKwargs, BinarizeAlongAxisKwargs]
class ClipDescr(ProcessingDescrBase):
    """Set tensor values below min to min and above max to max.

    See `ScaleRangeDescr` for examples.
    """

    implemented_id: ClassVar[Literal["clip"]] = "clip"
    # Type checkers see `id` with a default; at runtime it is declared without
    # one. NOTE(review): presumably the base class supplies the value from
    # `implemented_id` -- confirm.
    if TYPE_CHECKING:
        id: Literal["clip"] = "clip"
    else:
        id: Literal["clip"]

    # `ClipKwargs` is re-used from the v0_4 module (see the imports)
    kwargs: ClipKwargs
class EnsureDtypeKwargs(ProcessingKwargs):
    """key word arguments for `EnsureDtypeDescr`"""

    # target data type; same names as listed in `NominalOrOrdinalDType`
    dtype: Literal[
        "float32",
        "float64",
        "uint8",
        "int8",
        "uint16",
        "int16",
        "uint32",
        "int32",
        "uint64",
        "int64",
        "bool",
    ]
class EnsureDtypeDescr(ProcessingDescrBase):
    """Cast the tensor data type to `EnsureDtypeKwargs.dtype` (if not matching).

    This can for example be used to ensure the inner neural network model gets a
    different input tensor data type than the fully described bioimage.io model does.

    Examples:
        The described bioimage.io model (incl. preprocessing) accepts any
        float32-compatible tensor, normalizes it with percentiles and clipping and then
        casts it to uint8, which is what the neural network in this example expects.
        - in YAML
            ```yaml
            inputs:
            - data:
                type: float32  # described bioimage.io model is compatible with any float32 input tensor
              preprocessing:
              - id: scale_range
                kwargs:
                  axes: ['y', 'x']
                  max_percentile: 99.8
                  min_percentile: 5.0
              - id: clip
                kwargs:
                  min: 0.0
                  max: 1.0
              - id: ensure_dtype  # the neural network of the model requires uint8
                kwargs:
                  dtype: uint8
            ```
        - in Python:
            >>> preprocessing = [
            ...     ScaleRangeDescr(
            ...         kwargs=ScaleRangeKwargs(
            ...           axes= (AxisId('y'), AxisId('x')),
            ...           max_percentile= 99.8,
            ...           min_percentile= 5.0,
            ...         )
            ...     ),
            ...     ClipDescr(kwargs=ClipKwargs(min=0.0, max=1.0)),
            ...     EnsureDtypeDescr(kwargs=EnsureDtypeKwargs(dtype="uint8")),
            ... ]
    """

    implemented_id: ClassVar[Literal["ensure_dtype"]] = "ensure_dtype"
    # Type checkers see `id` with a default; at runtime it is declared without
    # one. NOTE(review): presumably the base class supplies the value from
    # `implemented_id` -- confirm.
    if TYPE_CHECKING:
        id: Literal["ensure_dtype"] = "ensure_dtype"
    else:
        id: Literal["ensure_dtype"]

    kwargs: EnsureDtypeKwargs
class ScaleLinearKwargs(ProcessingKwargs):
    """Key word arguments for `ScaleLinearDescr`"""

    gain: float = 1.0
    """multiplicative factor"""

    offset: float = 0.0
    """additive term"""

    @model_validator(mode="after")
    def _validate(self) -> Self:
        """Reject the identity transform (`gain` 1.0 together with `offset` 0.0)."""
        is_identity = self.gain == 1.0 and self.offset == 0.0
        if not is_identity:
            return self

        raise ValueError(
            "Redundant linear scaling not allowd. Set `gain` != 1.0 and/or `offset`"
            + " != 0.0."
        )
class ScaleLinearAlongAxisKwargs(ProcessingKwargs):
    """Key word arguments for `ScaleLinearDescr`"""

    axis: Annotated[NonBatchAxisId, Field(examples=["channel"])]
    """The axis of gain and offset values."""

    gain: Union[float, NotEmpty[List[float]]] = 1.0
    """multiplicative factor"""

    offset: Union[float, NotEmpty[List[float]]] = 0.0
    """additive term"""

    @model_validator(mode="after")
    def _validate(self) -> Self:
        """Bring `gain` and `offset` to lists of equal length and check them.

        A scalar given next to a list is repeated to the length of that list.

        Raises:
            ValueError: if both are scalars, if both are lists of different
                length, or if the resulting scaling is the identity.
        """
        if not isinstance(self.gain, list) and not isinstance(self.offset, list):
            raise ValueError(
                "Do not specify an `axis` for scalar gain and offset values."
            )

        if not isinstance(self.gain, list):
            # only `offset` is a list: repeat the scalar gain
            self.gain = [float(self.gain)] * len(self.offset)
        elif not isinstance(self.offset, list):
            # only `gain` is a list: repeat the scalar offset
            self.offset = [float(self.offset)] * len(self.gain)
        elif len(self.gain) != len(self.offset):
            raise ValueError(
                f"Size of `gain` ({len(self.gain)}) and `offset` ({len(self.offset)}) must match."
            )

        gain_is_neutral = all(g == 1.0 for g in self.gain)
        if gain_is_neutral and all(off == 0.0 for off in self.offset):
            raise ValueError(
                "Redundant linear scaling not allowd. Set `gain` != 1.0 and/or `offset`"
                + " != 0.0."
            )

        return self
class ScaleLinearDescr(ProcessingDescrBase):
    """Fixed linear scaling.

    Examples:
      1. Scale with scalar gain and offset
        - in YAML
        ```yaml
        preprocessing:
          - id: scale_linear
            kwargs:
              gain: 2.0
              offset: 3.0
        ```
        - in Python:
        >>> preprocessing = [
        ...     ScaleLinearDescr(kwargs=ScaleLinearKwargs(gain= 2.0, offset=3.0))
        ... ]

      2. Independent scaling along an axis
        - in YAML
        ```yaml
        preprocessing:
          - id: scale_linear
            kwargs:
              axis: 'channel'
              gain: [1.0, 2.0, 3.0]
        ```
        - in Python:
        >>> preprocessing = [
        ...     ScaleLinearDescr(
        ...         kwargs=ScaleLinearAlongAxisKwargs(
        ...             axis=AxisId("channel"),
        ...             gain=[1.0, 2.0, 3.0],
        ...         )
        ...     )
        ... ]

    """

    implemented_id: ClassVar[Literal["scale_linear"]] = "scale_linear"
    # Type checkers see `id` with a default; at runtime it is declared without
    # one. NOTE(review): presumably the base class supplies the value from
    # `implemented_id` -- confirm.
    if TYPE_CHECKING:
        id: Literal["scale_linear"] = "scale_linear"
    else:
        id: Literal["scale_linear"]
    # scalar gain/offset or per-axis-entry gain/offset
    kwargs: Union[ScaleLinearKwargs, ScaleLinearAlongAxisKwargs]
class SigmoidDescr(ProcessingDescrBase):
    """The logistic sigmoid function, a.k.a. expit function.

    Examples:
    - in YAML
        ```yaml
        postprocessing:
          - id: sigmoid
        ```
    - in Python:
        >>> postprocessing = [SigmoidDescr()]
    """

    implemented_id: ClassVar[Literal["sigmoid"]] = "sigmoid"
    # Type checkers see `id` with a default; at runtime it is declared without
    # one. NOTE(review): presumably the base class supplies the value from
    # `implemented_id` -- confirm.
    if TYPE_CHECKING:
        id: Literal["sigmoid"] = "sigmoid"
    else:
        id: Literal["sigmoid"]

    # `kwargs` is a read-only property here, not a field: every access returns
    # a newly created, empty `ProcessingKwargs`.
    @property
    def kwargs(self) -> ProcessingKwargs:
        """empty kwargs"""
        return ProcessingKwargs()
class SoftmaxKwargs(ProcessingKwargs):
    """key word arguments for `SoftmaxDescr`"""

    # any axis id except "batch"; defaults to "channel"
    axis: Annotated[NonBatchAxisId, Field(examples=["channel"])] = AxisId("channel")
    """The axis to apply the softmax function along.
    Note:
        Defaults to 'channel' axis
        (which may not exist, in which case
        a different axis id has to be specified).
    """
# Processing step description with id "softmax".
class SoftmaxDescr(ProcessingDescrBase):
    """The softmax function.

    Examples:
    - in YAML
        ```yaml
        postprocessing:
          - id: softmax
            kwargs:
              axis: channel
        ```
    - in Python:
        >>> postprocessing = [SoftmaxDescr(kwargs=SoftmaxKwargs(axis=AxisId("channel")))]
    """

    # Class-level constant naming the processing id this class implements.
    implemented_id: ClassVar[Literal["softmax"]] = "softmax"

    # Static type checkers see `id` with a default value; at runtime the
    # field is declared without one.
    if TYPE_CHECKING:
        id: Literal["softmax"] = "softmax"
    else:
        id: Literal["softmax"]

    # Optional: when omitted, the kwargs are created via
    # `SoftmaxKwargs.model_construct` (i.e. with the field defaults).
    kwargs: SoftmaxKwargs = Field(default_factory=SoftmaxKwargs.model_construct)
# Scalar parameters for `FixedZeroMeanUnitVarianceDescr`.
# Both fields are required; `std` must be at least 1e-6.
class FixedZeroMeanUnitVarianceKwargs(ProcessingKwargs):
    """key word arguments for `FixedZeroMeanUnitVarianceDescr`"""

    mean: float
    """The mean value to normalize with."""

    std: Annotated[float, Ge(1e-6)]
    """The standard deviation value to normalize with."""
# Per-entry parameters (along one axis) for `FixedZeroMeanUnitVarianceDescr`.
# `mean` and `std` are non-empty lists that must have the same length.
class FixedZeroMeanUnitVarianceAlongAxisKwargs(ProcessingKwargs):
    """key word arguments for `FixedZeroMeanUnitVarianceDescr`"""

    mean: NotEmpty[List[float]]
    """The mean value(s) to normalize with."""

    std: NotEmpty[List[Annotated[float, Ge(1e-6)]]]
    """The standard deviation value(s) to normalize with.
    Size must match `mean` values."""

    axis: Annotated[NonBatchAxisId, Field(examples=["channel", "index"])]
    """The axis of the mean/std values to normalize each entry along that dimension
    separately."""

    @model_validator(mode="after")
    def _mean_and_std_match(self) -> Self:
        # Runs after field validation; rejects `mean`/`std` lists of
        # different lengths.
        n_mean = len(self.mean)
        n_std = len(self.std)
        if n_mean == n_std:
            return self

        raise ValueError(
            f"Size of `mean` ({n_mean}) and `std` ({n_std})" + " must match."
        )
# Processing step description with id "fixed_zero_mean_unit_variance".
# `kwargs` holds either scalar values (`FixedZeroMeanUnitVarianceKwargs`) or
# per-entry values along an axis (`FixedZeroMeanUnitVarianceAlongAxisKwargs`).
class FixedZeroMeanUnitVarianceDescr(ProcessingDescrBase):
    """Subtract a given mean and divide by the standard deviation.

    Normalize with fixed, precomputed values for
    `FixedZeroMeanUnitVarianceKwargs.mean` and `FixedZeroMeanUnitVarianceKwargs.std`
    Use `FixedZeroMeanUnitVarianceAlongAxisKwargs` for independent scaling along given
    axes.

    Examples:
    1. scalar value for whole tensor
        - in YAML
        ```yaml
        preprocessing:
          - id: fixed_zero_mean_unit_variance
            kwargs:
              mean: 103.5
              std: 13.7
        ```
        - in Python
        >>> preprocessing = [FixedZeroMeanUnitVarianceDescr(
        ...   kwargs=FixedZeroMeanUnitVarianceKwargs(mean=103.5, std=13.7)
        ... )]

    2. independently along an axis
        - in YAML
        ```yaml
        preprocessing:
          - id: fixed_zero_mean_unit_variance
            kwargs:
              axis: channel
              mean: [101.5, 102.5, 103.5]
              std: [11.7, 12.7, 13.7]
        ```
        - in Python
        >>> preprocessing = [FixedZeroMeanUnitVarianceDescr(
        ...   kwargs=FixedZeroMeanUnitVarianceAlongAxisKwargs(
        ...     axis=AxisId("channel"),
        ...     mean=[101.5, 102.5, 103.5],
        ...     std=[11.7, 12.7, 13.7],
        ...   )
        ... )]
    """

    # Class-level constant naming the processing id this class implements.
    implemented_id: ClassVar[Literal["fixed_zero_mean_unit_variance"]] = (
        "fixed_zero_mean_unit_variance"
    )

    # Static type checkers see `id` with a default value; at runtime the
    # field is declared without one.
    if TYPE_CHECKING:
        id: Literal["fixed_zero_mean_unit_variance"] = "fixed_zero_mean_unit_variance"
    else:
        id: Literal["fixed_zero_mean_unit_variance"]

    # Required: there is no default for the normalization values.
    kwargs: Union[
        FixedZeroMeanUnitVarianceKwargs, FixedZeroMeanUnitVarianceAlongAxisKwargs
    ]
# Parameters for `ZeroMeanUnitVarianceDescr`; every field has a default.
class ZeroMeanUnitVarianceKwargs(ProcessingKwargs):
    """key word arguments for `ZeroMeanUnitVarianceDescr`"""

    axes: Annotated[
        Optional[Sequence[AxisId]],
        Field(examples=[("batch", "x", "y")]),
    ] = None
    """The subset of axes to normalize jointly, i.e. axes to reduce to compute mean/std.
    For example to normalize 'batch', 'x' and 'y' jointly in a tensor ('batch', 'channel', 'y', 'x')
    resulting in a tensor of equal shape normalized per channel, specify `axes=('batch', 'x', 'y')`.
    To normalize each sample independently leave out the 'batch' axis.
    Default: Scale all axes jointly."""

    # Allowed range: 0 < eps <= 0.1
    eps: Annotated[float, Interval(gt=0, le=0.1)] = 1e-6
    """epsilon for numeric stability: `out = (tensor - mean) / (std + eps)`."""
# Processing step description with id "zero_mean_unit_variance".
class ZeroMeanUnitVarianceDescr(ProcessingDescrBase):
    """Subtract mean and divide by standard deviation.

    `out = (tensor - mean) / (std + eps)`
    (see `ZeroMeanUnitVarianceKwargs` for `axes` and `eps`).

    Examples:
        Normalize with the tensor's own mean and standard deviation
        - in YAML
        ```yaml
        preprocessing:
          - id: zero_mean_unit_variance
        ```
        - in Python
        >>> preprocessing = [ZeroMeanUnitVarianceDescr()]
    """

    # Class-level constant naming the processing id this class implements.
    implemented_id: ClassVar[Literal["zero_mean_unit_variance"]] = (
        "zero_mean_unit_variance"
    )

    # Static type checkers see `id` with a default value; at runtime the
    # field is declared without one.
    if TYPE_CHECKING:
        id: Literal["zero_mean_unit_variance"] = "zero_mean_unit_variance"
    else:
        id: Literal["zero_mean_unit_variance"]

    # Optional: when omitted, the kwargs are created via
    # `ZeroMeanUnitVarianceKwargs.model_construct` (i.e. with the field defaults).
    kwargs: ZeroMeanUnitVarianceKwargs = Field(
        default_factory=ZeroMeanUnitVarianceKwargs.model_construct
    )
class ScaleRangeKwargs(ProcessingKwargs):
    """key word arguments for `ScaleRangeDescr`

    For `min_percentile`=0.0 (the default) and `max_percentile`=100 (the default)
    this processing step normalizes data to the [0, 1] interval.
    For other percentiles the normalized values will partially be outside the [0, 1]
    interval. Use `ScaleRangeDescr` followed by `ClipDescr` if you want to limit the
    normalized values to a range.
    """

    axes: Annotated[
        Optional[Sequence[AxisId]], Field(examples=[("batch", "x", "y")])
    ] = None
    """The subset of axes to normalize jointly, i.e. axes to reduce to compute the min/max percentile value.
    For example to normalize 'batch', 'x' and 'y' jointly in a tensor ('batch', 'channel', 'y', 'x')
    resulting in a tensor of equal shape normalized per channel, specify `axes=('batch', 'x', 'y')`.
    To normalize samples independently, leave out the "batch" axis.
    Default: Scale all axes jointly."""

    # NOTE: `min_percentile` has to stay declared before `max_percentile`;
    # the `min_smaller_max` validator reads it from the already validated data.
    min_percentile: Annotated[float, Interval(ge=0, lt=100)] = 0.0
    """The lower percentile used to determine the value to align with zero."""

    max_percentile: Annotated[float, Interval(gt=1, le=100)] = 100.0
    """The upper percentile used to determine the value to align with one.
    Has to be bigger than `min_percentile`.
    The range is 1 to 100 instead of 0 to 100 to avoid mistakenly
    accepting percentiles specified in the range 0.0 to 1.0."""

    eps: Annotated[float, Interval(gt=0, le=0.1)] = 1e-6
    """Epsilon for numeric stability.
    `out = (tensor - v_lower) / (v_upper - v_lower + eps)`;
    with `v_lower,v_upper` values at the respective percentiles."""

    reference_tensor: Optional[TensorId] = None
    """Tensor ID to compute the percentiles from. Default: The tensor itself.
    For any tensor in `inputs` only input tensor references are allowed."""

    @field_validator("max_percentile", mode="after")
    @classmethod
    def min_smaller_max(cls, value: float, info: ValidationInfo) -> float:
        """Ensure `min_percentile` is strictly smaller than `max_percentile`.

        Args:
            value: The already validated `max_percentile`.
            info: Validation info; `info.data` holds the previously validated fields.

        Returns:
            `value`, unchanged.

        Raises:
            ValueError: If `min_percentile >= max_percentile`.
        """
        # `info.data` only contains fields that passed validation. If
        # `min_percentile` itself was invalid it is missing here; that error is
        # reported separately, so skip the comparison instead of failing with
        # an unrelated `KeyError`.
        min_p = info.data.get("min_percentile")
        if min_p is None:
            return value

        if min_p >= value:
            raise ValueError(f"min_percentile {min_p} >= max_percentile {value}")

        return value
# Processing step description with id "scale_range".
class ScaleRangeDescr(ProcessingDescrBase):
    """Scale with percentiles.

    Examples:
    1. Scale linearly to map 5th percentile to 0 and 99.8th percentile to 1.0
        - in YAML
        ```yaml
        preprocessing:
          - id: scale_range
            kwargs:
              axes: ['y', 'x']
              max_percentile: 99.8
              min_percentile: 5.0
        ```
        - in Python
        >>> preprocessing = [ScaleRangeDescr(
        ...   kwargs=ScaleRangeKwargs(
        ...       axes= (AxisId('y'), AxisId('x')),
        ...       max_percentile= 99.8,
        ...       min_percentile= 5.0,
        ...   )
        ... )]

    2. Combine the above scaling with additional clipping to clip values outside the range given by the percentiles.
        - in YAML
        ```yaml
        preprocessing:
          - id: scale_range
            kwargs:
              axes: ['y', 'x']
              max_percentile: 99.8
              min_percentile: 5.0
          - id: clip
            kwargs:
              min: 0.0
              max: 1.0
        ```
        - in Python
        >>> preprocessing = [
        ...   ScaleRangeDescr(
        ...     kwargs=ScaleRangeKwargs(
        ...       axes= (AxisId('y'), AxisId('x')),
        ...       max_percentile= 99.8,
        ...       min_percentile= 5.0,
        ...     )
        ...   ),
        ...   ClipDescr(
        ...     kwargs=ClipKwargs(
        ...       min=0.0,
        ...       max=1.0,
        ...     )
        ...   ),
        ... ]

    """

    # Class-level constant naming the processing id this class implements.
    implemented_id: ClassVar[Literal["scale_range"]] = "scale_range"

    # Static type checkers see `id` with a default value; at runtime the
    # field is declared without one.
    if TYPE_CHECKING:
        id: Literal["scale_range"] = "scale_range"
    else:
        id: Literal["scale_range"]

    # Optional: when omitted, the kwargs are created via
    # `ScaleRangeKwargs.model_construct` (i.e. with the field defaults).
    kwargs: ScaleRangeKwargs = Field(default_factory=ScaleRangeKwargs.model_construct)
class ScaleMeanVarianceKwargs(ProcessingKwargs):
    """key word arguments for `ScaleMeanVarianceDescr`"""

    # Required: the only field of these kwargs without a default.
    reference_tensor: TensorId
    """Name of tensor to match."""

    axes: Annotated[
        Optional[Sequence[AxisId]], Field(examples=[("batch", "x", "y")])
    ] = None
    """The subset of axes to normalize jointly, i.e. axes to reduce to compute mean/std.
    For example to normalize 'batch', 'x' and 'y' jointly in a tensor ('batch', 'channel', 'y', 'x')
    resulting in a tensor of equal shape normalized per channel, specify `axes=('batch', 'x', 'y')`.
    To normalize samples independently, leave out the 'batch' axis.
    Default: Scale all axes jointly."""

    # Allowed range: 0 < eps <= 0.1
    eps: Annotated[float, Interval(gt=0, le=0.1)] = 1e-6
    """Epsilon for numeric stability:
    `out = (tensor - mean) / (std + eps) * (ref_std + eps) + ref_mean.`"""
# Processing step description with id "scale_mean_variance".
class ScaleMeanVarianceDescr(ProcessingDescrBase):
    """Scale a tensor's data distribution to match another tensor's mean/std.
    `out = (tensor - mean) / (std + eps) * (ref_std + eps) + ref_mean.`
    """

    # Class-level constant naming the processing id this class implements.
    implemented_id: ClassVar[Literal["scale_mean_variance"]] = "scale_mean_variance"

    # Static type checkers see `id` with a default value; at runtime the
    # field is declared without one.
    if TYPE_CHECKING:
        id: Literal["scale_mean_variance"] = "scale_mean_variance"
    else:
        id: Literal["scale_mean_variance"]

    # Required: `ScaleMeanVarianceKwargs.reference_tensor` has no default.
    kwargs: ScaleMeanVarianceKwargs
# Union of all processing step descriptions allowed as preprocessing.
# The concrete class is selected by the value of the `id` field.
PreprocessingDescr = Annotated[
    Union[
        BinarizeDescr,
        ClipDescr,
        EnsureDtypeDescr,
        FixedZeroMeanUnitVarianceDescr,
        ScaleLinearDescr,
        ScaleRangeDescr,
        SigmoidDescr,
        SoftmaxDescr,
        ZeroMeanUnitVarianceDescr,
    ],
    Discriminator("id"),
]

# Union of all processing step descriptions allowed as postprocessing.
# Same members as `PreprocessingDescr` plus `ScaleMeanVarianceDescr`.
PostprocessingDescr = Annotated[
    Union[
        BinarizeDescr,
        ClipDescr,
        EnsureDtypeDescr,
        FixedZeroMeanUnitVarianceDescr,
        ScaleLinearDescr,
        ScaleMeanVarianceDescr,
        ScaleRangeDescr,
        SigmoidDescr,
        SoftmaxDescr,
        ZeroMeanUnitVarianceDescr,
    ],
    Discriminator("id"),
]

# Axis type variable constrained to exactly `InputAxis` or `OutputAxis`.
IO_AxisT = TypeVar("IO_AxisT", InputAxis, OutputAxis)
# Common base of the input and output tensor descriptions, generic over the
# axis type (`InputAxis` or `OutputAxis`).
class TensorDescrBase(Node, Generic[IO_AxisT]):
    id: TensorId
    """Tensor id. No duplicates are allowed."""

    description: Annotated[str, MaxLen(128)] = ""
    """free text description"""

    axes: NotEmpty[Sequence[IO_AxisT]]
    """tensor axes"""

    @property
    def shape(self):
        """Tuple with the `size` attribute of each axis, in axes order."""
        return tuple(a.size for a in self.axes)

    @field_validator("axes", mode="after", check_fields=False)
    @classmethod
    def _validate_axes(cls, axes: Sequence[AnyAxis]) -> Sequence[AnyAxis]:
        """Allow at most one batch axis and no duplicate axis ids.

        Raises:
            ValueError: For more than one batch axis or for duplicate axis ids.
        """
        batch_axes = [a for a in axes if a.type == "batch"]
        if len(batch_axes) > 1:
            raise ValueError(
                f"Only one batch axis (per tensor) allowed, but got {batch_axes}"
            )

        seen_ids: Set[AxisId] = set()
        duplicate_axes_ids: Set[AxisId] = set()
        for a in axes:
            if a.id in seen_ids:
                duplicate_axes_ids.add(a.id)
            else:
                seen_ids.add(a.id)

        if duplicate_axes_ids:
            raise ValueError(f"Duplicate axis ids: {duplicate_axes_ids}")

        return axes

    test_tensor: FAIR[Optional[FileDescr_]] = None
    """An example tensor to use for testing.
    Using the model with the test input tensors is expected to yield the test output tensors.
    Each test tensor has to be an ndarray in the
    [numpy.lib file format](https://numpy.org/doc/stable/reference/generated/numpy.lib.format.html#module-numpy.lib.format).
    The file extension must be '.npy'."""

    sample_tensor: FAIR[Optional[FileDescr_]] = None
    """A sample tensor to illustrate a possible input/output for the model,
    The sample image primarily serves to inform a human user about an example use case
    and is typically stored as .hdf5, .png or .tiff.
    It has to be readable by the [imageio library](https://imageio.readthedocs.io/en/stable/formats/index.html#supported-formats)
    (numpy's `.npy` format is not supported).
    The image dimensionality has to match the number of axes specified in this tensor description.
    """

    @model_validator(mode="after")
    def _validate_sample_tensor(self) -> Self:
        """Check that the sample tensor's dimensionality fits the described axes.

        Skipped if no sample tensor is given or if IO checks are disabled in the
        validation context. The sample is read with imageio and squeezed;
        its number of dimensions must lie between the number of axes that
        cannot be singleton and the total number of axes.

        Raises:
            ValueError: If the number of (squeezed) dimensions is out of range.
        """
        if self.sample_tensor is None or not get_validation_context().perform_io_checks:
            return self

        reader = get_reader(self.sample_tensor.source, sha256=self.sample_tensor.sha256)
        tensor: NDArray[Any] = imread(  # pyright: ignore[reportUnknownVariableType]
            reader.read(),
            extension=PurePosixPath(reader.original_file_name).suffix,
        )
        n_dims = len(tensor.squeeze().shape)
        n_dims_min = n_dims_max = len(self.axes)

        # Every axis that may have size 1 may be missing after `squeeze()`
        # and therefore lowers the minimum number of expected dimensions.
        for a in self.axes:
            if isinstance(a, BatchAxis):
                n_dims_min -= 1
            elif isinstance(a.size, int):
                if a.size == 1:
                    n_dims_min -= 1
            elif isinstance(a.size, (ParameterizedSize, DataDependentSize)):
                if a.size.min == 1:
                    n_dims_min -= 1
            elif isinstance(a.size, SizeReference):
                if a.size.offset < 2:
                    # size reference may result in singleton axis
                    n_dims_min -= 1
            else:
                assert_never(a.size)

        n_dims_min = max(0, n_dims_min)
        if n_dims < n_dims_min or n_dims > n_dims_max:
            raise ValueError(
                f"Expected sample tensor to have {n_dims_min} to"
                + f" {n_dims_max} dimensions, but found {n_dims} (shape: {tensor.shape})."
            )

        return self

    data: Union[TensorDataDescr, NotEmpty[Sequence[TensorDataDescr]]] = (
        IntervalOrRatioDataDescr()
    )
    """Description of the tensor's data values, optionally per channel.
    If specified per channel, the data `type` needs to match across channels."""

    @property
    def dtype(
        self,
    ) -> Literal[
        "float32",
        "float64",
        "uint8",
        "int8",
        "uint16",
        "int16",
        "uint32",
        "int32",
        "uint64",
        "int64",
        "bool",
    ]:
        """dtype as specified under `data.type` or `data[i].type`"""
        if isinstance(self.data, collections.abc.Sequence):
            # all entries share one `type` (see `_check_data_type_across_channels`)
            return self.data[0].type
        else:
            return self.data.type

    @field_validator("data", mode="after")
    @classmethod
    def _check_data_type_across_channels(
        cls, value: Union[TensorDataDescr, NotEmpty[Sequence[TensorDataDescr]]]
    ) -> Union[TensorDataDescr, NotEmpty[Sequence[TensorDataDescr]]]:
        """Require per-channel data descriptions to share one data `type`.

        Raises:
            ValueError: If the per-channel descriptions use different types.
        """
        # Accept tuples as well as lists, like `_check_data_matches_channelaxis`;
        # otherwise a tuple of descriptions would skip this check.
        if not isinstance(value, (list, tuple)):
            return value

        dtypes = {t.type for t in value}
        if len(dtypes) > 1:
            raise ValueError(
                "Tensor data descriptions per channel need to agree in their data"
                + f" `type`, but found {dtypes}."
            )

        return value

    @model_validator(mode="after")
    def _check_data_matches_channelaxis(self) -> Self:
        """Require one data description per channel if `data` is a sequence.

        No check is done if `data` is a single description or if there is no
        channel axis.

        Raises:
            ValueError: If the number of descriptions differs from the size of
                the (first) channel axis.
        """
        if not isinstance(self.data, (list, tuple)):
            return self

        for a in self.axes:
            if isinstance(a, ChannelAxis):
                size = a.size
                assert isinstance(size, int)
                break
        else:
            # no channel axis to compare with
            return self

        if len(self.data) != size:
            raise ValueError(
                f"Got tensor data descriptions for {len(self.data)} channels, but"
                + f" '{a.id}' axis has size {size}."
            )

        return self

    def get_axis_sizes_for_array(self, array: NDArray[Any]) -> Dict[AxisId, int]:
        """Map each axis id to the size of the corresponding `array` dimension.

        Raises:
            ValueError: If `array` does not have exactly one dimension per axis.
        """
        if len(array.shape) != len(self.axes):
            raise ValueError(
                f"Dimension mismatch: array shape {array.shape} (#{len(array.shape)})"
                + f" incompatible with {len(self.axes)} axes."
            )
        return {a.id: array.shape[i] for i, a in enumerate(self.axes)}
# Description of a model input tensor: a `TensorDescrBase` over input axes
# with an `optional` flag and a list of preprocessing steps.
class InputTensorDescr(TensorDescrBase[InputAxis]):
    id: TensorId = TensorId("input")
    """Input tensor id.
    No duplicates are allowed across all inputs and outputs."""

    optional: bool = False
    """indicates that this tensor may be `None`"""

    preprocessing: List[PreprocessingDescr] = Field(
        default_factory=cast(Callable[[], List[PreprocessingDescr]], list)
    )

    """Description of how this input should be preprocessed.

    notes:
    - If preprocessing does not start with an 'ensure_dtype' entry, it is added
      to ensure an input tensor's data type matches the input tensor's data description.
    - If preprocessing does not end with an 'ensure_dtype' or 'binarize' entry, an
      'ensure_dtype' step is added to ensure preprocessing steps are not unintentionally
      changing the data type.
    """

    @model_validator(mode="after")
    def _validate_preprocessing_kwargs(self) -> Self:
        # Part 1: any `axes` given in a step's kwargs must be a sequence of
        # axis ids that exist on this tensor.
        tensor_axis_ids = [ax.id for ax in self.axes]
        for step in self.preprocessing:
            step_axes: Optional[Sequence[Any]] = step.kwargs.get("axes")
            if step_axes is None:
                continue

            if not isinstance(step_axes, collections.abc.Sequence):
                raise ValueError(
                    f"Expected `preprocessing.i.kwargs.axes` to be a sequence, but got {type(step_axes)}"
                )

            for ax_id in step_axes:
                if ax_id not in tensor_axis_ids:
                    raise ValueError(
                        "`preprocessing.i.kwargs.axes` needs to be subset of axes ids"
                    )

        # Part 2: frame the preprocessing with dtype casts to the described
        # data type (taken from the first entry for per-channel descriptions).
        dtype = (
            self.data.type
            if isinstance(
                self.data, (NominalOrOrdinalDataDescr, IntervalOrRatioDataDescr)
            )
            else self.data[0].type
        )

        # ensure `preprocessing` begins with `EnsureDtypeDescr`
        starts_with_cast = bool(self.preprocessing) and isinstance(
            self.preprocessing[0], EnsureDtypeDescr
        )
        if not starts_with_cast:
            leading_cast = EnsureDtypeDescr(kwargs=EnsureDtypeKwargs(dtype=dtype))
            self.preprocessing.insert(0, leading_cast)

        # ensure `preprocessing` ends with `EnsureDtypeDescr` or `BinarizeDescr`
        last_step = self.preprocessing[-1]
        if not isinstance(last_step, (EnsureDtypeDescr, BinarizeDescr)):
            trailing_cast = EnsureDtypeDescr(kwargs=EnsureDtypeKwargs(dtype=dtype))
            self.preprocessing.append(trailing_cast)

        return self
def convert_axes(
    axes: str,
    *,
    shape: Union[
        Sequence[int], _ParameterizedInputShape_v0_4, _ImplicitOutputShape_v0_4
    ],
    tensor_type: Literal["input", "output"],
    halo: Optional[Sequence[int]],
    size_refs: Mapping[_TensorName_v0_4, Mapping[str, int]],
):
    """Convert a v0.4 axes string (one letter per axis) to a list of axis descriptions.

    Args:
        axes: Axis letters; the i-th letter belongs to the i-th entry of `shape`
            (and of `halo`, if given).
        shape: Explicit sizes, a parameterized input shape (min/step) or an
            implicit output shape (reference tensor, scale, offset).
        tensor_type: Whether input or output axis classes are created.
        halo: Optional halo per axis (only used for output time/space axes).
        size_refs: Known axis sizes per tensor name; used to translate the scale
            of channel/index axes (which have no scale here) into an offset.

    Returns:
        List of converted axes. Letters whose mapped axis type is none of
        batch/time/index/channel/space are not converted.

    Raises:
        NotImplementedError: For an output time/space axis with a non-zero halo
            and a fixed integer size.
    """
    ret: List[AnyAxis] = []
    for i, a in enumerate(axes):
        axis_type = _AXIS_TYPE_MAP.get(a, a)
        if axis_type == "batch":
            ret.append(BatchAxis())
            continue

        # --- determine `size` (and `scale`) of this axis ---
        scale = 1.0
        if isinstance(shape, _ParameterizedInputShape_v0_4):
            if shape.step[i] == 0:
                # a step of zero means a fixed size
                size = shape.min[i]
            else:
                size = ParameterizedSize(min=shape.min[i], step=shape.step[i])
        elif isinstance(shape, _ImplicitOutputShape_v0_4):
            ref_t = str(shape.reference_tensor)
            if ref_t.count(".") == 1:
                t_id, orig_a_id = ref_t.split(".")
            else:
                t_id = ref_t
                orig_a_id = a

            # NOTE(review): the fallback is the current axis letter `a`, not
            # `orig_a_id` -- confirm this is intended for 'tensor.axis' references.
            a_id = _AXIS_ID_MAP.get(orig_a_id, a)
            if not (orig_scale := shape.scale[i]):
                # old way to insert a new axis dimension
                size = int(2 * shape.offset[i])
            else:
                scale = 1 / orig_scale
                if axis_type in ("channel", "index"):
                    # these axes no longer have a scale
                    offset_from_scale = orig_scale * size_refs.get(
                        _TensorName_v0_4(t_id), {}
                    ).get(orig_a_id, 0)
                else:
                    offset_from_scale = 0
                size = SizeReference(
                    tensor_id=TensorId(t_id),
                    axis_id=AxisId(a_id),
                    offset=int(offset_from_scale + 2 * shape.offset[i]),
                )
        else:
            size = shape[i]

        # --- create the axis description matching the axis type ---
        if axis_type == "time":
            if tensor_type == "input":
                ret.append(TimeInputAxis(size=size, scale=scale))
            else:
                assert not isinstance(size, ParameterizedSize)
                # Same halo handling as for space axes below: a halo of zero
                # means "no halo" and results in a plain output axis.
                if halo is None or halo[i] == 0:
                    ret.append(TimeOutputAxis(size=size, scale=scale))
                elif isinstance(size, int):
                    raise NotImplementedError(
                        f"output axis with halo and fixed size (here {size}) not allowed"
                    )
                else:
                    ret.append(
                        TimeOutputAxisWithHalo(size=size, scale=scale, halo=halo[i])
                    )

        elif axis_type == "index":
            if tensor_type == "input":
                ret.append(IndexInputAxis(size=size))
            else:
                if isinstance(size, ParameterizedSize):
                    # only the minimum is kept; the step is dropped
                    size = DataDependentSize(min=size.min)

                ret.append(IndexOutputAxis(size=size))
        elif axis_type == "channel":
            assert not isinstance(size, ParameterizedSize)
            if isinstance(size, SizeReference):
                warnings.warn(
                    "Conversion of channel size from an implicit output shape may be"
                    + " wrong"
                )
                n_channels = size.offset
            else:
                n_channels = size

            ret.append(
                ChannelAxis(
                    channel_names=[
                        Identifier(f"channel{k}") for k in range(n_channels)
                    ]
                )
            )
        elif axis_type == "space":
            if tensor_type == "input":
                ret.append(SpaceInputAxis(id=AxisId(a), size=size, scale=scale))
            else:
                assert not isinstance(size, ParameterizedSize)
                if halo is None or halo[i] == 0:
                    ret.append(SpaceOutputAxis(id=AxisId(a), size=size, scale=scale))
                elif isinstance(size, int):
                    raise NotImplementedError(
                        f"output axis with halo and fixed size (here {size}) not allowed"
                    )
                else:
                    ret.append(
                        SpaceOutputAxisWithHalo(
                            id=AxisId(a), size=size, scale=scale, halo=halo[i]
                        )
                    )

    return ret
def _axes_letters_to_ids(
    axes: Optional[str],
) -> Optional[List[AxisId]]:
    """Turn each character of `axes` into an `AxisId`; `None` stays `None`."""
    return None if axes is None else [AxisId(letter) for letter in axes]
def _get_complement_v04_axis(
    tensor_axes: Sequence[str], axes: Optional[Sequence[str]]
) -> Optional[AxisId]:
    """Return the one tensor axis that is neither in `axes` nor the batch axis 'b'.

    Returns `None` if `axes` is `None` or if no such axis exists.

    Raises:
        ValueError: If more than one tensor axis is left over.
    """
    if axes is None:
        return None

    excluded = {"b", *axes}
    complement_axes = [ax for ax in tensor_axes if ax not in excluded]
    if len(complement_axes) > 1:
        raise ValueError(
            f"Expected none or a single complement axis, but axes '{axes}' "
            + f"for tensor dims '{tensor_axes}' leave '{complement_axes}'."
        )

    if complement_axes:
        return AxisId(complement_axes[0])

    return None
def _convert_proc(
    p: Union[_PreprocessingDescr_v0_4, _PostprocessingDescr_v0_4],
    tensor_axes: Sequence[str],
) -> Union[PreprocessingDescr, PostprocessingDescr]:
    """Convert a v0.4 pre-/postprocessing step description to its counterpart here.

    Args:
        p: The v0.4 processing step description.
        tensor_axes: Axis letters of the tensor the step belongs to; used to find
            the single axis *not* listed in the step's `axes` (v0.4 lists the
            axes to reduce, `*AlongAxisKwargs` take the remaining axis).

    Returns:
        The converted processing step description.

    Raises:
        ValueError: If a fixed zero_mean_unit_variance step without a remaining
            axis has list values for mean or std, or if more than one axis
            remains (raised by `_get_complement_v04_axis`).
    """
    if isinstance(p, _BinarizeDescr_v0_4):
        return BinarizeDescr(kwargs=BinarizeKwargs(threshold=p.kwargs.threshold))
    elif isinstance(p, _ClipDescr_v0_4):
        return ClipDescr(kwargs=ClipKwargs(min=p.kwargs.min, max=p.kwargs.max))
    elif isinstance(p, _SigmoidDescr_v0_4):
        return SigmoidDescr()
    elif isinstance(p, _ScaleLinearDescr_v0_4):
        # `_get_complement_v04_axis` returns None for `axes=None`
        axis = _get_complement_v04_axis(tensor_axes, p.kwargs.axes)

        if axis is None:
            assert not isinstance(p.kwargs.gain, list)
            assert not isinstance(p.kwargs.offset, list)
            kwargs = ScaleLinearKwargs(gain=p.kwargs.gain, offset=p.kwargs.offset)
        else:
            kwargs = ScaleLinearAlongAxisKwargs(
                axis=axis, gain=p.kwargs.gain, offset=p.kwargs.offset
            )
        return ScaleLinearDescr(kwargs=kwargs)
    elif isinstance(p, _ScaleMeanVarianceDescr_v0_4):
        return ScaleMeanVarianceDescr(
            kwargs=ScaleMeanVarianceKwargs(
                axes=_axes_letters_to_ids(p.kwargs.axes),
                reference_tensor=TensorId(str(p.kwargs.reference_tensor)),
                eps=p.kwargs.eps,
            )
        )
    elif isinstance(p, _ZeroMeanUnitVarianceDescr_v0_4):
        if p.kwargs.mode == "fixed":
            mean = p.kwargs.mean
            std = p.kwargs.std
            assert mean is not None
            assert std is not None

            axis = _get_complement_v04_axis(tensor_axes, p.kwargs.axes)

            if axis is None:
                if isinstance(mean, list):
                    raise ValueError("Expected single float value for mean, not <list>")
                if isinstance(std, list):
                    raise ValueError("Expected single float value for std, not <list>")
                # `model_construct` creates the kwargs without validating them
                return FixedZeroMeanUnitVarianceDescr(
                    kwargs=FixedZeroMeanUnitVarianceKwargs.model_construct(
                        mean=mean,
                        std=std,
                    )
                )
            else:
                # the along-axis kwargs expect lists
                if not isinstance(mean, list):
                    mean = [float(mean)]
                if not isinstance(std, list):
                    std = [float(std)]

                return FixedZeroMeanUnitVarianceDescr(
                    kwargs=FixedZeroMeanUnitVarianceAlongAxisKwargs(
                        axis=axis, mean=mean, std=std
                    )
                )

        else:
            axes = _axes_letters_to_ids(p.kwargs.axes) or []
            if p.kwargs.mode == "per_dataset":
                # 'per_dataset' is expressed by also reducing over the batch axis
                axes = [AxisId("batch")] + axes
            if not axes:
                axes = None
            return ZeroMeanUnitVarianceDescr(
                kwargs=ZeroMeanUnitVarianceKwargs(axes=axes, eps=p.kwargs.eps)
            )

    elif isinstance(p, _ScaleRangeDescr_v0_4):
        # NOTE(review): `reference_tensor` is not forwarded here -- confirm
        # whether the v0.4 kwargs can carry one.
        return ScaleRangeDescr(
            kwargs=ScaleRangeKwargs(
                axes=_axes_letters_to_ids(p.kwargs.axes),
                min_percentile=p.kwargs.min_percentile,
                max_percentile=p.kwargs.max_percentile,
                eps=p.kwargs.eps,
            )
        )
    else:
        assert_never(p)
# Converter from a v0.4 input tensor description to `InputTensorDescr`.
class _InputTensorConv(
    Converter[
        _InputTensorDescr_v0_4,
        InputTensorDescr,
        FileSource_,
        Optional[FileSource_],
        Mapping[_TensorName_v0_4, Mapping[str, int]],
    ]
):
    def _convert(
        self,
        src: _InputTensorDescr_v0_4,
        tgt: "type[InputTensorDescr] | type[dict[str, Any]]",
        test_tensor: FileSource_,
        sample_tensor: Optional[FileSource_],
        size_refs: Mapping[_TensorName_v0_4, Mapping[str, int]],
    ) -> "InputTensorDescr | dict[str, Any]":
        """Build `tgt` from `src`.

        Axes and preprocessing steps are converted; the given test/sample
        tensor sources are wrapped in `FileDescr`s.
        """
        converted_axes: List[InputAxis] = convert_axes(  # pyright: ignore[reportAssignmentType]
            src.axes,
            shape=src.shape,
            tensor_type="input",
            halo=None,
            size_refs=size_refs,
        )

        steps: List[PreprocessingDescr] = []
        for old_step in src.preprocessing:
            new_step = _convert_proc(old_step, src.axes)
            # `ScaleMeanVarianceDescr` is not part of `PreprocessingDescr`
            assert not isinstance(new_step, ScaleMeanVarianceDescr)
            steps.append(new_step)

        # converted preprocessing always ends with a cast to float32
        steps.append(EnsureDtypeDescr(kwargs=EnsureDtypeKwargs(dtype="float32")))

        return tgt(
            axes=converted_axes,
            id=TensorId(str(src.name)),
            test_tensor=FileDescr(source=test_tensor),
            sample_tensor=(
                FileDescr(source=sample_tensor) if sample_tensor is not None else None
            ),
            data=dict(type=src.data_type),  # pyright: ignore[reportArgumentType]
            preprocessing=steps,
        )


# module-level converter instance
_input_tensor_conv = _InputTensorConv(_InputTensorDescr_v0_4, InputTensorDescr)
# Description of a model output tensor: a `TensorDescrBase` over output axes
# with a list of postprocessing steps.
class OutputTensorDescr(TensorDescrBase[OutputAxis]):
    id: TensorId = TensorId("output")
    """Output tensor id.
    No duplicates are allowed across all inputs and outputs."""

    postprocessing: List[PostprocessingDescr] = Field(
        default_factory=cast(Callable[[], List[PostprocessingDescr]], list)
    )
    """Description of how this output should be postprocessed.

    note: `postprocessing` always ends with an 'ensure_dtype' operation.
          If not given this is added to cast to this tensor's `data.type`.
    """

    @model_validator(mode="after")
    def _validate_postprocessing_kwargs(self) -> Self:
        # Part 1: any `axes` given in a step's kwargs must be a sequence of
        # axis ids that exist on this tensor.
        tensor_axis_ids = [ax.id for ax in self.axes]
        for step in self.postprocessing:
            step_axes: Optional[Sequence[Any]] = step.kwargs.get("axes")
            if step_axes is None:
                continue

            if not isinstance(step_axes, collections.abc.Sequence):
                raise ValueError(
                    f"expected `axes` sequence, but got {type(step_axes)}"
                )

            for ax_id in step_axes:
                if ax_id not in tensor_axis_ids:
                    raise ValueError("`kwargs.axes` needs to be subset of axes ids")

        # Part 2: make sure the last step fixes the data type (taken from the
        # first entry for per-channel data descriptions).
        dtype = (
            self.data.type
            if isinstance(
                self.data, (NominalOrOrdinalDataDescr, IntervalOrRatioDataDescr)
            )
            else self.data[0].type
        )

        # ensure `postprocessing` ends with `EnsureDtypeDescr` or `BinarizeDescr`
        ends_with_cast = bool(self.postprocessing) and isinstance(
            self.postprocessing[-1], (EnsureDtypeDescr, BinarizeDescr)
        )
        if not ends_with_cast:
            trailing_cast = EnsureDtypeDescr(kwargs=EnsureDtypeKwargs(dtype=dtype))
            self.postprocessing.append(trailing_cast)

        return self
# Converter from a v0.4 output tensor description to `OutputTensorDescr`.
class _OutputTensorConv(
    Converter[
        _OutputTensorDescr_v0_4,
        OutputTensorDescr,
        FileSource_,
        Optional[FileSource_],
        Mapping[_TensorName_v0_4, Mapping[str, int]],
    ]
):
    def _convert(
        self,
        src: _OutputTensorDescr_v0_4,
        tgt: "type[OutputTensorDescr] | type[dict[str, Any]]",
        test_tensor: FileSource_,
        sample_tensor: Optional[FileSource_],
        size_refs: Mapping[_TensorName_v0_4, Mapping[str, int]],
    ) -> "OutputTensorDescr | dict[str, Any]":
        """Build `tgt` from `src`.

        Axes (including halo) and postprocessing steps are converted; the given
        test/sample tensor sources are wrapped in `FileDescr`s.
        """
        # TODO: split convert_axes into convert_output_axes and convert_input_axes
        converted_axes: List[OutputAxis] = convert_axes(  # pyright: ignore[reportAssignmentType]
            src.axes,
            shape=src.shape,
            tensor_type="output",
            halo=src.halo,
            size_refs=size_refs,
        )

        data_descr: Dict[str, Any] = {"type": src.data_type}
        if data_descr["type"] == "bool":
            # boolean data gets its two possible values listed explicitly
            data_descr["values"] = [False, True]

        return tgt(
            axes=converted_axes,
            id=TensorId(str(src.name)),
            test_tensor=FileDescr(source=test_tensor),
            sample_tensor=(
                FileDescr(source=sample_tensor) if sample_tensor is not None else None
            ),
            data=data_descr,  # pyright: ignore[reportArgumentType]
            postprocessing=[
                _convert_proc(old_step, src.axes) for old_step in src.postprocessing
            ],
        )


# module-level converter instance
_output_tensor_conv = _OutputTensorConv(_OutputTensorDescr_v0_4, OutputTensorDescr)
# Either kind of tensor description (input or output).
TensorDescr = Union[InputTensorDescr, OutputTensorDescr]
def validate_tensors(
    tensors: Mapping[TensorId, Tuple[TensorDescr, Optional[NDArray[Any]]]],
    tensor_origin: Literal[
        "test_tensor"
    ],  # for more precise error messages, e.g. 'test_tensor'
):
    """Validate arrays against their tensor descriptions.

    For every tensor with an array (entries with `None` are only used as
    possible size references) this checks

    - that the number of dimensions matches the number of axes,
    - that the array dtype is compatible with the described dtype
      (a described float dtype also accepts any integer array),
    - that the array contains values with a magnitude of at least 1e-4,
    - that every axis size is compatible with the described size, including
      sizes that reference an axis of another tensor in `tensors`.

    Args:
        tensors: Tensor description and (optional) array per tensor id.
        tensor_origin: Name of the arrays' origin; only used in error messages.

    Raises:
        ValueError: If any of the checks above fails.
    """
    all_tensor_axes: Dict[TensorId, Dict[AxisId, Tuple[AnyAxis, Optional[int]]]] = {}

    def e_msg(d: TensorDescr):
        # error message prefix, e.g. "inputs[<tensor id>]"
        return f"{'inputs' if isinstance(d, InputTensorDescr) else 'outputs'}[{d.id}]"

    # First pass: collect the actual size of every axis of every tensor,
    # so that size references can be resolved in the second pass.
    for descr, array in tensors.values():
        if array is None:
            axis_sizes = {a.id: None for a in descr.axes}
        else:
            try:
                axis_sizes = descr.get_axis_sizes_for_array(array)
            except ValueError as e:
                raise ValueError(f"{e_msg(descr)} {e}") from e

        all_tensor_axes[descr.id] = {a.id: (a, axis_sizes[a.id]) for a in descr.axes}

    # Second pass: validate dtype, value range and axis sizes.
    for descr, array in tensors.values():
        if array is None:
            continue

        if descr.dtype in ("float32", "float64"):
            invalid_test_tensor_dtype = array.dtype.name not in (
                "float32",
                "float64",
                "uint8",
                "int8",
                "uint16",
                "int16",
                "uint32",
                "int32",
                "uint64",
                "int64",
            )
        else:
            invalid_test_tensor_dtype = array.dtype.name != descr.dtype

        if invalid_test_tensor_dtype:
            raise ValueError(
                f"{e_msg(descr)}.{tensor_origin}.dtype '{array.dtype.name}' does not"
                + f" match described dtype '{descr.dtype}'"
            )

        if array.min() > -1e-4 and array.max() < 1e-4:
            # the thresholds in the message match the check above
            raise ValueError(
                "Output values are too small for reliable testing."
                + f" Values <=-1e-4 or >=1e-4 must be present in {tensor_origin}"
            )

        for a in descr.axes:
            actual_size = all_tensor_axes[descr.id][a.id][1]
            if actual_size is None:
                continue

            if a.size is None:
                continue

            if isinstance(a.size, int):
                if actual_size != a.size:
                    raise ValueError(
                        f"{e_msg(descr)}.{tensor_origin}: axis '{a.id}' "
                        + f"has incompatible size {actual_size}, expected {a.size}"
                    )
            elif isinstance(a.size, ParameterizedSize):
                _ = a.size.validate_size(actual_size)
            elif isinstance(a.size, DataDependentSize):
                _ = a.size.validate_size(actual_size)
            elif isinstance(a.size, SizeReference):
                ref_tensor_axes = all_tensor_axes.get(a.size.tensor_id)
                if ref_tensor_axes is None:
                    raise ValueError(
                        f"{e_msg(descr)}.axes[{a.id}].size.tensor_id: Unknown tensor"
                        + f" reference '{a.size.tensor_id}'"
                    )

                ref_axis, ref_size = ref_tensor_axes.get(a.size.axis_id, (None, None))
                if ref_axis is None or ref_size is None:
                    raise ValueError(
                        f"{e_msg(descr)}.axes[{a.id}].size.axis_id: Unknown tensor axis"
                        + f" reference '{a.size.tensor_id}.{a.size.axis_id}'"
                    )

                if a.unit != ref_axis.unit:
                    raise ValueError(
                        f"{e_msg(descr)}.axes[{a.id}].size: `SizeReference` requires"
                        + " axis and reference axis to have the same `unit`, but"
                        + f" {a.unit}!={ref_axis.unit}"
                    )

                # expected size derived from the referenced axis' actual size
                if actual_size != (
                    expected_size := (
                        ref_size * ref_axis.scale / a.scale + a.size.offset
                    )
                ):
                    raise ValueError(
                        f"{e_msg(descr)}.{tensor_origin}: axis '{a.id}' of size"
                        + f" {actual_size} invalid for referenced size {ref_size};"
                        + f" expected {expected_size}"
                    )
            else:
                assert_never(a.size)
# File description used for `dependencies` fields of weights entries (a conda
# environment file). `WithSuffix` restricts the source to the listed suffixes
# and is configured to compare them case-sensitively.
FileDescr_dependencies = Annotated[
    FileDescr_,
    WithSuffix((".yaml", ".yml"), case_sensitive=True),
    Field(examples=[dict(source="environment.yaml")]),
]
class _ArchitectureCallableDescr(Node):
    # Shared fields of the architecture descriptions below: the name of a
    # callable and the keyword arguments to pass to it.

    callable: Annotated[Identifier, Field(examples=["MyNetworkClass", "get_my_model"])]
    """Identifier of the callable that returns a torch.nn.Module instance."""

    # `dict` is cast only to give the default factory a precise static type.
    kwargs: Dict[str, YamlValue] = Field(
        default_factory=cast(Callable[[], Dict[str, YamlValue]], dict)
    )
    """key word arguments for the `callable`"""
class ArchitectureFromFileDescr(_ArchitectureCallableDescr, FileDescr):
    # Architecture callable that lives in a source file (described as a file
    # description, i.e. combined with the fields inherited from `FileDescr`).

    source: Annotated[FileSource, AfterValidator(wo_special_file_name)]
    """Architecture source file"""

    @model_serializer(mode="wrap", when_used="unless-none")
    def _serialize(self, nxt: SerializerFunctionWrapHandler, info: SerializationInfo):
        # Delegate to the shared file description serializer.
        return package_file_descr_serializer(self, nxt, info)
class ArchitectureFromLibraryDescr(_ArchitectureCallableDescr):
    # Architecture callable that is imported from an installed module
    # instead of being shipped as a source file.

    import_from: str
    """Where to import the callable from, i.e. `from <import_from> import <callable>`"""
class _ArchFileConv(
    Converter[
        _CallableFromFile_v0_4,
        ArchitectureFromFileDescr,
        Optional[Sha256],
        Dict[str, Any],
    ]
):
    """Convert a v0.4 `<source>:<callable>` string to an architecture file description."""

    def _convert(
        self,
        src: _CallableFromFile_v0_4,
        tgt: "type[ArchitectureFromFileDescr | dict[str, Any]]",
        sha256: Optional[Sha256],
        kwargs: Dict[str, Any],
    ) -> "ArchitectureFromFileDescr | dict[str, Any]":
        """Split **src** into source and callable name and instantiate **tgt**.

        A source starting with "http" is expected to contain one colon of its
        own, any other source none; exactly one more colon separates the
        callable name. If the colon count does not match, the whole string is
        used for both the source and the callable.
        """
        expected_colons = 2 if src.startswith("http") else 1
        if src.count(":") == expected_colons:
            # the callable name follows the last colon
            source, callable_ = src.rsplit(":", 1)
        else:
            source = callable_ = str(src)

        return tgt(
            callable=Identifier(callable_),
            source=cast(FileSource_, source),
            sha256=sha256,
            kwargs=kwargs,
        )


_arch_file_conv = _ArchFileConv(_CallableFromFile_v0_4, ArchitectureFromFileDescr)
class _ArchLibConv(
    Converter[
        _CallableFromDepencency_v0_4, ArchitectureFromLibraryDescr, Dict[str, Any]
    ]
):
    """Convert a v0.4 dotted `<module>.<callable>` string to a library architecture."""

    def _convert(
        self,
        src: _CallableFromDepencency_v0_4,
        tgt: "type[ArchitectureFromLibraryDescr | dict[str, Any]]",
        kwargs: Dict[str, Any],
    ) -> "ArchitectureFromLibraryDescr | dict[str, Any]":
        """Use the part after the last dot as callable, the rest as `import_from`."""
        # without any dot `import_from` is empty and the callable is all of `src`
        import_from, _, callable_ = src.rpartition(".")
        return tgt(
            import_from=import_from, callable=Identifier(callable_), kwargs=kwargs
        )


_arch_lib_conv = _ArchLibConv(
    _CallableFromDepencency_v0_4, ArchitectureFromLibraryDescr
)
class WeightsEntryDescrBase(FileDescr):
    # Common base of all weights entries: a weights file plus provenance
    # information (`authors`, `parent`, `comment`).
    # Subclasses set the class variables `type` and `weights_format_name`.
    type: ClassVar[WeightsFormat]
    weights_format_name: ClassVar[str]  # human readable

    source: Annotated[FileSource, AfterValidator(wo_special_file_name)]
    """Source of the weights file."""

    authors: Optional[List[Author]] = None
    """Authors
    Either the person(s) that have trained this model resulting in the original weights file.
    (If this is the initial weights entry, i.e. it does not have a `parent`)
    Or the person(s) who have converted the weights to this weights format.
    (If this is a child weight, i.e. it has a `parent` field)
    """

    parent: Annotated[
        Optional[WeightsFormat], Field(examples=["pytorch_state_dict"])
    ] = None
    """The source weights these weights were converted from.
    For example, if a model's weights were converted from the `pytorch_state_dict` format to `torchscript`,
    The `pytorch_state_dict` weights entry has no `parent` and is the parent of the `torchscript` weights.
    All weight entries except one (the initial set of weights resulting from training the model),
    need to have this field."""

    comment: str = ""
    """A comment about this weights entry, for example how these weights were created."""

    @model_validator(mode="after")
    def _validate(self) -> Self:
        # An entry naming its own format as `parent` would be a self reference.
        if self.type == self.parent:
            raise ValueError("Weights entry can't be it's own parent.")

        return self

    @model_serializer(mode="wrap", when_used="unless-none")
    def _serialize(self, nxt: SerializerFunctionWrapHandler, info: SerializationInfo):
        # Delegate to the shared file description serializer.
        return package_file_descr_serializer(self, nxt, info)
class KerasHdf5WeightsDescr(WeightsEntryDescrBase):
    # Weights entry for the "keras_hdf5" weights format.
    type = "keras_hdf5"
    weights_format_name: ClassVar[str] = "Keras HDF5"
    tensorflow_version: Version
    """TensorFlow version used to create these weights."""
# File description used for the `external_data` field of ONNX weights.
# `WithSuffix` restricts the source to the `.data` suffix and is configured
# to compare it case-sensitively.
FileDescr_external_data = Annotated[
    FileDescr_,
    WithSuffix(".data", case_sensitive=True),
    Field(examples=[dict(source="weights.onnx.data")]),
]
class OnnxWeightsDescr(WeightsEntryDescrBase):
    # Weights entry for the "onnx" weights format with optional external data.
    type = "onnx"
    weights_format_name: ClassVar[str] = "ONNX"
    opset_version: Annotated[int, Ge(7)]
    """ONNX opset version"""

    external_data: Optional[FileDescr_external_data] = None
    """Source of the external ONNX data file holding the weights.
    (If present **source** holds the ONNX architecture without weights)."""

    @model_validator(mode="after")
    def _validate_external_data_unique_file_name(self) -> Self:
        # The external data file must not share its file name with `source`.
        external = self.external_data
        if external is None:
            return self

        source_name = extract_file_name(self.source)
        external_name = extract_file_name(external.source)
        if source_name == external_name:
            raise ValueError(
                f"ONNX `external_data` file name '{external_name}'"
                + " must be different from ONNX `source` file name."
            )

        return self
class PytorchStateDictWeightsDescr(WeightsEntryDescrBase):
    # Weights entry for the "pytorch_state_dict" weights format. Besides the
    # weights file it describes the `architecture` callable and, optionally,
    # a conda environment file with additional `dependencies`.
    type = "pytorch_state_dict"
    weights_format_name: ClassVar[str] = "Pytorch State Dict"
    architecture: Union[ArchitectureFromFileDescr, ArchitectureFromLibraryDescr]
    pytorch_version: Version
    """Version of the PyTorch library used.
    If `dependencies` is specified it has to include pytorch and any version pinning has to be compatible.
    """
    dependencies: Optional[FileDescr_dependencies] = None
    """Custom dependencies beyond pytorch described in a Conda environment file.
    Allows to specify custom dependencies, see conda docs:
    - [Exporting an environment file across platforms](https://conda.io/projects/conda/en/latest/user-guide/tasks/manage-environments.html#exporting-an-environment-file-across-platforms)
    - [Creating an environment file manually](https://conda.io/projects/conda/en/latest/user-guide/tasks/manage-environments.html#creating-an-environment-file-manually)

    The conda environment file should include pytorch and any version pinning has to be compatible with
    **pytorch_version**.
    """
class TensorflowJsWeightsDescr(WeightsEntryDescrBase):
    # Weights entry for the "tensorflow_js" weights format.
    type = "tensorflow_js"
    weights_format_name: ClassVar[str] = "Tensorflow.js"
    tensorflow_version: Version
    """Version of the TensorFlow library used."""

    # Redeclares the inherited `source` field with format specific documentation.
    source: Annotated[FileSource, AfterValidator(wo_special_file_name)]
    """The multi-file weights.
    All required files/folders should be a zip archive."""
class TensorflowSavedModelBundleWeightsDescr(WeightsEntryDescrBase):
    # Weights entry for the "tensorflow_saved_model_bundle" weights format.
    type = "tensorflow_saved_model_bundle"
    weights_format_name: ClassVar[str] = "Tensorflow Saved Model"
    tensorflow_version: Version
    """Version of the TensorFlow library used."""

    dependencies: Optional[FileDescr_dependencies] = None
    """Custom dependencies beyond tensorflow.
    Should include tensorflow and any version pinning has to be compatible with **tensorflow_version**."""

    # Redeclares the inherited `source` field with format specific documentation.
    source: Annotated[FileSource, AfterValidator(wo_special_file_name)]
    """The multi-file weights.
    All required files/folders should be a zip archive."""
class TorchscriptWeightsDescr(WeightsEntryDescrBase):
    # Weights entry for the "torchscript" weights format.
    type = "torchscript"
    weights_format_name: ClassVar[str] = "TorchScript"
    pytorch_version: Version
    """Version of the PyTorch library used."""
class WeightsDescr(Node):
    # Container with one optional entry per supported weights format.
    # At least one entry has to be given (see `check_entries`).
    keras_hdf5: Optional[KerasHdf5WeightsDescr] = None
    onnx: Optional[OnnxWeightsDescr] = None
    pytorch_state_dict: Optional[PytorchStateDictWeightsDescr] = None
    tensorflow_js: Optional[TensorflowJsWeightsDescr] = None
    tensorflow_saved_model_bundle: Optional[TensorflowSavedModelBundleWeightsDescr] = (
        None
    )
    torchscript: Optional[TorchscriptWeightsDescr] = None

    @model_validator(mode="after")
    def check_entries(self) -> Self:
        """Validate the set of weights entries as a whole.

        Raises:
            ValueError: If no entry is given or an entry's `parent` refers to
                a weights format that is not present.

        A warning (not an error) is issued unless exactly one entry has no
        `parent`.
        """
        entries = {wtype for wtype, entry in self if entry is not None}

        if not entries:
            raise ValueError("Missing weights entry")

        entries_wo_parent = {
            wtype
            for wtype, entry in self
            if entry is not None and hasattr(entry, "parent") and entry.parent is None
        }
        if len(entries_wo_parent) != 1:
            issue_warning(
                "Exactly one weights entry may not specify the `parent` field (got"
                + " {value}). That entry is considered the original set of model weights."
                + " Other weight formats are created through conversion of the original or"
                + " already converted weights. They have to reference the weights format"
                + " they were converted from as their `parent`.",
                value=len(entries_wo_parent),
                field="weights",
            )

        for wtype, entry in self:
            if entry is None:
                continue

            # iterating the model yields untyped values; these asserts state
            # what every weights entry is expected to provide
            assert hasattr(entry, "type")
            assert hasattr(entry, "parent")
            assert wtype == entry.type
            if (
                entry.parent is not None and entry.parent not in entries
            ):  # self reference checked for `parent` field
                raise ValueError(
                    f"`weights.{wtype}.parent={entry.parent}` not in specified weight"
                    + f" formats: {entries}"
                )

        return self

    def __getitem__(
        self,
        key: Literal[
            "keras_hdf5",
            "onnx",
            "pytorch_state_dict",
            "tensorflow_js",
            "tensorflow_saved_model_bundle",
            "torchscript",
        ],
    ):
        """Return the weights entry for format **key**.

        Raises:
            KeyError: If **key** is not a known weights format or no entry is
                specified for it.
        """
        # explicit branches (instead of `getattr`) keep the return type precise
        if key == "keras_hdf5":
            ret = self.keras_hdf5
        elif key == "onnx":
            ret = self.onnx
        elif key == "pytorch_state_dict":
            ret = self.pytorch_state_dict
        elif key == "tensorflow_js":
            ret = self.tensorflow_js
        elif key == "tensorflow_saved_model_bundle":
            ret = self.tensorflow_saved_model_bundle
        elif key == "torchscript":
            ret = self.torchscript
        else:
            raise KeyError(key)

        if ret is None:
            raise KeyError(key)

        return ret

    @property
    def available_formats(self):
        """Mapping of weights format name to entry for all specified entries."""
        candidates = (
            ("keras_hdf5", self.keras_hdf5),
            ("onnx", self.onnx),
            ("pytorch_state_dict", self.pytorch_state_dict),
            ("tensorflow_js", self.tensorflow_js),
            ("tensorflow_saved_model_bundle", self.tensorflow_saved_model_bundle),
            ("torchscript", self.torchscript),
        )
        return {name: entry for name, entry in candidates if entry is not None}

    @property
    def missing_formats(self):
        """Set of all weights formats for which no entry is specified."""
        # build the mapping once instead of once per candidate format
        available = self.available_formats
        return {wf for wf in get_args(WeightsFormat) if wf not in available}
class ModelId(ResourceId):
    # Resource id type for models; adds nothing to `ResourceId` but gives
    # model ids a distinct type.
    pass
class LinkedModel(LinkedResourceBase):
    """Reference to a bioimage.io model."""

    # Narrows the linked resource id to a model id.
    id: ModelId
    """A valid model `id` from the bioimage.io collection."""
class _DataDepSize(NamedTuple):
    # Bounds of an axis size that is not a single known integer; used as a
    # possible output axis size in `_AxisSizes` and `_TensorSizes`.
    min: StrictInt
    max: Optional[StrictInt]  # may be `None`
class _AxisSizes(NamedTuple):
    """the lengths of all axes of model inputs and outputs"""

    # keyed by (tensor id, axis id)
    inputs: Dict[Tuple[TensorId, AxisId], int]
    outputs: Dict[Tuple[TensorId, AxisId], Union[int, _DataDepSize]]
class _TensorSizes(NamedTuple):
    """_AxisSizes as nested dicts"""

    # outer key: tensor id, inner key: axis id
    inputs: Dict[TensorId, Dict[AxisId, int]]
    outputs: Dict[TensorId, Dict[AxisId, Union[int, _DataDepSize]]]
class ReproducibilityTolerance(Node, extra="allow"):
    """Describes what small numerical differences -- if any -- may be tolerated
    in the generated output when executing in different environments.

    A tensor element *output* is considered mismatched to the **test_tensor** if
    abs(*output* - **test_tensor**) > **absolute_tolerance** + **relative_tolerance** * abs(**test_tensor**).
    (Internally we call [numpy.testing.assert_allclose](https://numpy.org/doc/stable/reference/generated/numpy.testing.assert_allclose.html).)

    Motivation:
        For testing we can request the respective deep learning frameworks to be as
        reproducible as possible by setting seeds and choosing deterministic algorithms,
        but differences in operating systems, available hardware and installed drivers
        may still lead to numerical differences.
    """

    relative_tolerance: RelativeTolerance = 1e-3
    """Maximum relative tolerance of reproduced test tensor."""

    absolute_tolerance: AbsoluteTolerance = 1e-4
    """Maximum absolute tolerance of reproduced test tensor."""

    mismatched_elements_per_million: MismatchedElementsPerMillion = 100
    """Maximum number of mismatched elements/pixels per million to tolerate."""

    # the defaults of the two fields below are empty sequences
    output_ids: Sequence[TensorId] = ()
    """Limits the output tensor IDs these reproducibility details apply to."""

    weights_formats: Sequence[WeightsFormat] = ()
    """Limits the weights formats these details apply to."""
class BioimageioConfig(Node, extra="allow"):
    # Content of the `bioimageio` field of `Config`; declared with
    # `extra="allow"`.
    reproducibility_tolerance: Sequence[ReproducibilityTolerance] = ()
    """Tolerances to allow when reproducing the model's test outputs
    from the model's test inputs.
    Only the first entry matching tensor id and weights format is considered.
    """
class Config(Node, extra="allow"):
    # Type of the model description's `config` field; declared with
    # `extra="allow"`.
    # The default is created with `model_construct`.
    bioimageio: BioimageioConfig = Field(
        default_factory=BioimageioConfig.model_construct
    )
2634class ModelDescr(GenericModelDescrBase):
2635 """Specification of the fields used in a bioimage.io-compliant RDF to describe AI models with pretrained weights.
2636 These fields are typically stored in a YAML file which we call a model resource description file (model RDF).
2637 """
2639 implemented_format_version: ClassVar[Literal["0.5.6"]] = "0.5.6"
2640 if TYPE_CHECKING:
2641 format_version: Literal["0.5.6"] = "0.5.6"
2642 else:
2643 format_version: Literal["0.5.6"]
2644 """Version of the bioimage.io model description specification used.
2645 When creating a new model always use the latest micro/patch version described here.
2646 The `format_version` is important for any consumer software to understand how to parse the fields.
2647 """
2649 implemented_type: ClassVar[Literal["model"]] = "model"
2650 if TYPE_CHECKING:
2651 type: Literal["model"] = "model"
2652 else:
2653 type: Literal["model"]
2654 """Specialized resource type 'model'"""
2656 id: Optional[ModelId] = None
2657 """bioimage.io-wide unique resource identifier
2658 assigned by bioimage.io; version **un**specific."""
2660 authors: FAIR[List[Author]] = Field(
2661 default_factory=cast(Callable[[], List[Author]], list)
2662 )
2663 """The authors are the creators of the model RDF and the primary points of contact."""
2665 documentation: FAIR[Optional[FileSource_documentation]] = None
2666 """URL or relative path to a markdown file with additional documentation.
2667 The recommended documentation file name is `README.md`. An `.md` suffix is mandatory.
2668 The documentation should include a '#[#] Validation' (sub)section
2669 with details on how to quantitatively validate the model on unseen data."""
@field_validator("documentation", mode="after")
@classmethod
def _validate_documentation(
    cls, value: Optional[FileSource_documentation]
) -> Optional[FileSource_documentation]:
    """Warn if the documentation file has no heading mentioning validation.

    The file is only read if IO checks are enabled in the validation context
    and a documentation source is given. **value** is returned unchanged.
    """
    io_checks_enabled = get_validation_context().perform_io_checks
    if value is None or not io_checks_enabled:
        return value

    doc_text = get_reader(value).read().decode(encoding="utf-8")
    has_validation_section = re.search("#.*[vV]alidation", doc_text) is not None
    if not has_validation_section:
        issue_warning(
            "No '# Validation' (sub)section found in {value}.",
            value=value,
            field="documentation",
        )

    return value
2690 inputs: NotEmpty[Sequence[InputTensorDescr]]
2691 """Describes the input tensors expected by this model."""
@field_validator("inputs", mode="after")
@classmethod
def _validate_input_axes(
    cls, inputs: Sequence[InputTensorDescr]
) -> Sequence[InputTensorDescr]:
    """Validate every axis of every input tensor with `_validate_axis`.

    Size references of input axes may point to any non-batch input axis with
    a fixed (`int`) or parameterized size.
    """
    input_size_refs = cls._get_axes_with_independent_size(inputs)

    for tensor_idx, ipt in enumerate(inputs):
        valid_independent_refs: Dict[
            Tuple[TensorId, AxisId],
            Tuple[TensorDescr, AnyAxis, Union[int, ParameterizedSize]],
        ] = {}
        # axes of the tensor itself come first ...
        for own_axis in ipt.axes:
            if isinstance(own_axis, BatchAxis):
                continue
            if isinstance(own_axis.size, (int, ParameterizedSize)):
                valid_independent_refs[(ipt.id, own_axis.id)] = (
                    ipt,
                    own_axis,
                    own_axis.size,
                )
        # ... followed by (and overridden by) the axes of all inputs
        valid_independent_refs.update(input_size_refs)

        for axis_idx, axis in enumerate(ipt.axes):
            cls._validate_axis(
                "inputs",
                i=tensor_idx,
                tensor_id=ipt.id,
                a=axis_idx,
                axis=axis,
                valid_independent_refs=valid_independent_refs,
            )

    return inputs
@staticmethod
def _validate_axis(
    field_name: str,
    i: int,
    tensor_id: TensorId,
    a: int,
    axis: AnyAxis,
    valid_independent_refs: Dict[
        Tuple[TensorId, AxisId],
        Tuple[TensorDescr, AnyAxis, Union[int, ParameterizedSize]],
    ],
):
    """Validate the size reference (if any) of a single tensor axis.

    Batch axes and axes with a fixed, parameterized or data dependent size
    need no further checks. For an axis sized by a `SizeReference` the
    referenced axis has to exist in **valid_independent_refs**, must not be
    the axis itself and has to be compatible (channel axes only reference
    channel axes, units match, halo fits).

    Args:
        field_name: Name of the validated field; used in error messages.
        i: Index of the tensor within **field_name**; used in error messages.
        tensor_id: Id of the tensor **axis** belongs to.
        a: Index of **axis** within its tensor; used in error messages.
        axis: The axis to validate. Channel names given as a format string
            containing `{i}` are expanded in place.
        valid_independent_refs: Axes that may be referenced, keyed by
            (tensor id, axis id).

    Raises:
        ValueError: If the size reference is invalid.
    """
    if isinstance(axis, BatchAxis) or isinstance(
        axis.size, (int, ParameterizedSize, DataDependentSize)
    ):
        return
    elif not isinstance(axis.size, SizeReference):
        assert_never(axis.size)

    # validate axis.size SizeReference
    ref = (axis.size.tensor_id, axis.size.axis_id)
    # An axis sized by a `SizeReference` is never part of
    # `valid_independent_refs`, so the self reference has to be detected first
    # to be reported with its specific message.
    if ref == (tensor_id, axis.id):
        raise ValueError(
            "Self-referencing not allowed for"
            + f" {field_name}[{i}].axes[{a}].size: {axis.size}"
        )
    if ref not in valid_independent_refs:
        raise ValueError(
            "Invalid tensor axis reference at"
            + f" {field_name}[{i}].axes[{a}].size: {axis.size}."
        )

    _, ref_axis, ref_size = valid_independent_refs[ref]
    if axis.type == "channel":
        if ref_axis.type != "channel":
            raise ValueError(
                "A channel axis' size may only reference another fixed size"
                + " channel axis."
            )
        if isinstance(axis.channel_names, str) and "{i}" in axis.channel_names:
            assert isinstance(ref_size, int), (
                "channel axis ref (another channel axis) has to specify fixed"
                + " size"
            )
            # expand the channel name template to one name per channel (1-based)
            generated_channel_names = [
                Identifier(axis.channel_names.format(i=channel_number))
                for channel_number in range(1, ref_size + 1)
            ]
            axis.channel_names = generated_channel_names

    if (ax_unit := getattr(axis, "unit", None)) != (
        ref_unit := getattr(ref_axis, "unit", None)
    ):
        raise ValueError(
            "The units of an axis and its reference axis need to match, but"
            + f" '{ax_unit}' != '{ref_unit}'."
        )
    if isinstance(ref_axis, BatchAxis):
        raise ValueError(
            f"Invalid reference axis '{ref_axis.id}' for {tensor_id}.{axis.id}"
            + " (a batch axis is not allowed as reference)."
        )

    if isinstance(axis, WithHalo):
        min_size = axis.size.get_size(axis, ref_axis, n=0)
        if (min_size - 2 * axis.halo) < 1:
            raise ValueError(
                f"axis {axis.id} with minimum size {min_size} is too small for halo"
                + f" {axis.halo}."
            )

        input_halo = axis.halo * axis.scale / ref_axis.scale
        # rejected: non-integer values and values with remainder 1 modulo 2
        if input_halo != int(input_halo) or input_halo % 2 == 1:
            raise ValueError(
                f"input_halo {input_halo} (output_halo {axis.halo} *"
                + f" output_scale {axis.scale} / input_scale {ref_axis.scale})"
                + f" has to be an even integer for {tensor_id}.{axis.id}."
            )
@model_validator(mode="after")
def _validate_test_tensors(self) -> Self:
    """Validate the test tensors against the tensor descriptions.

    Does nothing unless IO checks are enabled in the validation context.
    Loads all available test tensors, passes them to `validate_tensors` and
    checks that no configured absolute tolerance exceeds 1% of the maximum
    value of an affected output test tensor.

    Raises:
        ValueError: If an absolute tolerance is too large for a test tensor.
    """
    if not get_validation_context().perform_io_checks:
        return self

    def load_optional(descr: Union[InputTensorDescr, OutputTensorDescr]):
        # tensors without a test tensor are represented by `None`
        if descr.test_tensor is None:
            return None
        return load_array(descr.test_tensor)

    test_output_arrays = [load_optional(descr) for descr in self.outputs]
    test_input_arrays = [load_optional(descr) for descr in self.inputs]

    tensors = {}
    for descr, array in zip(
        chain(self.inputs, self.outputs), test_input_arrays + test_output_arrays
    ):
        tensors[descr.id] = (descr, array)

    validate_tensors(tensors, tensor_origin="test_tensor")

    output_arrays = {}
    for descr, array in zip(self.outputs, test_output_arrays):
        output_arrays[descr.id] = array

    for rep_tol in self.config.bioimageio.reproducibility_tolerance:
        if not rep_tol.absolute_tolerance:
            continue

        if rep_tol.output_ids:
            # restrict the check to the outputs this entry applies to
            out_arrays = {
                oid: a
                for oid, a in output_arrays.items()
                if oid in rep_tol.output_ids
            }
        else:
            out_arrays = output_arrays

        for out_id, array in out_arrays.items():
            if array is None:
                continue

            max_test_value = array.max()
            if rep_tol.absolute_tolerance > max_test_value * 0.01:
                raise ValueError(
                    "config.bioimageio.reproducibility_tolerance.absolute_tolerance="
                    + f"{rep_tol.absolute_tolerance} > 0.01*{max_test_value}"
                    + f" (1% of the maximum value of the test tensor '{out_id}')"
                )

    return self
@model_validator(mode="after")
def _validate_tensor_references_in_proc_kwargs(self, info: ValidationInfo) -> Self:
    """Check the `reference_tensor` kwargs of pre- and postprocessing steps.

    Preprocessing may only reference input tensors; postprocessing may
    reference input and output tensors.

    Raises:
        ValueError: If a referenced tensor id is unknown.
    """
    ipt_refs = {t.id for t in self.inputs}
    out_refs = {t.id for t in self.outputs}

    preprocessing_refs = (
        p.kwargs.get("reference_tensor")
        for ipt in self.inputs
        for p in ipt.preprocessing
    )
    for ref in preprocessing_refs:
        if ref is not None and ref not in ipt_refs:
            raise ValueError(
                f"`reference_tensor` '{ref}' not found. Valid input tensor"
                + f" references are: {ipt_refs}."
            )

    postprocessing_refs = (
        p.kwargs.get("reference_tensor")
        for out in self.outputs
        for p in out.postprocessing
    )
    for ref in postprocessing_refs:
        if ref is None:
            continue

        if not (ref in ipt_refs or ref in out_refs):
            raise ValueError(
                f"`reference_tensor` '{ref}' not found. Valid tensor references"
                + f" are: {ipt_refs | out_refs}."
            )

    return self
2883 # TODO: use validate funcs in validate_test_tensors
2884 # def validate_inputs(self, input_tensors: Mapping[TensorId, NDArray[Any]]) -> Mapping[TensorId, NDArray[Any]]:
2886 name: Annotated[
2887 str,
2888 RestrictCharacters(string.ascii_letters + string.digits + "_+- ()"),
2889 MinLen(5),
2890 MaxLen(128),
2891 warn(MaxLen(64), "Name longer than 64 characters.", INFO),
2892 ]
2893 """A human-readable name of this model.
2894 It should be no longer than 64 characters
2895 and may only contain letter, number, underscore, minus, parentheses and spaces.
2896 We recommend choosing a name that refers to the model's task and image modality.
2897 """
2899 outputs: NotEmpty[Sequence[OutputTensorDescr]]
2900 """Describes the output tensors."""
@field_validator("outputs", mode="after")
@classmethod
def _validate_tensor_ids(
    cls, outputs: Sequence[OutputTensorDescr], info: ValidationInfo
) -> Sequence[OutputTensorDescr]:
    """Ensure that tensor ids are unique across all inputs and outputs.

    Args:
        outputs: The validated output tensor descriptions.
        info: Validation info; `info.data` provides the previously validated
            `inputs` (missing if their validation failed).

    Returns:
        **outputs**, unchanged.

    Raises:
        ValueError: If a tensor id occurs more than once.
    """
    # `info.data` only holds fields validated before `outputs`; the outputs
    # themselves are only available through the `outputs` argument.
    tensor_ids = [t.id for t in chain(info.data.get("inputs", ()), outputs)]
    duplicate_tensor_ids: List[str] = []
    seen: Set[str] = set()
    for t in tensor_ids:
        if t in seen:
            duplicate_tensor_ids.append(t)

        seen.add(t)

    if duplicate_tensor_ids:
        raise ValueError(f"Duplicate tensor ids: {duplicate_tensor_ids}")

    return outputs
@staticmethod
def _get_axes_with_parameterized_size(
    io: Union[Sequence[InputTensorDescr], Sequence[OutputTensorDescr]],
):
    """Collect all non-batch axes of **io** that have a parameterized size.

    Returns a dict keyed by the string "<tensor id>.<axis id>" with
    (tensor description, axis, axis size) tuples as values.
    """
    parameterized = {}
    for tensor in io:
        for axis in tensor.axes:
            if isinstance(axis, BatchAxis):
                continue
            if isinstance(axis.size, ParameterizedSize):
                parameterized[f"{tensor.id}.{axis.id}"] = (tensor, axis, axis.size)

    return parameterized
@staticmethod
def _get_axes_with_independent_size(
    io: Union[Sequence[InputTensorDescr], Sequence[OutputTensorDescr]],
):
    """Collect all non-batch axes of **io** with a fixed or parameterized size.

    Returns a dict keyed by (tensor id, axis id) with
    (tensor description, axis, axis size) tuples as values.
    """
    independent = {}
    for tensor in io:
        for axis in tensor.axes:
            if isinstance(axis, BatchAxis):
                continue
            if isinstance(axis.size, (int, ParameterizedSize)):
                independent[(tensor.id, axis.id)] = (tensor, axis, axis.size)

    return independent
@field_validator("outputs", mode="after")
@classmethod
def _validate_output_axes(
    cls, outputs: List[OutputTensorDescr], info: ValidationInfo
) -> List[OutputTensorDescr]:
    """Validate every axis of every output tensor with `_validate_axis`.

    Size references of output axes may point to any non-batch input or
    output axis with a fixed (`int`) or parameterized size.
    """
    input_size_refs = cls._get_axes_with_independent_size(
        info.data.get("inputs", [])
    )
    output_size_refs = cls._get_axes_with_independent_size(outputs)

    for tensor_idx, out in enumerate(outputs):
        valid_independent_refs: Dict[
            Tuple[TensorId, AxisId],
            Tuple[TensorDescr, AnyAxis, Union[int, ParameterizedSize]],
        ] = {}
        # axes of the tensor itself come first ...
        for own_axis in out.axes:
            if isinstance(own_axis, BatchAxis):
                continue
            if isinstance(own_axis.size, (int, ParameterizedSize)):
                valid_independent_refs[(out.id, own_axis.id)] = (
                    out,
                    own_axis,
                    own_axis.size,
                )
        # ... followed by (and overridden by) all input and output axes
        valid_independent_refs.update(input_size_refs)
        valid_independent_refs.update(output_size_refs)

        for axis_idx, axis in enumerate(out.axes):
            cls._validate_axis(
                "outputs",
                tensor_idx,
                out.id,
                axis_idx,
                axis,
                valid_independent_refs=valid_independent_refs,
            )

    return outputs
2982 packaged_by: List[Author] = Field(
2983 default_factory=cast(Callable[[], List[Author]], list)
2984 )
2985 """The persons that have packaged and uploaded this model.
2986 Only required if those persons differ from the `authors`."""
2988 parent: Optional[LinkedModel] = None
2989 """The model from which this model is derived, e.g. by fine-tuning the weights."""
@model_validator(mode="after")
def _validate_parent_is_not_self(self) -> Self:
    """Reject a model description whose `parent` has the model's own id."""
    parent = self.parent
    if parent is None:
        return self

    if parent.id == self.id:
        raise ValueError("A model description may not reference itself as parent.")

    return self
2998 run_mode: Annotated[
2999 Optional[RunMode],
3000 warn(None, "Run mode '{value}' has limited support across consumer softwares."),
3001 ] = None
3002 """Custom run mode for this model: for more complex prediction procedures like test time
3003 data augmentation that currently cannot be expressed in the specification.
3004 No standard run modes are defined yet."""
3006 timestamp: Datetime = Field(default_factory=Datetime.now)
3007 """Timestamp in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) format
3008 with a few restrictions listed [here](https://docs.python.org/3/library/datetime.html#datetime.datetime.fromisoformat).
3009 (In Python a datetime object is valid, too)."""
3011 training_data: Annotated[
3012 Union[None, LinkedDataset, DatasetDescr, DatasetDescr02],
3013 Field(union_mode="left_to_right"),
3014 ] = None
3015 """The dataset used to train this model"""
3017 weights: Annotated[WeightsDescr, WrapSerializer(package_weights)]
3018 """The weights for this model.
3019 Weights can be given for different formats, but should otherwise be equivalent.
3020 The available weight formats determine which consumers can use this model."""
3022 config: Config = Field(default_factory=Config.model_construct)
@model_validator(mode="after")
def _add_default_cover(self) -> Self:
    """Generate cover images from the test tensors if no covers are given.

    Only runs if IO checks are enabled in the validation context. Any failure
    while loading the test tensors or generating the covers is reported as a
    warning instead of an error.
    """
    io_checks_enabled = get_validation_context().perform_io_checks
    if self.covers or not io_checks_enabled:
        return self

    try:
        input_samples = [
            (t, load_array(t.test_tensor))
            for t in self.inputs
            if t.test_tensor is not None
        ]
        output_samples = [
            (t, load_array(t.test_tensor))
            for t in self.outputs
            if t.test_tensor is not None
        ]
        generated_covers = generate_covers(input_samples, output_samples)
    except Exception as e:
        # best effort: a missing cover must not invalidate the description
        issue_warning(
            "Failed to generate cover image(s): {e}",
            value=self.covers,
            msg_context=dict(e=e),
            field="covers",
        )
    else:
        self.covers.extend(generated_covers)

    return self
def get_input_test_arrays(self) -> List[NDArray[Any]]:
    """Load the test tensors of all inputs (via `_get_test_arrays`)."""
    return self._get_test_arrays(self.inputs)
def get_output_test_arrays(self) -> List[NDArray[Any]]:
    """Load the test tensors of all outputs (via `_get_test_arrays`)."""
    return self._get_test_arrays(self.outputs)
@staticmethod
def _get_test_arrays(
    io_descr: Union[Sequence[InputTensorDescr], Sequence[OutputTensorDescr]],
):
    """Load the test tensor of every tensor description in **io_descr**.

    All descriptions are checked for a `test_tensor` before any array is
    loaded.

    Raises:
        ValueError: If a description has no `test_tensor`.
    """
    test_files: List[FileDescr] = []
    for tensor_descr in io_descr:
        test_tensor = tensor_descr.test_tensor
        if test_tensor is None:
            raise ValueError(
                f"Failed to get test arrays: description of '{tensor_descr.id}' is missing a `test_tensor`."
            )
        test_files.append(test_tensor)

    arrays = list(map(load_array, test_files))
    assert all(isinstance(arr, np.ndarray) for arr in arrays)
    return arrays
@staticmethod
def get_batch_size(tensor_sizes: Mapping[TensorId, Mapping[AxisId, int]]) -> int:
    """Determine the common batch size of the given tensor sizes.

    Batch axes of size 1 are compatible with any batch size. Without any
    batch axis of another size the result is 1.

    Raises:
        ValueError: If two tensors have different batch sizes other than 1.
    """
    batch_size = 1
    tensor_with_batchsize: Optional[TensorId] = None
    for tid in tensor_sizes:
        for aid, s in tensor_sizes[tid].items():
            if aid != BATCH_AXIS_ID:
                continue
            if s == 1 or s == batch_size:
                continue

            if batch_size == 1:
                # first batch size other than 1: remember where it came from
                batch_size = s
                tensor_with_batchsize = tid
                continue

            assert tensor_with_batchsize is not None
            raise ValueError(
                f"batch size mismatch for tensors '{tensor_with_batchsize}' ({batch_size}) and '{tid}' ({s})"
            )

    return batch_size
def get_output_tensor_sizes(
    self, input_sizes: Mapping[TensorId, Mapping[AxisId, int]]
) -> Dict[TensorId, Dict[AxisId, Union[int, _DataDepSize]]]:
    """Returns the tensor output sizes for given **input_sizes**.
    Only if **input_sizes** has a valid input shape, the tensor output size is exact.
    Otherwise it might be larger than the actual (valid) output"""
    # derive the batch size and the size increment factors `n` from the given
    # input sizes, then resolve all tensor sizes from those
    batch_size = self.get_batch_size(input_sizes)
    ns = self.get_ns(input_sizes)

    tensor_sizes = self.get_tensor_sizes(ns, batch_size=batch_size)
    return tensor_sizes.outputs
def get_ns(self, input_sizes: Mapping[TensorId, Mapping[AxisId, int]]):
    """get parameter `n` for each parameterized axis
    such that the valid input size is >= the given input size"""
    # look-up table: tensor id -> axis id -> axis description (inputs only)
    axes = {}
    for tensor in self.inputs:
        axes[tensor.id] = {axis.id: axis for axis in tensor.axes}

    ret: Dict[Tuple[TensorId, AxisId], ParameterizedSize_N] = {}
    for tid, sizes in input_sizes.items():
        for aid, s in sizes.items():
            size_descr = axes[tid][aid].size
            if isinstance(size_descr, ParameterizedSize):
                ret[(tid, aid)] = size_descr.get_n(s)
                continue

            # all other kinds of sizes have no parameter `n`
            if not (
                size_descr is None or isinstance(size_descr, (int, SizeReference))
            ):
                assert_never(size_descr)

    return ret
def get_tensor_sizes(
    self, ns: Mapping[Tuple[TensorId, AxisId], ParameterizedSize_N], batch_size: int
) -> _TensorSizes:
    """Resolve all axis sizes and group them by tensor.

    Args:
        ns: Size increment factor `n` per parameterized axis, keyed by
            (tensor id, axis id); passed on to `get_axis_sizes`.
        batch_size: Passed on to `get_axis_sizes`.

    Returns:
        The result of `get_axis_sizes` as nested dicts
        (tensor id -> axis id -> size) for inputs and outputs.
    """
    axis_sizes = self.get_axis_sizes(ns, batch_size=batch_size)

    # Group in a single pass; this also keeps the tensors in the order in
    # which they appear in `axis_sizes`.
    inputs: Dict[TensorId, Dict[AxisId, int]] = {}
    for (tensor_id, axis_id), size in axis_sizes.inputs.items():
        inputs.setdefault(tensor_id, {})[axis_id] = size

    outputs: Dict[TensorId, Dict[AxisId, Union[int, _DataDepSize]]] = {}
    for (tensor_id, axis_id), out_size in axis_sizes.outputs.items():
        outputs.setdefault(tensor_id, {})[axis_id] = out_size

    return _TensorSizes(inputs, outputs)
def get_axis_sizes(
    self,
    ns: Mapping[Tuple[TensorId, AxisId], ParameterizedSize_N],
    batch_size: Optional[int] = None,
    *,
    max_input_shape: Optional[Mapping[Tuple[TensorId, AxisId], int]] = None,
) -> _AxisSizes:
    """Determine input and output block shape for scale factors **ns**
    of parameterized input sizes.

    Args:
        ns: Scale factor `n` for each axis (keyed by (tensor_id, axis_id))
            that is parameterized as `size = min + n * step`.
        batch_size: The desired size of the batch dimension.
            If given **batch_size** overwrites any batch size present in
            **max_input_shape**. Default 1.
        max_input_shape: Limits the derived block shapes.
            Each axis for which the input size, parameterized by `n`, is larger
            than **max_input_shape** is set to the minimal value `n_min` for which
            this is still true.
            Use this for small input samples or large values of **ns**.
            Or simply whenever you know the full input shape.

    Returns:
        Resolved axis sizes for model inputs and outputs.

    Raises:
        ValueError: If **ns** has no entry for a parameterized axis.
    """
    max_input_shape = max_input_shape or {}
    if batch_size is None:
        # fall back to the first batch axis entry found in `max_input_shape`;
        # the `else` belongs to the `for` loop: no batch axis entry -> batch size 1
        for (_t_id, a_id), s in max_input_shape.items():
            if a_id == BATCH_AXIS_ID:
                batch_size = s
                break
        else:
            batch_size = 1

    # axis descriptions of all input and output tensors, needed to follow size references
    all_axes = {
        t.id: {a.id: a for a in t.axes} for t in chain(self.inputs, self.outputs)
    }

    inputs: Dict[Tuple[TensorId, AxisId], int] = {}
    outputs: Dict[Tuple[TensorId, AxisId], Union[int, _DataDepSize]] = {}

    def get_axis_size(a: Union[InputAxis, OutputAxis]):
        # NOTE: `t_descr` is not a parameter; it is read from the enclosing scope
        # and is bound by the `for t_descr in ...` loops below.
        # This helper must therefore only be called from within those loops.
        if isinstance(a, BatchAxis):
            if (t_descr.id, a.id) in ns:
                logger.warning(
                    "Ignoring unexpected size increment factor (n) for batch axis"
                    + " of tensor '{}'.",
                    t_descr.id,
                )
            return batch_size
        elif isinstance(a.size, int):
            if (t_descr.id, a.id) in ns:
                logger.warning(
                    "Ignoring unexpected size increment factor (n) for fixed size"
                    + " axis '{}' of tensor '{}'.",
                    a.id,
                    t_descr.id,
                )
            return a.size
        elif isinstance(a.size, ParameterizedSize):
            if (t_descr.id, a.id) not in ns:
                raise ValueError(
                    "Size increment factor (n) missing for parametrized axis"
                    + f" '{a.id}' of tensor '{t_descr.id}'."
                )
            n = ns[(t_descr.id, a.id)]
            s_max = max_input_shape.get((t_descr.id, a.id))
            if s_max is not None:
                # cap `n` at the value derived from the maximum input size of this axis
                n = min(n, a.size.get_n(s_max))

            return a.size.get_size(n)

        elif isinstance(a.size, SizeReference):
            if (t_descr.id, a.id) in ns:
                logger.warning(
                    "Ignoring unexpected size increment factor (n) for axis '{}'"
                    + " of tensor '{}' with size reference.",
                    a.id,
                    t_descr.id,
                )
            assert not isinstance(a, BatchAxis)
            ref_axis = all_axes[a.size.tensor_id][a.size.axis_id]
            assert not isinstance(ref_axis, BatchAxis)
            ref_key = (a.size.tensor_id, a.size.axis_id)
            # the referenced axis has to be resolved already (inputs take precedence)
            ref_size = inputs.get(ref_key, outputs.get(ref_key))
            assert ref_size is not None, ref_key
            # a reference to a data dependent size cannot be resolved here
            assert not isinstance(ref_size, _DataDepSize), ref_key
            return a.size.get_size(
                axis=a,
                ref_axis=ref_axis,
                ref_size=ref_size,
            )
        elif isinstance(a.size, DataDependentSize):
            if (t_descr.id, a.id) in ns:
                logger.warning(
                    "Ignoring unexpected increment factor (n) for data dependent"
                    + " size axis '{}' of tensor '{}'.",
                    a.id,
                    t_descr.id,
                )
            return _DataDepSize(a.size.min, a.size.max)
        else:
            assert_never(a.size)

    # first resolve all , but the `SizeReference` input sizes
    for t_descr in self.inputs:
        for a in t_descr.axes:
            if not isinstance(a.size, SizeReference):
                s = get_axis_size(a)
                assert not isinstance(s, _DataDepSize)
                inputs[t_descr.id, a.id] = s

    # resolve all other input axis sizes
    for t_descr in self.inputs:
        for a in t_descr.axes:
            if isinstance(a.size, SizeReference):
                s = get_axis_size(a)
                assert not isinstance(s, _DataDepSize)
                inputs[t_descr.id, a.id] = s

    # resolve all output axis sizes
    for t_descr in self.outputs:
        for a in t_descr.axes:
            assert not isinstance(a.size, ParameterizedSize)
            s = get_axis_size(a)
            outputs[t_descr.id, a.id] = s

    return _AxisSizes(inputs=inputs, outputs=outputs)
@model_validator(mode="before")
@classmethod
def _convert(cls, data: Dict[str, Any]) -> Dict[str, Any]:
    """Run `convert_from_old_format_wo_validation` on **data** before validation.

    **data** is updated in place and the same object is returned.
    """
    cls.convert_from_old_format_wo_validation(data)
    return data
@classmethod
def convert_from_old_format_wo_validation(cls, data: Dict[str, Any]) -> None:
    """Convert metadata following an older format version to this classes' format
    without validating the result.

    **data** is updated in place. Nothing happens unless **data** describes a
    model (`type == "model"`) with a `format_version` of the form
    `<major>.<minor>.<patch>` consisting of plain decimal numbers.

    - 0.3.x / 0.4.x: the content is replaced by the converted description.
      If the source is an invalid 0.4 description and converting it fails,
      the error is logged and **data** is left untouched.
    - 0.5.x: only `format_version` is bumped to the implemented version.
    """
    if data.get("type") != "model":
        return

    fv = data.get("format_version")
    if not isinstance(fv, str) or fv.count(".") != 2:
        return

    fv_parts = fv.split(".")
    # `str.isdigit` alone also accepts characters like "²" that `int` rejects;
    # require ASCII so that the `int` conversion below cannot raise.
    if any(not (p.isascii() and p.isdigit()) for p in fv_parts):
        return

    fv_tuple = tuple(map(int, fv_parts))

    assert cls.implemented_format_version_tuple[0:2] == (0, 5)
    if fv_tuple[:2] in ((0, 3), (0, 4)):
        m04 = _ModelDescr_v0_4.load(data)
        if isinstance(m04, InvalidDescr):
            # best effort: try to convert even an invalid 0.4 description
            try:
                updated = _model_conv.convert_as_dict(
                    m04  # pyright: ignore[reportArgumentType]
                )
            except Exception as e:
                logger.error(
                    "Failed to convert from invalid model 0.4 description."
                    + f"\nerror: {e}"
                    + "\nProceeding with model 0.5 validation without conversion."
                )
                updated = None
        else:
            updated = _model_conv.convert_as_dict(m04)

        if updated is not None:
            data.clear()
            data.update(updated)

    elif fv_tuple[:2] == (0, 5):
        # bump patch version
        data["format_version"] = cls.implemented_format_version
class _ModelConv(Converter[_ModelDescr_v0_4, ModelDescr]):
    """Converter from a model 0.4 description to a model 0.5 description."""

    def _convert(
        self, src: _ModelDescr_v0_4, tgt: "type[ModelDescr] | type[dict[str, Any]]"
    ) -> "ModelDescr | dict[str, Any]":
        """Create the 0.5 counterpart of **src** by calling **tgt** with keyword arguments.

        Args:
            src: the model 0.4 description to convert.
            tgt: callable used to build the result (`ModelDescr` or `dict`).

        Returns:
            Whatever **tgt** returns for the converted fields.
        """
        # replace every character that is not an ASCII letter, a digit
        # or one of "_+- ()" by a space
        name = "".join(
            c if c in string.ascii_letters + string.digits + "_+- ()" else " "
            for c in src.name
        )

        def conv_authors(auths: Optional[Sequence[_Author_v0_4]]):
            """Convert a sequence of authors; `None` is passed through."""
            # `TYPE_CHECKING` is False at runtime, i.e. `convert_as_dict` is used;
            # the other branch only exists for the static type checker
            conv = (
                _author_conv.convert if TYPE_CHECKING else _author_conv.convert_as_dict
            )
            return None if auths is None else [conv(a) for a in auths]

        # same pattern as above: typed converters for the type checker, dict converters at runtime
        if TYPE_CHECKING:
            arch_file_conv = _arch_file_conv.convert
            arch_lib_conv = _arch_lib_conv.convert
        else:
            arch_file_conv = _arch_file_conv.convert_as_dict
            arch_lib_conv = _arch_lib_conv.convert_as_dict

        # per input tensor name: size per axis
        # (the `min` shape for parameterized input shapes);
        # inputs with a falsy shape are left out
        input_size_refs = {
            ipt.name: {
                a: s
                for a, s in zip(
                    ipt.axes,
                    (
                        ipt.shape.min
                        if isinstance(ipt.shape, _ParameterizedInputShape_v0_4)
                        else ipt.shape
                    ),
                )
            }
            for ipt in src.inputs
            if ipt.shape
        }
        # sizes of all outputs without an implicit shape plus all input sizes;
        # on a name clash the input entry wins as it is unpacked last
        output_size_refs = {
            **{
                out.name: {a: s for a, s in zip(out.axes, out.shape)}
                for out in src.outputs
                if not isinstance(out.shape, _ImplicitOutputShape_v0_4)
            },
            **input_size_refs,
        }

        return tgt(
            attachments=(
                []
                if src.attachments is None
                else [FileDescr(source=f) for f in src.attachments.files]
            ),
            authors=[_author_conv.convert_as_dict(a) for a in src.authors],  # pyright: ignore[reportArgumentType]
            cite=[{"text": c.text, "doi": c.doi, "url": c.url} for c in src.cite],  # pyright: ignore[reportArgumentType]
            config=src.config,  # pyright: ignore[reportArgumentType]
            covers=src.covers,
            description=src.description,
            documentation=src.documentation,
            format_version="0.5.6",
            git_repo=src.git_repo,  # pyright: ignore[reportArgumentType]
            icon=src.icon,
            id=None if src.id is None else ModelId(src.id),
            id_emoji=src.id_emoji,
            license=src.license,  # type: ignore
            links=src.links,
            maintainers=[_maintainer_conv.convert_as_dict(m) for m in src.maintainers],  # pyright: ignore[reportArgumentType]
            name=name,
            tags=src.tags,
            type=src.type,
            uploader=src.uploader,
            version=src.version,
            # each tensor is converted together with its test tensor and,
            # if available, its sample tensor (`None` otherwise)
            inputs=[  # pyright: ignore[reportArgumentType]
                _input_tensor_conv.convert_as_dict(ipt, tt, st, input_size_refs)
                for ipt, tt, st in zip(
                    src.inputs,
                    src.test_inputs,
                    src.sample_inputs or [None] * len(src.test_inputs),
                )
            ],
            outputs=[  # pyright: ignore[reportArgumentType]
                _output_tensor_conv.convert_as_dict(out, tt, st, output_size_refs)
                for out, tt, st in zip(
                    src.outputs,
                    src.test_outputs,
                    src.sample_outputs or [None] * len(src.test_outputs),
                )
            ],
            # a linked parent is referenced as "<id>" or "<id>/<version_number>"
            parent=(
                None
                if src.parent is None
                else LinkedModel(
                    id=ModelId(
                        str(src.parent.id)
                        + (
                            ""
                            if src.parent.version_number is None
                            else f"/{src.parent.version_number}"
                        )
                    )
                )
            ),
            # a `LinkedDataset02` is re-linked as "<id>" or "<id>/<version_number>";
            # any other training data description is passed on unchanged
            training_data=(
                None
                if src.training_data is None
                else (
                    LinkedDataset(
                        id=DatasetId(
                            str(src.training_data.id)
                            + (
                                ""
                                if src.training_data.version_number is None
                                else f"/{src.training_data.version_number}"
                            )
                        )
                    )
                    if isinstance(src.training_data, LinkedDataset02)
                    else src.training_data
                )
            ),
            packaged_by=[_author_conv.convert_as_dict(a) for a in src.packaged_by],  # pyright: ignore[reportArgumentType]
            run_mode=src.run_mode,
            timestamp=src.timestamp,
            # For each weights format `w` is (re)bound to the 0.4 entry.
            # `(w := ...) and f(...)` passes a falsy entry (e.g. `None`) on as is
            # and only converts an entry that is present.
            weights=(WeightsDescr if TYPE_CHECKING else dict)(
                keras_hdf5=(w := src.weights.keras_hdf5)
                and (KerasHdf5WeightsDescr if TYPE_CHECKING else dict)(
                    authors=conv_authors(w.authors),
                    source=w.source,
                    # fall back to version 1.15 if no tensorflow version is given
                    tensorflow_version=w.tensorflow_version or Version("1.15"),
                    parent=w.parent,
                ),
                onnx=(w := src.weights.onnx)
                and (OnnxWeightsDescr if TYPE_CHECKING else dict)(
                    source=w.source,
                    authors=conv_authors(w.authors),
                    parent=w.parent,
                    # fall back to opset version 15 if none is given
                    opset_version=w.opset_version or 15,
                ),
                pytorch_state_dict=(w := src.weights.pytorch_state_dict)
                and (PytorchStateDictWeightsDescr if TYPE_CHECKING else dict)(
                    source=w.source,
                    authors=conv_authors(w.authors),
                    parent=w.parent,
                    architecture=(
                        arch_file_conv(
                            w.architecture,
                            w.architecture_sha256,
                            w.kwargs,
                        )
                        if isinstance(w.architecture, _CallableFromFile_v0_4)
                        else arch_lib_conv(w.architecture, w.kwargs)
                    ),
                    # fall back to version 1.10 if no pytorch version is given
                    pytorch_version=w.pytorch_version or Version("1.10"),
                    # a leading "conda:" is cut off the dependencies string
                    dependencies=(
                        None
                        if w.dependencies is None
                        else (FileDescr if TYPE_CHECKING else dict)(
                            source=cast(
                                FileSource,
                                str(deps := w.dependencies)[
                                    (
                                        len("conda:")
                                        if str(deps).startswith("conda:")
                                        else 0
                                    ) :
                                ],
                            )
                        )
                    ),
                ),
                tensorflow_js=(w := src.weights.tensorflow_js)
                and (TensorflowJsWeightsDescr if TYPE_CHECKING else dict)(
                    source=w.source,
                    authors=conv_authors(w.authors),
                    parent=w.parent,
                    tensorflow_version=w.tensorflow_version or Version("1.15"),
                ),
                tensorflow_saved_model_bundle=(
                    w := src.weights.tensorflow_saved_model_bundle
                )
                and (TensorflowSavedModelBundleWeightsDescr if TYPE_CHECKING else dict)(
                    authors=conv_authors(w.authors),
                    parent=w.parent,
                    source=w.source,
                    tensorflow_version=w.tensorflow_version or Version("1.15"),
                    # a leading "conda:" is cut off the dependencies string
                    dependencies=(
                        None
                        if w.dependencies is None
                        else (FileDescr if TYPE_CHECKING else dict)(
                            source=cast(
                                FileSource,
                                (
                                    str(w.dependencies)[len("conda:") :]
                                    if str(w.dependencies).startswith("conda:")
                                    else str(w.dependencies)
                                ),
                            )
                        )
                    ),
                ),
                torchscript=(w := src.weights.torchscript)
                and (TorchscriptWeightsDescr if TYPE_CHECKING else dict)(
                    source=w.source,
                    authors=conv_authors(w.authors),
                    parent=w.parent,
                    pytorch_version=w.pytorch_version or Version("1.10"),
                ),
            ),
        )
# converter instance used by `convert_from_old_format_wo_validation`
# to turn model 0.4 descriptions into model 0.5 content
_model_conv = _ModelConv(_ModelDescr_v0_4, ModelDescr)
3540# create better cover images for 3d data and non-image outputs
def generate_covers(
    inputs: Sequence[Tuple[InputTensorDescr, NDArray[Any]]],
    outputs: Sequence[Tuple[OutputTensorDescr, NDArray[Any]]],
) -> List[Path]:
    """Generate cover image(s) from the first test input and the first test output.

    Both tensors are reduced to 8 bit RGB images. If the two images have the same
    shape, a single diagonally split image "cover.png" is written; otherwise
    "input.png" and "output.png" are written separately.
    The files are created in a new temporary folder, which is not removed here.

    Args:
        inputs: pairs of input tensor description and test tensor.
        outputs: pairs of output tensor description and test tensor.

    Returns:
        Paths to the written cover image(s).

    Raises:
        ValueError: If **inputs** or **outputs** is empty, if a tensor does not
            match its described axes, or if no 2D image can be derived from it.
    """

    def squeeze(
        data: NDArray[Any], axes: Sequence[AnyAxis]
    ) -> Tuple[NDArray[Any], List[AnyAxis]]:
        """apply numpy.ndarray.squeeze while keeping track of the axis descriptions remaining"""
        if data.ndim != len(axes):
            raise ValueError(
                f"tensor shape {data.shape} does not match described axes"
                + f" {[a.id for a in axes]}"
            )

        axes = [deepcopy(a) for a, s in zip(axes, data.shape) if s != 1]
        return data.squeeze(), axes

    def normalize(
        data: NDArray[Any], axis: Optional[Tuple[int, ...]], eps: float = 1e-7
    ) -> NDArray[np.float32]:
        """shift and scale **data** along **axis** (`None`: all axes) to [0, 1)"""
        data = data.astype("float32")
        data -= data.min(axis=axis, keepdims=True)
        data /= data.max(axis=axis, keepdims=True) + eps
        return data

    def to_2d_image(data: NDArray[Any], axes: Sequence[AnyAxis]):
        """reduce **data** to an 8 bit image with the 3 color channels as last axis"""
        original_shape = data.shape
        original_axes = list(axes)
        data, axes = squeeze(data, axes)

        # take slice from any batch or index axis if needed
        # and convert the first channel axis and take a slice from any additional channel axes
        slices: Tuple[slice, ...] = ()
        ndim = data.ndim
        ndim_need = 3 if any(isinstance(a, ChannelAxis) for a in axes) else 2
        has_c_axis = False
        for i, a in enumerate(axes):
            s = data.shape[i]
            assert s > 1
            if (
                isinstance(a, (BatchAxis, IndexInputAxis, IndexOutputAxis))
                and ndim > ndim_need
            ):
                data = data[slices + (slice(s // 2 - 1, s // 2),)]
                ndim -= 1
            elif isinstance(a, ChannelAxis):
                if has_c_axis:
                    # second channel axis
                    data = data[slices + (slice(0, 1),)]
                    ndim -= 1
                else:
                    has_c_axis = True
                    if s == 2:
                        # visualize two channels with cyan and magenta
                        data = np.concatenate(
                            [
                                data[slices + (slice(1, 2),)],
                                data[slices + (slice(0, 1),)],
                                (
                                    data[slices + (slice(0, 1),)]
                                    + data[slices + (slice(1, 2),)]
                                )
                                / 2,  # TODO: take maximum instead?
                            ],
                            axis=i,
                        )
                    elif data.shape[i] == 3:
                        pass  # visualize 3 channels as RGB
                    else:
                        # visualize first 3 channels as RGB
                        data = data[slices + (slice(3),)]

                    assert data.shape[i] == 3

            # full slice for axis `i`, such that the next slice addresses axis `i + 1`
            slices += (slice(None),)

        data, axes = squeeze(data, axes)
        assert len(axes) == ndim
        # take slice from z axis if needed
        slices = ()
        if ndim > ndim_need:
            for i, a in enumerate(axes):
                s = data.shape[i]
                if a.id == AxisId("z"):
                    data = data[slices + (slice(s // 2 - 1, s // 2),)]
                    data, axes = squeeze(data, axes)
                    ndim -= 1
                    break

                slices += (slice(None),)

        # take slice from any space or time axis
        slices = ()

        for i, a in enumerate(axes):
            if ndim <= ndim_need:
                break

            s = data.shape[i]
            assert s > 1
            if isinstance(
                a, (SpaceInputAxis, SpaceOutputAxis, TimeInputAxis, TimeOutputAxis)
            ):
                data = data[slices + (slice(s // 2 - 1, s // 2),)]
                ndim -= 1

            slices += (slice(None),)

        del slices
        data, axes = squeeze(data, axes)
        assert len(axes) == ndim

        if (has_c_axis and ndim != 3) or (not has_c_axis and ndim != 2):
            raise ValueError(
                f"Failed to construct cover image from shape {original_shape} with axes {[a.id for a in original_axes]}."
            )

        if not has_c_axis:
            # gray scale image: replicate the values for R, G and B
            assert ndim == 2
            data = np.repeat(data[:, :, None], 3, axis=2)
            axes.append(ChannelAxis(channel_names=list(map(Identifier, "RGB"))))
            ndim += 1

        assert ndim == 3

        # transpose axis order such that longest axis comes first...
        axis_order: List[int] = [int(i) for i in np.argsort(list(data.shape))]
        axis_order.reverse()
        # ... and channel axis is last
        c = next(i for i, a in enumerate(axes) if isinstance(a, ChannelAxis))
        # `c` is an axis index, i.e. a value in `axis_order`, not a position in it;
        # move that value to the end
        axis_order.remove(c)
        axis_order.append(c)
        axes = [axes[ao] for ao in axis_order]
        data = data.transpose(axis_order)

        # TODO: enforce 2:1 or 1:1 aspect ratio for generated cover images

        norm_along = (
            tuple(i for i, a in enumerate(axes) if a.type in ("space", "time")) or None
        )
        # normalize the data and map to 8 bit
        data = normalize(data, norm_along)
        data = (data * 255).astype("uint8")

        return data

    def create_diagonal_split_image(im0: NDArray[Any], im1: NDArray[Any]):
        """combine the lower triangle of **im0** with the upper triangle of **im1**"""
        assert im0.dtype == im1.dtype == np.uint8
        assert im0.shape == im1.shape
        assert im0.ndim == 3
        N, M, C = im0.shape
        assert C == 3
        out = np.ones((N, M, C), dtype="uint8")
        for c in range(C):
            outc = np.tril(im0[..., c])
            # fill every pixel that is zero after `tril` from the upper triangle of `im1`
            mask = outc == 0
            outc[mask] = np.triu(im1[..., c])[mask]
            out[..., c] = outc

        return out

    if not inputs:
        raise ValueError("Missing test input tensor for cover generation.")

    if not outputs:
        raise ValueError("Missing test output tensor for cover generation.")

    # only the first input and the first output are visualized
    ipt_descr, ipt = inputs[0]
    out_descr, out = outputs[0]

    ipt_img = to_2d_image(ipt, ipt_descr.axes)
    out_img = to_2d_image(out, out_descr.axes)

    cover_folder = Path(mkdtemp())
    if ipt_img.shape == out_img.shape:
        covers = [cover_folder / "cover.png"]
        imwrite(covers[0], create_diagonal_split_image(ipt_img, out_img))
    else:
        covers = [cover_folder / "input.png", cover_folder / "output.png"]
        imwrite(covers[0], ipt_img)
        imwrite(covers[1], out_img)

    return covers