Coverage for bioimageio/spec/_internal/io_basics.py: 95%
101 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-18 12:47 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-18 12:47 +0000
1import hashlib
2import os
3import zipfile
4from contextlib import nullcontext
5from functools import partial
6from pathlib import Path
7from typing import Any, ClassVar, Optional, Protocol, Type, Union, runtime_checkable
8from zipfile import ZipFile
10import pydantic
11import zipp # pyright: ignore[reportMissingTypeStubs]
12from annotated_types import Predicate
13from pydantic import RootModel, StringConstraints
14from typing_extensions import Annotated
16from .root_url import RootHttpUrl
17from .validated_string import ValidatedString
19FileName = str
20FilePath = Annotated[pydantic.FilePath, pydantic.Field(title="FilePath")]
21AbsoluteDirectory = Annotated[
22 pydantic.DirectoryPath,
23 Predicate(Path.is_absolute),
24 pydantic.Field(title="AbsoluteDirectory"),
25]
26AbsoluteFilePath = Annotated[
27 pydantic.FilePath,
28 Predicate(Path.is_absolute),
29 pydantic.Field(title="AbsoluteFilePath"),
30]
32BIOIMAGEIO_YAML = "rdf.yaml"
33ALTERNATIVE_BIOIMAGEIO_YAML_NAMES = ("bioimageio.yaml", "model.yaml")
34ALL_BIOIMAGEIO_YAML_NAMES = (BIOIMAGEIO_YAML,) + ALTERNATIVE_BIOIMAGEIO_YAML_NAMES
36ZipPath = zipp.Path # not zipfile.Path due to https://bugs.python.org/issue40564
39class Sha256(ValidatedString):
40 """A SHA-256 hash value"""
42 root_model: ClassVar[Type[RootModel[Any]]] = RootModel[
43 Annotated[
44 str,
45 StringConstraints(
46 strip_whitespace=True, to_lower=True, min_length=64, max_length=64
47 ),
48 ]
49 ]
52class BytesReaderP(Protocol):
53 def read(self, size: int = -1, /) -> bytes: ...
55 @property
56 def closed(self) -> bool: ...
58 def readable(self) -> bool: ...
60 def seek(self, offset: int, whence: int = os.SEEK_SET, /) -> int: ...
62 def seekable(self) -> bool: ...
64 def tell(self) -> int: ...
67@runtime_checkable
68class BytesReaderIntoP(BytesReaderP, Protocol):
69 def readinto(self, b: Union[bytearray, memoryview]) -> int: ...
72Suffix = str
75class BytesReader(BytesReaderP):
76 def __init__(
77 self,
78 /,
79 reader: Union[BytesReaderP, BytesReaderIntoP],
80 *,
81 sha256: Optional[Sha256],
82 suffix: Suffix,
83 original_file_name: FileName,
84 original_root: Union[RootHttpUrl, AbsoluteDirectory, ZipFile],
85 is_zipfile: Optional[bool],
86 ) -> None:
87 self._reader = reader
88 self._sha256 = sha256
89 self._suffix = suffix
90 self._original_file_name = original_file_name
91 self._original_root = original_root
92 self._is_zipfile = is_zipfile
93 super().__init__()
95 @property
96 def is_zipfile(self) -> bool:
97 if self._is_zipfile is None:
98 pos = self.tell()
99 self._is_zipfile = zipfile.is_zipfile(self)
100 _ = self.seek(pos)
102 return self._is_zipfile
104 @property
105 def sha256(self) -> Sha256:
106 if self._sha256 is None:
107 pos = self._reader.tell()
108 _ = self._reader.seek(0)
109 self._sha256 = get_sha256(self._reader)
110 _ = self._reader.seek(pos)
112 return self._sha256
114 @property
115 def suffix(self) -> Suffix:
116 return self._suffix
118 @property
119 def original_file_name(self) -> FileName:
120 return self._original_file_name
122 @property
123 def original_root(self) -> Union[RootHttpUrl, AbsoluteDirectory, ZipFile]:
124 return self._original_root
126 def read(self, size: int = -1, /) -> bytes:
127 return self._reader.read(size)
129 def read_text(self, encoding: str = "utf-8") -> str:
130 return self._reader.read().decode(encoding)
132 def readable(self) -> bool:
133 return True
135 def seek(self, offset: int, whence: int = os.SEEK_SET, /) -> int:
136 return self._reader.seek(offset, whence)
138 def seekable(self) -> bool:
139 return True
141 def tell(self) -> int:
142 return self._reader.tell()
144 @property
145 def closed(self) -> bool:
146 return self._reader.closed
149def get_sha256(source: Union[BytesReaderP, BytesReaderIntoP, Path]) -> Sha256:
150 chunksize = 128 * 1024
151 h = hashlib.sha256()
153 if isinstance(source, BytesReaderIntoP):
154 b = bytearray(chunksize)
155 mv = memoryview(b)
156 for n in iter(lambda: source.readinto(mv), 0):
157 h.update(mv[:n])
158 else:
159 if isinstance(source, Path):
160 read_ctxt = source.open(mode="rb")
161 else:
162 read_ctxt = nullcontext(source)
164 with read_ctxt as r:
165 for chunk in iter(partial(r.read, chunksize), b""):
166 h.update(chunk)
168 sha = h.hexdigest()
169 return Sha256(sha)