bioimageio.spec.common

 1from pydantic import ValidationError
 2
 3from ._internal.common_nodes import InvalidDescr
 4from ._internal.io import (
 5    BioimageioYamlContent,
 6    BioimageioYamlSource,
 7    FileDescr,
 8    YamlValue,
 9)
10from ._internal.io_basics import (
11    AbsoluteDirectory,
12    AbsoluteFilePath,
13    BytesReader,
14    FileName,
15    Sha256,
16    ZipPath,
17)
18from ._internal.root_url import RootHttpUrl
19from ._internal.types import (
20    FilePath,
21    FileSource,
22    PermissiveFileSource,
23    RelativeFilePath,
24)
25from ._internal.url import HttpUrl
26
27__all__ = [
28    "AbsoluteDirectory",
29    "AbsoluteFilePath",
30    "BioimageioYamlContent",
31    "BioimageioYamlSource",
32    "BytesReader",
33    "FileDescr",
34    "FileName",
35    "FilePath",
36    "FileSource",
37    "HttpUrl",
38    "InvalidDescr",
39    "PermissiveFileSource",
40    "RelativeFilePath",
41    "RootHttpUrl",
42    "Sha256",
43    "ValidationError",
44    "YamlValue",
45    "ZipPath",
46]
AbsoluteDirectory = typing.Annotated[pathlib.Path, PathType(path_type='dir'), Predicate(is_absolute), FieldInfo(annotation=NoneType, required=True, title='AbsoluteDirectory')]
AbsoluteFilePath = typing.Annotated[pathlib.Path, PathType(path_type='file'), Predicate(is_absolute), FieldInfo(annotation=NoneType, required=True, title='AbsoluteFilePath')]
BioimageioYamlContent = typing.Dict[str, YamlValue]
BioimageioYamlSource = typing.Union[typing.Annotated[typing.Union[HttpUrl, RelativeFilePath, typing.Annotated[pathlib.Path, PathType(path_type='file'), FieldInfo(annotation=NoneType, required=True, title='FilePath')]], FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(union_mode='left_to_right')])], str, pydantic.networks.HttpUrl, zipfile.ZipFile, typing.Dict[str, YamlValue], typing.Mapping[str, YamlValueView]]
class BytesReader(bioimageio.spec._internal.io_basics.BytesReaderP):
 76class BytesReader(BytesReaderP):
 77    def __init__(
 78        self,
 79        /,
 80        reader: Union[BytesReaderP, BytesReaderIntoP],
 81        *,
 82        sha256: Optional[Sha256],
 83        suffix: Suffix,
 84        original_file_name: FileName,
 85        original_root: Union[RootHttpUrl, AbsoluteDirectory, ZipFile],
 86        is_zipfile: Optional[bool],
 87    ) -> None:
 88        self._reader = reader
 89        self._sha256 = sha256
 90        self._suffix = suffix
 91        self._original_file_name = original_file_name
 92        self._original_root = original_root
 93        self._is_zipfile = is_zipfile
 94        super().__init__()
 95
 96    @property
 97    def is_zipfile(self) -> bool:
 98        if self._is_zipfile is None:
 99            pos = self.tell()
100            self._is_zipfile = zipfile.is_zipfile(self)
101            _ = self.seek(pos)
102
103        return self._is_zipfile
104
105    @property
106    def sha256(self) -> Sha256:
107        if self._sha256 is None:
108            pos = self._reader.tell()
109            _ = self._reader.seek(0)
110            self._sha256 = get_sha256(self._reader)
111            _ = self._reader.seek(pos)
112
113        return self._sha256
114
115    @property
116    def suffix(self) -> Suffix:
117        return self._suffix
118
119    @property
120    def original_file_name(self) -> FileName:
121        return self._original_file_name
122
123    @property
124    def original_root(self) -> Union[RootHttpUrl, AbsoluteDirectory, ZipFile]:
125        return self._original_root
126
127    def read(self, size: int = -1, /) -> bytes:
128        return self._reader.read(size)
129
130    def read_text(self, encoding: str = "utf-8") -> str:
131        return self._reader.read().decode(encoding)
132
133    def readable(self) -> bool:
134        return True
135
136    def seek(self, offset: int, whence: int = os.SEEK_SET, /) -> int:
137        return self._reader.seek(offset, whence)
138
139    def seekable(self) -> bool:
140        return True
141
142    def tell(self) -> int:
143        return self._reader.tell()
144
145    @property
146    def closed(self) -> bool:
147        return self._reader.closed

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic; they are defined as::

class GenProto[T](Protocol):
    def meth(self) -> T:
        ...
BytesReader( reader: Union[bioimageio.spec._internal.io_basics.BytesReaderP, bioimageio.spec._internal.io_basics.BytesReaderIntoP], *, sha256: Optional[Sha256], suffix: str, original_file_name: str, original_root: Union[RootHttpUrl, Annotated[pathlib.Path, PathType(path_type='dir'), Predicate(is_absolute), FieldInfo(annotation=NoneType, required=True, title='AbsoluteDirectory')], zipfile.ZipFile], is_zipfile: Optional[bool])
77    def __init__(
78        self,
79        /,
80        reader: Union[BytesReaderP, BytesReaderIntoP],
81        *,
82        sha256: Optional[Sha256],
83        suffix: Suffix,
84        original_file_name: FileName,
85        original_root: Union[RootHttpUrl, AbsoluteDirectory, ZipFile],
86        is_zipfile: Optional[bool],
87    ) -> None:
88        self._reader = reader
89        self._sha256 = sha256
90        self._suffix = suffix
91        self._original_file_name = original_file_name
92        self._original_root = original_root
93        self._is_zipfile = is_zipfile
94        super().__init__()
is_zipfile: bool
 96    @property
 97    def is_zipfile(self) -> bool:
 98        if self._is_zipfile is None:
 99            pos = self.tell()
100            self._is_zipfile = zipfile.is_zipfile(self)
101            _ = self.seek(pos)
102
103        return self._is_zipfile
sha256: Sha256
105    @property
106    def sha256(self) -> Sha256:
107        if self._sha256 is None:
108            pos = self._reader.tell()
109            _ = self._reader.seek(0)
110            self._sha256 = get_sha256(self._reader)
111            _ = self._reader.seek(pos)
112
113        return self._sha256
suffix: str
115    @property
116    def suffix(self) -> Suffix:
117        return self._suffix
original_file_name: str
119    @property
120    def original_file_name(self) -> FileName:
121        return self._original_file_name
original_root: Union[RootHttpUrl, Annotated[pathlib.Path, PathType(path_type='dir'), Predicate(is_absolute), FieldInfo(annotation=NoneType, required=True, title='AbsoluteDirectory')], zipfile.ZipFile]
123    @property
124    def original_root(self) -> Union[RootHttpUrl, AbsoluteDirectory, ZipFile]:
125        return self._original_root
def read(self, size: int = -1, /) -> bytes:
127    def read(self, size: int = -1, /) -> bytes:
128        return self._reader.read(size)
def read_text(self, encoding: str = 'utf-8') -> str:
130    def read_text(self, encoding: str = "utf-8") -> str:
131        return self._reader.read().decode(encoding)
def readable(self) -> bool:
133    def readable(self) -> bool:
134        return True
def seek(self, offset: int, whence: int = 0, /) -> int:
136    def seek(self, offset: int, whence: int = os.SEEK_SET, /) -> int:
137        return self._reader.seek(offset, whence)
def seekable(self) -> bool:
139    def seekable(self) -> bool:
140        return True
def tell(self) -> int:
142    def tell(self) -> int:
143        return self._reader.tell()
closed: bool
145    @property
146    def closed(self) -> bool:
147        return self._reader.closed
class FileDescr(bioimageio.spec._internal.node.Node):
class FileDescr(Node):
    """A file description"""

    source: FileSource
    """File source"""

    sha256: Optional[Sha256] = None
    """SHA256 hash value of the **source** file."""

    @model_validator(mode="after")
    def _validate_sha256(self) -> Self:
        # Hash checking touches the file, so it only runs when IO checks are enabled.
        context = get_validation_context()
        if context.perform_io_checks:
            self.validate_sha256()

        return self

    def validate_sha256(self, force_recompute: bool = False) -> None:
        """Check `sha256` against the hash of the **source** file.

        The hash is taken from the validation context's `known_files` when
        available (unless `force_recompute` is set); otherwise it is obtained
        from a reader of the source and stored in `known_files`.

        A missing `sha256` is filled in; a differing one is overwritten when
        the context's `update_hashes` is set.

        Raises:
            ValueError: if the hashes differ and hashes are not to be updated.
        """
        context = get_validation_context()
        key = str(self.source)
        if not force_recompute and key in context.known_files:
            actual_sha = context.known_files[key]
        else:
            reader = get_reader(self.source, sha256=self.sha256)
            actual_sha = get_sha256(reader) if force_recompute else reader.sha256
            context.known_files[key] = actual_sha

        # Nothing to compare against, or already in agreement.
        if actual_sha is None or self.sha256 == actual_sha:
            return

        if self.sha256 is None or context.update_hashes:
            self.sha256 = actual_sha
            return

        if self.sha256 != actual_sha:
            raise ValueError(
                f"Sha256 mismatch for {self.source}. Expected {self.sha256}, got "
                + f"{actual_sha}. Update expected `sha256` or point to the matching "
                + "file."
            )

    def get_reader(
        self, *, progressbar: Union[Progressbar, Callable[[], Progressbar], bool] = True
    ):
        """Return a reader for the file source, passing on `progressbar` and `sha256`."""
        return get_reader(self.source, sha256=self.sha256, progressbar=progressbar)

    download = get_reader
    """alias for get_reader() method"""

A file description

source: Annotated[Union[HttpUrl, RelativeFilePath, Annotated[pathlib.Path, PathType(path_type='file'), FieldInfo(annotation=NoneType, required=True, title='FilePath')]], FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(union_mode='left_to_right')])]

File source

sha256: Optional[Sha256]

SHA256 hash value of the source file.

def validate_sha256(self, force_recompute: bool = False) -> None:
271    def validate_sha256(self, force_recompute: bool = False) -> None:
272        """validate the sha256 hash value of the **source** file"""
273        context = get_validation_context()
274        src_str = str(self.source)
275        if not force_recompute and src_str in context.known_files:
276            actual_sha = context.known_files[src_str]
277        else:
278            reader = get_reader(self.source, sha256=self.sha256)
279            if force_recompute:
280                actual_sha = get_sha256(reader)
281            else:
282                actual_sha = reader.sha256
283
284            context.known_files[src_str] = actual_sha
285
286        if actual_sha is None:
287            return
288        elif self.sha256 == actual_sha:
289            pass
290        elif self.sha256 is None or context.update_hashes:
291            self.sha256 = actual_sha
292        elif self.sha256 != actual_sha:
293            raise ValueError(
294                f"Sha256 mismatch for {self.source}. Expected {self.sha256}, got "
295                + f"{actual_sha}. Update expected `sha256` or point to the matching "
296                + "file."
297            )

validate the sha256 hash value of the source file

def get_reader( self, *, progressbar: Union[bioimageio.spec._internal.progress.Progressbar, Callable[[], bioimageio.spec._internal.progress.Progressbar], bool] = True):
299    def get_reader(
300        self, *, progressbar: Union[Progressbar, Callable[[], Progressbar], bool] = True
301    ):
302        """open the file source (download if needed)"""
303        return get_reader(self.source, progressbar=progressbar, sha256=self.sha256)

open the file source (download if needed)

def download( self, *, progressbar: Union[bioimageio.spec._internal.progress.Progressbar, Callable[[], bioimageio.spec._internal.progress.Progressbar], bool] = True):
299    def get_reader(
300        self, *, progressbar: Union[Progressbar, Callable[[], Progressbar], bool] = True
301    ):
302        """open the file source (download if needed)"""
303        return get_reader(self.source, progressbar=progressbar, sha256=self.sha256)

alias for get_reader() method

model_config: ClassVar[pydantic.config.ConfigDict] = {'extra': 'forbid', 'frozen': False, 'populate_by_name': True, 'revalidate_instances': 'never', 'validate_assignment': True, 'validate_default': False, 'validate_return': True, 'use_attribute_docstrings': True, 'model_title_generator': <function _node_title_generator>, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model; should be a dictionary conforming to `pydantic.config.ConfigDict`.

FileName = <class 'str'>
FilePath = typing.Annotated[pathlib.Path, PathType(path_type='file'), FieldInfo(annotation=NoneType, required=True, title='FilePath')]
FileSource = typing.Annotated[typing.Union[HttpUrl, RelativeFilePath, typing.Annotated[pathlib.Path, PathType(path_type='file'), FieldInfo(annotation=NoneType, required=True, title='FilePath')]], FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(union_mode='left_to_right')])]
class HttpUrl(bioimageio.spec.common.RootHttpUrl):
class HttpUrl(RootHttpUrl):
    """A URL with the HTTP or HTTPS scheme."""

    root_model: ClassVar[Type[RootModel[Any]]] = RootModel[pydantic.HttpUrl]
    # cached result of `exists()`; None until the URL has been checked
    _exists: Optional[bool] = None

    def _after_validator(self):
        validated = super()._after_validator()
        # Only check the URL when IO checks are enabled.
        if get_validation_context().perform_io_checks:
            _ = validated.exists()

        return validated

    def exists(self):
        """Return whether validating the URL succeeded (checked once, then cached)."""
        if self._exists is not None:
            return self._exists

        ctxt = get_validation_context()
        try:
            with ctxt.replace(warning_level=warning_levels.WARNING):
                self._validated = _validate_url(self._validated)
        except Exception as e:
            # Any failure counts as "does not exist"; it is only logged.
            if ctxt.log_warnings:
                logger.info(e)

            self._exists = False
        else:
            self._exists = True

        return self._exists

A URL with the HTTP or HTTPS scheme.

root_model: ClassVar[Type[pydantic.root_model.RootModel[Any]]] = <class 'pydantic.root_model.RootModel[HttpUrl]'>

the pydantic root model to validate the string

def exists(self):
138    def exists(self):
139        """True if URL is available"""
140        if self._exists is None:
141            ctxt = get_validation_context()
142            try:
143                with ctxt.replace(warning_level=warning_levels.WARNING):
144                    self._validated = _validate_url(self._validated)
145            except Exception as e:
146                if ctxt.log_warnings:
147                    logger.info(e)
148
149                self._exists = False
150            else:
151                self._exists = True
152
153        return self._exists

True if URL is available

class InvalidDescr(bioimageio.spec._internal.common_nodes.ResourceDescrBase):
class InvalidDescr(
    ResourceDescrBase,
    extra="allow",
    title="An invalid resource description",
):
    """A representation of an invalid resource description"""

    # Both the implemented type and format version are the literal "unknown".
    implemented_type: ClassVar[Literal["unknown"]] = "unknown"
    implemented_format_version: ClassVar[Literal["unknown"]] = "unknown"

    # Static type checkers see defaults; at runtime the fields are declared
    # without one (see NodeWithExplicitlySetFields).
    if TYPE_CHECKING:
        type: Any = "unknown"
        format_version: Any = "unknown"
    else:
        type: Any
        format_version: Any

A representation of an invalid resource description

implemented_type: ClassVar[Literal['unknown']] = 'unknown'
implemented_format_version: ClassVar[Literal['unknown']] = 'unknown'
implemented_format_version_tuple: ClassVar[Tuple[int, int, int]] = (0, 0, 0)
model_config: ClassVar[pydantic.config.ConfigDict] = {'extra': 'allow', 'frozen': False, 'populate_by_name': True, 'revalidate_instances': 'never', 'validate_assignment': True, 'validate_default': False, 'validate_return': True, 'use_attribute_docstrings': True, 'model_title_generator': <function _node_title_generator>, 'validate_by_alias': True, 'validate_by_name': True, 'title': 'An invalid resource description'}

Configuration for the model; should be a dictionary conforming to `pydantic.config.ConfigDict`.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Populate `__pydantic_private__` on a model instance, acting like a BaseModel method.

    `context` is accepted because that is what pydantic-core passes when
    calling this function; it is not used here.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is not None:
        return  # private attributes were already set up

    defaults = {}
    for attr_name, attr in self.__private_attributes__.items():
        value = attr.get_default()
        if value is PydanticUndefined:
            continue  # attributes without a default are left unset
        defaults[attr_name] = value
    object_setattr(self, '__pydantic_private__', defaults)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
PermissiveFileSource = typing.Union[typing.Annotated[typing.Union[HttpUrl, RelativeFilePath, typing.Annotated[pathlib.Path, PathType(path_type='file'), FieldInfo(annotation=NoneType, required=True, title='FilePath')]], FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(union_mode='left_to_right')])], str, pydantic.networks.HttpUrl]
class RelativeFilePath(pydantic.root_model.RootModel[PurePath], typing.Generic[~AbsolutePathT]):
class RelativeFilePath(
    RelativePathBase[Union[AbsoluteFilePath, HttpUrl, ZipPath]], frozen=True
):
    """A path relative to the `rdf.yaml` file (also if the RDF source is a URL)."""

    def model_post_init(self, __context: Any) -> None:
        """add validation @private"""
        # A path without any parts is empty and can only denote a directory.
        has_parts = bool(self.root.parts)
        if not has_parts:
            raise ValueError(f"{self.root} is not a valid file path.")

        super().model_post_init(__context)

    def get_absolute(
        self, root: "RootHttpUrl | Path | AnyUrl | ZipFile"
    ) -> "AbsoluteFilePath | HttpUrl | ZipPath":
        """Resolve this path against `root`.

        For a local path result, and only when IO checks are enabled, the file
        must exist unless this relative path is listed in the validation
        context's `known_files`.

        Raises:
            ValueError: if that check fails.
        """
        absolute = self._get_absolute_impl(root)
        if not isinstance(absolute, Path):
            return absolute

        context = get_validation_context()
        must_exist = (
            context.perform_io_checks and str(self.root) not in context.known_files
        )
        if must_exist and not absolute.is_file():
            raise ValueError(f"{absolute} does not point to an existing file")

        return absolute

A path relative to the rdf.yaml file (also if the RDF source is a URL).

def get_absolute( self, root: RootHttpUrl | pathlib.Path | pydantic.networks.AnyUrl | zipfile.ZipFile) -> Union[Annotated[pathlib.Path, PathType(path_type='file'), Predicate(is_absolute), FieldInfo(annotation=NoneType, required=True, title='AbsoluteFilePath')], HttpUrl, zipp.Path]:
216    def get_absolute(
217        self, root: "RootHttpUrl | Path | AnyUrl | ZipFile"
218    ) -> "AbsoluteFilePath | HttpUrl | ZipPath":
219        absolute = self._get_absolute_impl(root)
220        if (
221            isinstance(absolute, Path)
222            and (context := get_validation_context()).perform_io_checks
223            and str(self.root) not in context.known_files
224            and not absolute.is_file()
225        ):
226            raise ValueError(f"{absolute} does not point to an existing file")
227
228        return absolute
class RootHttpUrl(bioimageio.spec._internal.validated_string.ValidatedString):
class RootHttpUrl(ValidatedString):
    """An untested HTTP URL, possibly a 'URL folder' or an invalid HTTP URL"""

    root_model: ClassVar[Type[RootModel[Any]]] = RootModel[pydantic.HttpUrl]
    _validated: pydantic.HttpUrl

    def absolute(self):
        """Return `self` unchanged (counterpart to pathlib's `absolute`)."""
        return self

    @property
    def scheme(self) -> str:
        """Scheme of the validated URL."""
        return self._validated.scheme

    @property
    def host(self) -> Optional[str]:
        """Host of the validated URL."""
        return self._validated.host

    @property
    def path(self) -> Optional[str]:
        """Path of the validated URL."""
        return self._validated.path

    @property
    def parent(self) -> RootHttpUrl:
        """The URL with one path segment removed; query and fragment are kept.

        For zenodo.org paths that start with "/api/records/" and end with
        "/content", the second-to-last segment is removed and the trailing
        "content" segment is kept.
        """
        parsed = urlsplit(str(self))
        segments = parsed.path.split("/")
        is_zenodo_content = (
            parsed.netloc == "zenodo.org"
            and parsed.path.startswith("/api/records/")
            and parsed.path.endswith("/content")
        )
        if is_zenodo_content:
            del segments[-2:-1]
        else:
            segments = segments[:-1]

        return RootHttpUrl(
            urlunsplit(
                (
                    parsed.scheme,
                    parsed.netloc,
                    "/".join(segments),
                    parsed.query,
                    parsed.fragment,
                )
            )
        )

    @property
    def parents(self) -> Iterable[RootHttpUrl]:
        """iterate over all URL parents (max 100)

        Iteration stops once taking the parent no longer changes the URL, so
        the top-most URL is yielded at most once.
        """
        current = self
        for _ in range(100):
            candidate = current.parent
            # With no path segment left to remove, `parent` reproduces the
            # same URL; stop instead of yielding it again and again.
            if str(candidate) == str(current):
                return

            current = candidate
            yield current

An untested HTTP URL, possibly a 'URL folder' or an invalid HTTP URL

root_model: ClassVar[Type[pydantic.root_model.RootModel[Any]]] = <class 'pydantic.root_model.RootModel[HttpUrl]'>

the pydantic root model to validate the string

def absolute(self):
19    def absolute(self):
20        """analog to `absolute` method of pathlib."""
21        return self

Analogous to the `absolute` method of pathlib.

scheme: str
23    @property
24    def scheme(self) -> str:
25        return self._validated.scheme
host: Optional[str]
27    @property
28    def host(self) -> Optional[str]:
29        return self._validated.host
path: Optional[str]
31    @property
32    def path(self) -> Optional[str]:
33        return self._validated.path
parent: RootHttpUrl
35    @property
36    def parent(self) -> RootHttpUrl:
37        parsed = urlsplit(str(self))
38        path = list(parsed.path.split("/"))
39        if (
40            parsed.netloc == "zenodo.org"
41            and parsed.path.startswith("/api/records/")
42            and parsed.path.endswith("/content")
43        ):
44            path[-2:-1] = []
45        else:
46            path = path[:-1]
47
48        return RootHttpUrl(
49            urlunsplit(
50                (
51                    parsed.scheme,
52                    parsed.netloc,
53                    "/".join(path),
54                    parsed.query,
55                    parsed.fragment,
56                )
57            )
58        )
parents: Iterable[RootHttpUrl]
60    @property
61    def parents(self) -> Iterable[RootHttpUrl]:
62        """iterate over all URL parents (max 100)"""
63        current = self
64        for _ in range(100):
65            current = current.parent
66            yield current

iterate over all URL parents (max 100)

class Sha256(bioimageio.spec._internal.validated_string.ValidatedString):
class Sha256(ValidatedString):
    """A SHA-256 hash value"""

    # Whitespace is stripped, the value is lower-cased, and the length must
    # be exactly 64 characters.
    root_model: ClassVar[Type[RootModel[Any]]] = RootModel[
        Annotated[
            str,
            StringConstraints(
                min_length=64, max_length=64, strip_whitespace=True, to_lower=True
            ),
        ]
    ]

A SHA-256 hash value

root_model: ClassVar[Type[pydantic.root_model.RootModel[Any]]] = <class 'pydantic.root_model.RootModel[Annotated[str, StringConstraints]]'>

the pydantic root model to validate the string

class ValidationError(builtins.ValueError):
def from_exception_data(cls, /, title, line_errors, input_type='python', hide_input=False):
def error_count(self, /):
def errors( self, /, *, include_url=True, include_context=True, include_input=True):
def json( self, /, *, indent=None, include_url=True, include_context=True, include_input=True):
title
YamlValue = YamlValue
ZipPath = <class 'zipp.Path'>