bioimageio.spec.common
1from pydantic import ValidationError 2 3from ._internal.common_nodes import InvalidDescr 4from ._internal.io import ( 5 BioimageioYamlContent, 6 BioimageioYamlSource, 7 FileDescr, 8 YamlValue, 9) 10from ._internal.io_basics import ( 11 AbsoluteDirectory, 12 AbsoluteFilePath, 13 BytesReader, 14 FileName, 15 Sha256, 16 ZipPath, 17) 18from ._internal.root_url import RootHttpUrl 19from ._internal.types import ( 20 FilePath, 21 FileSource, 22 PermissiveFileSource, 23 RelativeFilePath, 24) 25from ._internal.url import HttpUrl 26 27__all__ = [ 28 "AbsoluteDirectory", 29 "AbsoluteFilePath", 30 "BioimageioYamlContent", 31 "BioimageioYamlSource", 32 "BytesReader", 33 "FileDescr", 34 "FileName", 35 "FilePath", 36 "FileSource", 37 "HttpUrl", 38 "InvalidDescr", 39 "PermissiveFileSource", 40 "RelativeFilePath", 41 "RootHttpUrl", 42 "Sha256", 43 "ValidationError", 44 "YamlValue", 45 "ZipPath", 46]
76class BytesReader(BytesReaderP): 77 def __init__( 78 self, 79 /, 80 reader: Union[BytesReaderP, BytesReaderIntoP], 81 *, 82 sha256: Optional[Sha256], 83 suffix: Suffix, 84 original_file_name: FileName, 85 original_root: Union[RootHttpUrl, AbsoluteDirectory, ZipFile], 86 is_zipfile: Optional[bool], 87 ) -> None: 88 self._reader = reader 89 self._sha256 = sha256 90 self._suffix = suffix 91 self._original_file_name = original_file_name 92 self._original_root = original_root 93 self._is_zipfile = is_zipfile 94 super().__init__() 95 96 @property 97 def is_zipfile(self) -> bool: 98 if self._is_zipfile is None: 99 pos = self.tell() 100 self._is_zipfile = zipfile.is_zipfile(self) 101 _ = self.seek(pos) 102 103 return self._is_zipfile 104 105 @property 106 def sha256(self) -> Sha256: 107 if self._sha256 is None: 108 pos = self._reader.tell() 109 _ = self._reader.seek(0) 110 self._sha256 = get_sha256(self._reader) 111 _ = self._reader.seek(pos) 112 113 return self._sha256 114 115 @property 116 def suffix(self) -> Suffix: 117 return self._suffix 118 119 @property 120 def original_file_name(self) -> FileName: 121 return self._original_file_name 122 123 @property 124 def original_root(self) -> Union[RootHttpUrl, AbsoluteDirectory, ZipFile]: 125 return self._original_root 126 127 def read(self, size: int = -1, /) -> bytes: 128 return self._reader.read(size) 129 130 def read_text(self, encoding: str = "utf-8") -> str: 131 return self._reader.read().decode(encoding) 132 133 def readable(self) -> bool: 134 return True 135 136 def seek(self, offset: int, whence: int = os.SEEK_SET, /) -> int: 137 return self._reader.seek(offset, whence) 138 139 def seekable(self) -> bool: 140 return True 141 142 def tell(self) -> int: 143 return self._reader.tell() 144 145 @property 146 def closed(self) -> bool: 147 return self._reader.closed
Base class for protocol classes.
Protocol classes are defined as::
class Proto(Protocol):
def meth(self) -> int:
...
Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).
For example::
class C:
def meth(self) -> int:
return 0
def func(x: Proto) -> int:
return x.meth()
func(C()) # Passes static type check
See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::
class GenProto[T](Protocol):
def meth(self) -> T:
...
77 def __init__( 78 self, 79 /, 80 reader: Union[BytesReaderP, BytesReaderIntoP], 81 *, 82 sha256: Optional[Sha256], 83 suffix: Suffix, 84 original_file_name: FileName, 85 original_root: Union[RootHttpUrl, AbsoluteDirectory, ZipFile], 86 is_zipfile: Optional[bool], 87 ) -> None: 88 self._reader = reader 89 self._sha256 = sha256 90 self._suffix = suffix 91 self._original_file_name = original_file_name 92 self._original_root = original_root 93 self._is_zipfile = is_zipfile 94 super().__init__()
255class FileDescr(Node): 256 """A file description""" 257 258 source: FileSource 259 """File source""" 260 261 sha256: Optional[Sha256] = None 262 """SHA256 hash value of the **source** file.""" 263 264 @model_validator(mode="after") 265 def _validate_sha256(self) -> Self: 266 if get_validation_context().perform_io_checks: 267 self.validate_sha256() 268 269 return self 270 271 def validate_sha256(self, force_recompute: bool = False) -> None: 272 """validate the sha256 hash value of the **source** file""" 273 context = get_validation_context() 274 src_str = str(self.source) 275 if not force_recompute and src_str in context.known_files: 276 actual_sha = context.known_files[src_str] 277 else: 278 reader = get_reader(self.source, sha256=self.sha256) 279 if force_recompute: 280 actual_sha = get_sha256(reader) 281 else: 282 actual_sha = reader.sha256 283 284 context.known_files[src_str] = actual_sha 285 286 if actual_sha is None: 287 return 288 elif self.sha256 == actual_sha: 289 pass 290 elif self.sha256 is None or context.update_hashes: 291 self.sha256 = actual_sha 292 elif self.sha256 != actual_sha: 293 raise ValueError( 294 f"Sha256 mismatch for {self.source}. Expected {self.sha256}, got " 295 + f"{actual_sha}. Update expected `sha256` or point to the matching " 296 + "file." 297 ) 298 299 def get_reader( 300 self, *, progressbar: Union[Progressbar, Callable[[], Progressbar], bool] = True 301 ): 302 """open the file source (download if needed)""" 303 return get_reader(self.source, progressbar=progressbar, sha256=self.sha256) 304 305 download = get_reader 306 """alias for get_reader() method"""
A file description
File source
271 def validate_sha256(self, force_recompute: bool = False) -> None: 272 """validate the sha256 hash value of the **source** file""" 273 context = get_validation_context() 274 src_str = str(self.source) 275 if not force_recompute and src_str in context.known_files: 276 actual_sha = context.known_files[src_str] 277 else: 278 reader = get_reader(self.source, sha256=self.sha256) 279 if force_recompute: 280 actual_sha = get_sha256(reader) 281 else: 282 actual_sha = reader.sha256 283 284 context.known_files[src_str] = actual_sha 285 286 if actual_sha is None: 287 return 288 elif self.sha256 == actual_sha: 289 pass 290 elif self.sha256 is None or context.update_hashes: 291 self.sha256 = actual_sha 292 elif self.sha256 != actual_sha: 293 raise ValueError( 294 f"Sha256 mismatch for {self.source}. Expected {self.sha256}, got " 295 + f"{actual_sha}. Update expected `sha256` or point to the matching " 296 + "file." 297 )
validate the sha256 hash value of the source file
299 def get_reader( 300 self, *, progressbar: Union[Progressbar, Callable[[], Progressbar], bool] = True 301 ): 302 """open the file source (download if needed)""" 303 return get_reader(self.source, progressbar=progressbar, sha256=self.sha256)
open the file source (download if needed)
299 def get_reader( 300 self, *, progressbar: Union[Progressbar, Callable[[], Progressbar], bool] = True 301 ): 302 """open the file source (download if needed)""" 303 return get_reader(self.source, progressbar=progressbar, sha256=self.sha256)
alias for get_reader() method
Configuration for the model, should be a dictionary conforming to [ConfigDict
][pydantic.config.ConfigDict].
124class HttpUrl(RootHttpUrl): 125 """A URL with the HTTP or HTTPS scheme.""" 126 127 root_model: ClassVar[Type[RootModel[Any]]] = RootModel[pydantic.HttpUrl] 128 _exists: Optional[bool] = None 129 130 def _after_validator(self): 131 self = super()._after_validator() 132 context = get_validation_context() 133 if context.perform_io_checks: 134 _ = self.exists() 135 136 return self 137 138 def exists(self): 139 """True if URL is available""" 140 if self._exists is None: 141 ctxt = get_validation_context() 142 try: 143 with ctxt.replace(warning_level=warning_levels.WARNING): 144 self._validated = _validate_url(self._validated) 145 except Exception as e: 146 if ctxt.log_warnings: 147 logger.info(e) 148 149 self._exists = False 150 else: 151 self._exists = True 152 153 return self._exists
A URL with the HTTP or HTTPS scheme.
the pydantic root model to validate the string
138 def exists(self): 139 """True if URL is available""" 140 if self._exists is None: 141 ctxt = get_validation_context() 142 try: 143 with ctxt.replace(warning_level=warning_levels.WARNING): 144 self._validated = _validate_url(self._validated) 145 except Exception as e: 146 if ctxt.log_warnings: 147 logger.info(e) 148 149 self._exists = False 150 else: 151 self._exists = True 152 153 return self._exists
True if URL is available
370class InvalidDescr( 371 ResourceDescrBase, 372 extra="allow", 373 title="An invalid resource description", 374): 375 """A representation of an invalid resource description""" 376 377 implemented_type: ClassVar[Literal["unknown"]] = "unknown" 378 if TYPE_CHECKING: # see NodeWithExplicitlySetFields 379 type: Any = "unknown" 380 else: 381 type: Any 382 383 implemented_format_version: ClassVar[Literal["unknown"]] = "unknown" 384 if TYPE_CHECKING: # see NodeWithExplicitlySetFields 385 format_version: Any = "unknown" 386 else: 387 format_version: Any
A representation of an invalid resource description
Configuration for the model, should be a dictionary conforming to [ConfigDict
][pydantic.config.ConfigDict].
337def init_private_attributes(self: BaseModel, context: Any, /) -> None: 338 """This function is meant to behave like a BaseModel method to initialise private attributes. 339 340 It takes context as an argument since that's what pydantic-core passes when calling it. 341 342 Args: 343 self: The BaseModel instance. 344 context: The context. 345 """ 346 if getattr(self, '__pydantic_private__', None) is None: 347 pydantic_private = {} 348 for name, private_attr in self.__private_attributes__.items(): 349 default = private_attr.get_default() 350 if default is not PydanticUndefined: 351 pydantic_private[name] = default 352 object_setattr(self, '__pydantic_private__', pydantic_private)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
204class RelativeFilePath( 205 RelativePathBase[Union[AbsoluteFilePath, HttpUrl, ZipPath]], frozen=True 206): 207 """A path relative to the `rdf.yaml` file (also if the RDF source is a URL).""" 208 209 def model_post_init(self, __context: Any) -> None: 210 """add validation @private""" 211 if not self.root.parts: # an empty path can only be a directory 212 raise ValueError(f"{self.root} is not a valid file path.") 213 214 super().model_post_init(__context) 215 216 def get_absolute( 217 self, root: "RootHttpUrl | Path | AnyUrl | ZipFile" 218 ) -> "AbsoluteFilePath | HttpUrl | ZipPath": 219 absolute = self._get_absolute_impl(root) 220 if ( 221 isinstance(absolute, Path) 222 and (context := get_validation_context()).perform_io_checks 223 and str(self.root) not in context.known_files 224 and not absolute.is_file() 225 ): 226 raise ValueError(f"{absolute} does not point to an existing file") 227 228 return absolute
A path relative to the rdf.yaml
file (also if the RDF source is a URL).
216 def get_absolute( 217 self, root: "RootHttpUrl | Path | AnyUrl | ZipFile" 218 ) -> "AbsoluteFilePath | HttpUrl | ZipPath": 219 absolute = self._get_absolute_impl(root) 220 if ( 221 isinstance(absolute, Path) 222 and (context := get_validation_context()).perform_io_checks 223 and str(self.root) not in context.known_files 224 and not absolute.is_file() 225 ): 226 raise ValueError(f"{absolute} does not point to an existing file") 227 228 return absolute
13class RootHttpUrl(ValidatedString): 14 """An untested HTTP URL, possibly a 'URL folder' or an invalid HTTP URL""" 15 16 root_model: ClassVar[Type[RootModel[Any]]] = RootModel[pydantic.HttpUrl] 17 _validated: pydantic.HttpUrl 18 19 def absolute(self): 20 """analog to `absolute` method of pathlib.""" 21 return self 22 23 @property 24 def scheme(self) -> str: 25 return self._validated.scheme 26 27 @property 28 def host(self) -> Optional[str]: 29 return self._validated.host 30 31 @property 32 def path(self) -> Optional[str]: 33 return self._validated.path 34 35 @property 36 def parent(self) -> RootHttpUrl: 37 parsed = urlsplit(str(self)) 38 path = list(parsed.path.split("/")) 39 if ( 40 parsed.netloc == "zenodo.org" 41 and parsed.path.startswith("/api/records/") 42 and parsed.path.endswith("/content") 43 ): 44 path[-2:-1] = [] 45 else: 46 path = path[:-1] 47 48 return RootHttpUrl( 49 urlunsplit( 50 ( 51 parsed.scheme, 52 parsed.netloc, 53 "/".join(path), 54 parsed.query, 55 parsed.fragment, 56 ) 57 ) 58 ) 59 60 @property 61 def parents(self) -> Iterable[RootHttpUrl]: 62 """iterate over all URL parents (max 100)""" 63 current = self 64 for _ in range(100): 65 current = current.parent 66 yield current
An untested HTTP URL, possibly a 'URL folder' or an invalid HTTP URL
the pydantic root model to validate the string
35 @property 36 def parent(self) -> RootHttpUrl: 37 parsed = urlsplit(str(self)) 38 path = list(parsed.path.split("/")) 39 if ( 40 parsed.netloc == "zenodo.org" 41 and parsed.path.startswith("/api/records/") 42 and parsed.path.endswith("/content") 43 ): 44 path[-2:-1] = [] 45 else: 46 path = path[:-1] 47 48 return RootHttpUrl( 49 urlunsplit( 50 ( 51 parsed.scheme, 52 parsed.netloc, 53 "/".join(path), 54 parsed.query, 55 parsed.fragment, 56 ) 57 ) 58 )
40class Sha256(ValidatedString): 41 """A SHA-256 hash value""" 42 43 root_model: ClassVar[Type[RootModel[Any]]] = RootModel[ 44 Annotated[ 45 str, 46 StringConstraints( 47 strip_whitespace=True, to_lower=True, min_length=64, max_length=64 48 ), 49 ] 50 ]
A SHA-256 hash value