Coverage for bioimageio/spec/_internal/io_basics.py: 95%

101 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-18 12:47 +0000

1import hashlib 

2import os 

3import zipfile 

4from contextlib import nullcontext 

5from functools import partial 

6from pathlib import Path 

7from typing import Any, ClassVar, Optional, Protocol, Type, Union, runtime_checkable 

8from zipfile import ZipFile 

9 

10import pydantic 

11import zipp # pyright: ignore[reportMissingTypeStubs] 

12from annotated_types import Predicate 

13from pydantic import RootModel, StringConstraints 

14from typing_extensions import Annotated 

15 

16from .root_url import RootHttpUrl 

17from .validated_string import ValidatedString 

18 

# Plain file name; an alias of `str` with no validation attached.
FileName = str
# pydantic `FilePath` annotated with the field title "FilePath".
FilePath = Annotated[pydantic.FilePath, pydantic.Field(title="FilePath")]
# pydantic `DirectoryPath` that must additionally satisfy `Path.is_absolute`.
AbsoluteDirectory = Annotated[
    pydantic.DirectoryPath,
    Predicate(Path.is_absolute),
    pydantic.Field(title="AbsoluteDirectory"),
]
# pydantic `FilePath` that must additionally satisfy `Path.is_absolute`.
AbsoluteFilePath = Annotated[
    pydantic.FilePath,
    Predicate(Path.is_absolute),
    pydantic.Field(title="AbsoluteFilePath"),
]

31 

# Preferred file name of a bioimage.io resource description file.
BIOIMAGEIO_YAML = "rdf.yaml"
# Other file names that are accepted in place of the preferred one.
ALTERNATIVE_BIOIMAGEIO_YAML_NAMES = ("bioimageio.yaml", "model.yaml")
# Every accepted name, with the preferred name first.
ALL_BIOIMAGEIO_YAML_NAMES = (
    BIOIMAGEIO_YAML,
    *ALTERNATIVE_BIOIMAGEIO_YAML_NAMES,
)

35 

# Path-like access to members of a zip archive. The third-party `zipp`
# implementation is used here, not zipfile.Path, due to
# https://bugs.python.org/issue40564
ZipPath = zipp.Path

37 

38 

class Sha256(ValidatedString):
    """A SHA-256 hash value

    The `root_model` declared below constrains the value to a string that is
    stripped of surrounding whitespace, lower-cased and exactly 64 characters
    long (the length of a hexadecimal SHA-256 digest).
    """

    # NOTE(review): only the length is constrained here; nothing in this
    # declaration restricts the characters to hexadecimal digits.
    root_model: ClassVar[Type[RootModel[Any]]] = RootModel[
        Annotated[
            str,
            StringConstraints(
                strip_whitespace=True, to_lower=True, min_length=64, max_length=64
            ),
        ]
    ]

50 

51 

class BytesReaderP(Protocol):
    """Structural interface of a binary reader that supports seeking.

    The member names and signatures follow those of Python's binary file
    objects; any object that provides all of them satisfies this protocol.
    """

    # --- state -----------------------------------------------------------

    @property
    def closed(self) -> bool:
        """Whether the reader has been closed."""
        ...

    # --- capability queries ----------------------------------------------

    def readable(self) -> bool:
        """Whether the reader can be read from."""
        ...

    def seekable(self) -> bool:
        """Whether the reader supports `seek` and `tell`."""
        ...

    # --- stream operations -----------------------------------------------

    def read(self, size: int = -1, /) -> bytes:
        """Read bytes from the reader; `size` is positional-only and defaults to -1."""
        ...

    def seek(self, offset: int, whence: int = os.SEEK_SET, /) -> int:
        """Move the stream position; `whence` defaults to `os.SEEK_SET`."""
        ...

    def tell(self) -> int:
        """Return the current stream position."""
        ...

65 

66 

@runtime_checkable
class BytesReaderIntoP(BytesReaderP, Protocol):
    """A `BytesReaderP` that can additionally read into a caller-supplied buffer.

    The protocol is runtime checkable, so `isinstance(obj, BytesReaderIntoP)`
    can be used to find out whether a reader offers `readinto`.
    """

    # Fill `b` with bytes from the reader and return an int.
    # NOTE(review): presumably the number of bytes written into `b`, as with
    # Python's binary file objects - confirm for custom readers.
    def readinto(self, b: Union[bytearray, memoryview]) -> int: ...

70 

71 

# Alias of `str` used for the `suffix` value carried by `BytesReader`.
# NOTE(review): presumably a file-name extension such as ".zip" - confirm
# against callers.
Suffix = str

73 

74 

class BytesReader(BytesReaderP):
    """Binary reader bundled with metadata about the origin of its bytes.

    All stream operations are forwarded to the wrapped `reader`. The SHA-256
    digest and the "is a zip archive" flag may be supplied up front; when
    given as `None` they are computed on first access and then cached.
    """

    def __init__(
        self,
        /,
        reader: Union[BytesReaderP, BytesReaderIntoP],
        *,
        sha256: Optional[Sha256],
        suffix: Suffix,
        original_file_name: FileName,
        original_root: Union[RootHttpUrl, AbsoluteDirectory, ZipFile],
        is_zipfile: Optional[bool],
    ) -> None:
        """Wrap `reader` and remember the given metadata.

        Args:
            reader: The underlying binary reader that all reads and seeks are
                forwarded to.
            sha256: Known SHA-256 of the content, or `None` to compute it
                lazily when `sha256` is first accessed.
            suffix: Stored as is and exposed through `suffix`.
            original_file_name: Stored as is and exposed through
                `original_file_name`.
            original_root: Stored as is and exposed through `original_root`.
            is_zipfile: Known answer to "is the content a zip archive", or
                `None` to determine it lazily when `is_zipfile` is first
                accessed.
        """
        self._reader = reader
        self._sha256 = sha256
        self._suffix = suffix
        self._original_file_name = original_file_name
        self._original_root = original_root
        self._is_zipfile = is_zipfile
        super().__init__()

    @property
    def is_zipfile(self) -> bool:
        """Whether the content is a zip archive (determined once, then cached)."""
        if self._is_zipfile is None:
            pos = self.tell()
            try:
                self._is_zipfile = zipfile.is_zipfile(self)
            finally:
                # Probing moves the stream position; put it back even if the
                # probe raised so the reader is not left at a wrong offset.
                _ = self.seek(pos)

        return self._is_zipfile

    @property
    def sha256(self) -> Sha256:
        """SHA-256 of the content from offset 0 (computed once, then cached)."""
        if self._sha256 is None:
            pos = self._reader.tell()
            try:
                # Hash from the very beginning, independent of how much has
                # already been consumed.
                _ = self._reader.seek(0)
                self._sha256 = get_sha256(self._reader)
            finally:
                # Restore the caller's position even if hashing failed.
                _ = self._reader.seek(pos)

        return self._sha256

    @property
    def suffix(self) -> Suffix:
        """The `suffix` passed at construction."""
        return self._suffix

    @property
    def original_file_name(self) -> FileName:
        """The `original_file_name` passed at construction."""
        return self._original_file_name

    @property
    def original_root(self) -> Union[RootHttpUrl, AbsoluteDirectory, ZipFile]:
        """The `original_root` passed at construction."""
        return self._original_root

    def read(self, size: int = -1, /) -> bytes:
        """Forward to the wrapped reader's `read(size)`."""
        return self._reader.read(size)

    def read_text(self, encoding: str = "utf-8") -> str:
        """Call the wrapped reader's `read()` and decode the result.

        Reading starts at the current stream position, not at offset 0.
        """
        return self._reader.read().decode(encoding)

    def readable(self) -> bool:
        """Always `True`; the wrapped reader is not consulted."""
        return True

    def seek(self, offset: int, whence: int = os.SEEK_SET, /) -> int:
        """Forward to the wrapped reader's `seek(offset, whence)`."""
        return self._reader.seek(offset, whence)

    def seekable(self) -> bool:
        """Always `True`; the wrapped reader is not consulted."""
        return True

    def tell(self) -> int:
        """Forward to the wrapped reader's `tell()`."""
        return self._reader.tell()

    @property
    def closed(self) -> bool:
        """The wrapped reader's `closed` state."""
        return self._reader.closed

147 

148 

def get_sha256(source: Union[BytesReaderP, BytesReaderIntoP, Path]) -> Sha256:
    """Compute the SHA-256 of `source`, reading it in 128 KiB chunks.

    Args:
        source: A reader, consumed from its current position until it yields
            no more data, or a `Path`, which is opened in binary mode for the
            duration of the call. A reader passed in is neither rewound nor
            closed.

    Returns:
        The hexadecimal digest wrapped in `Sha256`.
    """
    chunksize = 128 * 1024
    digest = hashlib.sha256()

    if isinstance(source, BytesReaderIntoP):
        # Reuse a single buffer for every read; the memoryview keeps the
        # backing bytearray alive and lets us slice without copying.
        view = memoryview(bytearray(chunksize))
        while True:
            filled = source.readinto(view)
            if filled == 0:
                break
            digest.update(view[:filled])
    else:
        # Only a file opened here is closed here; a caller-owned reader is
        # passed through `nullcontext` and left open.
        opened = (
            source.open(mode="rb")
            if isinstance(source, Path)
            else nullcontext(source)
        )
        with opened as stream:
            while True:
                block = stream.read(chunksize)
                if block == b"":
                    break
                digest.update(block)

    return Sha256(digest.hexdigest())