Coverage for src / bioimageio / spec / _internal / io.py: 78%

490 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-31 13:09 +0000

1from __future__ import annotations 

2 

3import collections.abc 

4import hashlib 

5import sys 

6import warnings 

7import zipfile 

8from abc import abstractmethod 

9from contextlib import nullcontext 

10from copy import deepcopy 

11from dataclasses import dataclass, field 

12from datetime import date as _date 

13from datetime import datetime as _datetime 

14from functools import partial 

15from io import TextIOWrapper 

16from pathlib import Path, PurePath, PurePosixPath 

17from tempfile import mkdtemp 

18from typing import ( 

19 TYPE_CHECKING, 

20 Any, 

21 Callable, 

22 Dict, 

23 Generic, 

24 Iterable, 

25 List, 

26 Mapping, 

27 Optional, 

28 Sequence, 

29 Set, 

30 Tuple, 

31 Type, 

32 TypedDict, 

33 TypeVar, 

34 Union, 

35 overload, 

36) 

37from urllib.parse import urlparse, urlsplit, urlunsplit 

38from zipfile import ZipFile 

39 

40import httpx 

41import pydantic 

42from genericache import NoopCache 

43from genericache.digest import ContentDigest, UrlDigest 

44from pydantic import ( 

45 AnyUrl, 

46 DirectoryPath, 

47 Field, 

48 GetCoreSchemaHandler, 

49 PrivateAttr, 

50 RootModel, 

51 TypeAdapter, 

52 model_serializer, 

53 model_validator, 

54) 

55from pydantic_core import core_schema 

56from tqdm import tqdm 

57from typing_extensions import ( 

58 Annotated, 

59 LiteralString, 

60 NotRequired, 

61 Self, 

62 TypeGuard, 

63 Unpack, 

64 assert_never, 

65) 

66from typing_extensions import TypeAliasType as _TypeAliasType 

67 

68from ._settings import settings 

69from .io_basics import ( 

70 ALL_BIOIMAGEIO_YAML_NAMES, 

71 ALTERNATIVE_BIOIMAGEIO_YAML_NAMES, 

72 BIOIMAGEIO_YAML, 

73 AbsoluteDirectory, 

74 AbsoluteFilePath, 

75 BytesReader, 

76 FileName, 

77 FilePath, 

78 Sha256, 

79 ZipPath, 

80 get_sha256, 

81) 

82from .node import Node 

83from .progress import Progressbar 

84from .root_url import RootHttpUrl 

85from .type_guards import is_dict, is_list, is_mapping, is_sequence 

86from .url import HttpUrl 

87from .utils import SLOTS 

88from .validation_context import get_validation_context 

89from .version_type import Version 

90 

# any absolute location a relative path may resolve to (URL, dir, file, or zip member)
AbsolutePathT = TypeVar(
    "AbsolutePathT",
    bound=Union[HttpUrl, AbsoluteDirectory, AbsoluteFilePath, ZipPath],
)

95 

96 

class LightHttpFileDescr(Node):
    """http source with sha256 value (minimal validation)"""

    source: pydantic.HttpUrl
    """file source"""

    sha256: Sha256
    """SHA256 checksum of the source file"""

    def get_reader(
        self,
        *,
        progressbar: Union[Progressbar, Callable[[], Progressbar], bool, None] = None,
    ) -> BytesReader:
        """Open the file source (download if needed).

        Args:
            progressbar: progress bar (or factory/flag) forwarded to the
                module-level `get_reader`; `None` defers to the validation context.
        """
        # delegates to the module-level helper, pinning this descriptor's sha256
        return get_reader(self.source, sha256=self.sha256, progressbar=progressbar)

    download = get_reader
    """alias for get_reader() method"""

116 

117 

class RelativePathBase(RootModel[PurePath], Generic[AbsolutePathT], frozen=True):
    """Base class for paths relative to a root (directory, URL, or zip archive).

    The absolute counterpart is resolved once at model creation time against
    `get_validation_context().root` and cached in `_absolute`.
    """

    _absolute: AbsolutePathT = PrivateAttr()

    @property
    def path(self) -> PurePath:
        # the wrapped relative path (RootModel stores it in `self.root`)
        return self.root

    def absolute(  # method not property analog to `pathlib.Path.absolute()`
        self,
    ) -> AbsolutePathT:
        """get the absolute path/url

        (resolved at time of initialization with the root of the ValidationContext)
        """
        return self._absolute

    def model_post_init(self, __context: Any) -> None:
        """set `_absolute` property with validation context at creation time. @private"""
        if self.root.is_absolute():
            raise ValueError(f"{self.root} is an absolute path.")

        # "http:"/"https:" as first part means a URL was mistakenly parsed as a path
        if self.root.parts and self.root.parts[0] in ("http:", "https:"):
            raise ValueError(f"{self.root} looks like an http url.")

        self._absolute = (  # pyright: ignore[reportAttributeAccessIssue]
            self.get_absolute(get_validation_context().root)
        )
        super().model_post_init(__context)

    def __str__(self) -> str:
        # always serialize with forward slashes, independent of OS
        return self.root.as_posix()

    def __repr__(self) -> str:
        # NOTE(review): reports "RelativePath" for all subclasses;
        # `type(self).__name__` would be more precise — confirm before changing
        return f"RelativePath('{self}')"

    @model_serializer()
    def format(self) -> str:
        # serialize as the POSIX-style relative path string
        return str(self)

    @abstractmethod
    def get_absolute(
        self, root: Union[RootHttpUrl, AbsoluteDirectory, pydantic.AnyUrl, ZipFile]
    ) -> AbsolutePathT: ...

    def _get_absolute_impl(
        self, root: Union[RootHttpUrl, AbsoluteDirectory, pydantic.AnyUrl, ZipFile]
    ) -> Union[Path, HttpUrl, ZipPath]:
        """Resolve this relative path against `root` (dir, zip, or URL)."""
        if isinstance(root, Path):
            return (root / self.root).absolute()

        rel_path = self.root.as_posix().strip("/")
        if isinstance(root, ZipFile):
            return ZipPath(root, rel_path)

        parsed = urlsplit(str(root))
        path = list(parsed.path.strip("/").split("/"))
        if (
            parsed.netloc == "zenodo.org"
            and parsed.path.startswith("/api/records/")
            and parsed.path.endswith("/content")
        ):
            # zenodo API record URLs end in ".../content";
            # insert the relative path before that last segment
            path.insert(-1, rel_path)
        else:
            path.append(rel_path)

        return HttpUrl(
            urlunsplit(
                (
                    parsed.scheme,
                    parsed.netloc,
                    "/".join(path),
                    parsed.query,
                    parsed.fragment,
                )
            )
        )

    @classmethod
    def _validate(cls, value: Union[PurePath, str]):
        """Reject URL strings; wrap anything else as a relative `PurePath`."""
        if isinstance(value, str) and (
            value.startswith("https://") or value.startswith("http://")
        ):
            raise ValueError(f"{value} looks like a URL, not a relative path")

        return cls(PurePath(value))

203 

204 

class RelativeFilePath(
    RelativePathBase[Union[AbsoluteFilePath, HttpUrl, ZipPath]], frozen=True
):
    """A path relative to the `rdf.yaml` file (also if the RDF source is a URL)."""

    def model_post_init(self, __context: Any) -> None:
        """add validation @private"""
        if not self.root.parts:  # an empty path can only be a directory
            raise ValueError(f"{self.root} is not a valid file path.")

        super().model_post_init(__context)

    def get_absolute(
        self, root: "RootHttpUrl | Path | AnyUrl | ZipFile"
    ) -> "AbsoluteFilePath | HttpUrl | ZipPath":
        """Resolve against `root`; verify local files exist when io checks are on.

        Files already listed in `context.known_files` are trusted without a disk check.
        """
        absolute = self._get_absolute_impl(root)
        if (
            isinstance(absolute, Path)
            and (context := get_validation_context()).perform_io_checks
            and str(self.root) not in context.known_files
            and not absolute.is_file()
        ):
            raise ValueError(f"{absolute} does not point to an existing file")

        return absolute

    @property
    def suffix(self) -> str:
        """The file extension of the relative path."""
        return self.root.suffix

234 

235 

class RelativeDirectory(
    RelativePathBase[Union[AbsoluteDirectory, HttpUrl, ZipPath]], frozen=True
):
    """A directory path relative to the validation context's root."""

    def get_absolute(
        self, root: "RootHttpUrl | Path | AnyUrl | ZipFile"
    ) -> "AbsoluteDirectory | HttpUrl | ZipPath":
        """Resolve against `root`; verify local directories exist when io checks are on."""
        absolute = self._get_absolute_impl(root)
        if (
            isinstance(absolute, Path)
            and get_validation_context().perform_io_checks
            and not absolute.is_dir()
        ):
            raise ValueError(f"{absolute} does not point to an existing directory")

        return absolute

251 

252 

# strict file source: tried left-to-right (URL first, then relative, then local path)
FileSource = Annotated[
    Union[HttpUrl, RelativeFilePath, FilePath],
    Field(union_mode="left_to_right"),
]
# like `FileSource`, but additionally accepts plain strings and pydantic URLs
PermissiveFileSource = Union[FileSource, str, pydantic.HttpUrl]

258 

259 

class FileDescr(Node):
    """A file description"""

    source: FileSource
    """File source"""

    sha256: Optional[Sha256] = None
    """SHA256 hash value of the **source** file."""

    @model_validator(mode="after")
    def _validate_sha256(self) -> Self:
        # run the (possibly io-performing) hash check as part of model validation
        self.validate_sha256()
        return self

    def validate_sha256(self, force_recompute: bool = False) -> None:
        """validate the sha256 hash value of the **source** file

        Args:
            force_recompute: ignore `context.known_files` and re-hash the file.

        Raises:
            ValueError: if the computed hash differs from `self.sha256`, or if
                `known_files` was supplied but does not contain this source.
        """
        context = get_validation_context()
        src_str = str(self.source)
        if force_recompute:
            actual_sha = None
        else:
            # reuse a previously computed hash if available
            actual_sha = context.known_files.get(src_str)

        if actual_sha is None:
            if context.perform_io_checks or force_recompute:
                reader = get_reader(self.source, sha256=self.sha256)
                if force_recompute:
                    actual_sha = get_sha256(reader)
                else:
                    # the reader may already carry a verified hash
                    actual_sha = reader.sha256

                context.known_files[src_str] = actual_sha
            elif context.known_files and src_str not in context.known_files:
                # perform_io_checks is False, but known files were given,
                # so we expect all file references to be in there
                raise ValueError(f"File {src_str} not found in `known_files`.")

        if actual_sha is None or self.sha256 == actual_sha:
            return
        elif self.sha256 is None or context.update_hashes:
            # adopt the computed hash (no expected value, or updates requested)
            self.sha256 = actual_sha
        elif self.sha256 != actual_sha:
            raise ValueError(
                f"Sha256 mismatch for {self.source}. Expected {self.sha256}, got "
                + f"{actual_sha}. Update expected `sha256` or point to the matching "
                + "file."
            )

    def get_reader(
        self,
        *,
        progressbar: Union[Progressbar, Callable[[], Progressbar], bool, None] = None,
    ):
        """open the file source (download if needed)"""
        return get_reader(self.source, progressbar=progressbar, sha256=self.sha256)

    def download(
        self,
        *,
        progressbar: Union[Progressbar, Callable[[], Progressbar], bool, None] = None,
    ):
        """alias for `.get_reader`"""
        return get_reader(self.source, progressbar=progressbar, sha256=self.sha256)

    @property
    def suffix(self) -> str:
        """The file extension of the source."""
        return self.source.suffix

327 

328 

# reusable adapter to validate a value as file path, directory, or URL
path_or_url_adapter: "TypeAdapter[Union[FilePath, DirectoryPath, HttpUrl]]" = (
    TypeAdapter(Union[FilePath, DirectoryPath, HttpUrl])
)

332 

333 

@dataclass(frozen=True, **SLOTS)
class WithSuffix:
    """Pydantic annotation metadata restricting a file source to given suffix(es)."""

    # a single suffix like ".yaml" or a tuple of allowed suffixes
    suffix: Union[LiteralString, Tuple[LiteralString, ...]]
    case_sensitive: bool

    def __get_pydantic_core_schema__(
        self, source: Type[Any], handler: GetCoreSchemaHandler
    ):
        # append the suffix check after the annotated type's own validation
        if not self.suffix:
            raise ValueError("suffix may not be empty")

        schema = handler(source)
        return core_schema.no_info_after_validator_function(
            self.validate,
            schema,
        )

    def validate(
        self, value: Union[FileSource, FileDescr]
    ) -> Union[FileSource, FileDescr]:
        """Validate that `value` carries one of the allowed suffixes."""
        return validate_suffix(value, self.suffix, case_sensitive=self.case_sensitive)

355 

356 

def wo_special_file_name(src: F) -> F:
    """Pass `src` through unchanged; reject file names reserved for bioimageio YAML."""
    if not has_valid_bioimageio_yaml_name(src):
        return src

    raise ValueError(
        f"'{src}' not allowed here as its filename is reserved to identify"
        + f" '{BIOIMAGEIO_YAML}' (or equivalent) files."
    )

365 

366 

def has_valid_bioimageio_yaml_name(src: Union[FileSource, FileDescr]) -> bool:
    """True if the file name of `src` identifies a bioimageio YAML file."""
    file_name = extract_file_name(src)
    return is_valid_bioimageio_yaml_name(file_name)

369 

370 

def is_valid_bioimageio_yaml_name(file_name: FileName) -> bool:
    """True if `file_name` matches a bioimageio YAML name, exactly or as an extension."""
    return any(
        file_name == candidate or file_name.endswith("." + candidate)
        for candidate in ALL_BIOIMAGEIO_YAML_NAMES
    )

377 

378 

def identify_bioimageio_yaml_file_name(file_names: Iterable[FileName]) -> FileName:
    """Return the bioimageio YAML file name among `file_names`.

    Candidate names from `ALL_BIOIMAGEIO_YAML_NAMES` are tried in priority
    order; within each candidate, file names are checked in sorted order.

    Raises:
        ValueError: if no file name qualifies.
    """
    file_names = sorted(file_names)
    for bioimageio_name in ALL_BIOIMAGEIO_YAML_NAMES:
        for file_name in file_names:
            if file_name == bioimageio_name or file_name.endswith(
                "." + bioimageio_name
            ):
                return file_name

    # fixed garbled message: was "or or any of the alterntive file names"
    raise ValueError(
        f"No {BIOIMAGEIO_YAML} found in {file_names}. (Looking for '{BIOIMAGEIO_YAML}'"
        + " or any of the alternative file names:"
        + f" {ALTERNATIVE_BIOIMAGEIO_YAML_NAMES}, or any file with an extension of"
        + f" those, e.g. 'anything.{BIOIMAGEIO_YAML}')."
    )

394 

395 

def find_bioimageio_yaml_file_name(path: Union[Path, ZipFile]) -> FileName:
    """Find the bioimageio YAML file name in a directory, zip archive, or file path."""
    if isinstance(path, ZipFile):
        candidates = path.namelist()
    elif not path.is_file():
        # a directory: consider its direct children
        candidates = [p.name for p in path.glob("*")]
    elif zipfile.is_zipfile(path):
        with ZipFile(path, "r") as archive:
            candidates = archive.namelist()
    else:
        # a plain file is its own candidate
        return path.name

    return identify_bioimageio_yaml_file_name(candidates)

409 

410 

def ensure_has_valid_bioimageio_yaml_name(src: FileSource) -> FileSource:
    """Return `src` unchanged; raise if its filename is not a bioimageio YAML name."""
    if has_valid_bioimageio_yaml_name(src):
        return src

    raise ValueError(
        f"'{src}' does not have a valid filename to identify"
        + f" '{BIOIMAGEIO_YAML}' (or equivalent) files."
    )

419 

420 

def ensure_is_valid_bioimageio_yaml_name(file_name: FileName) -> FileName:
    """Return `file_name` unchanged; raise if it is not a bioimageio YAML name."""
    if is_valid_bioimageio_yaml_name(file_name):
        return file_name

    raise ValueError(
        f"'{file_name}' is not a valid filename to identify"
        + f" '{BIOIMAGEIO_YAML}' (or equivalent) files."
    )

429 

430 

# types as loaded from YAML 1.2 (with ruyaml)
YamlLeafValue = Union[
    bool, _date, _datetime, int, float, str, None
]  # note: order relevant for deserializing
YamlKey = Union[  # YAML Arrays are cast to tuples if used as key in mappings
    YamlLeafValue, Tuple[YamlLeafValue, ...]  # (nesting is not allowed though)
]
if TYPE_CHECKING:
    # plain recursive unions: fine for static analysis
    YamlValue = Union[YamlLeafValue, List["YamlValue"], Dict[YamlKey, "YamlValue"]]
    YamlValueView = Union[
        YamlLeafValue, Sequence["YamlValueView"], Mapping[YamlKey, "YamlValueView"]
    ]
else:
    # for pydantic validation we need to use `TypeAliasType`,
    # see https://docs.pydantic.dev/latest/concepts/types/#named-recursive-types
    # however this results in a partially unknown type with the current pyright 1.1.388
    YamlValue = _TypeAliasType(
        "YamlValue",
        Union[YamlLeafValue, List["YamlValue"], Dict[YamlKey, "YamlValue"]],
    )
    YamlValueView = _TypeAliasType(
        "YamlValueView",
        Union[
            YamlLeafValue,
            Sequence["YamlValueView"],
            Mapping[YamlKey, "YamlValueView"],
        ],
    )

459 

460 

# a full (mutable) RDF content dict and its read-only counterpart
BioimageioYamlContent = Dict[str, YamlValue]
BioimageioYamlContentView = Mapping[str, YamlValueView]

IncompleteDescrLeaf = Union[Node, YamlValue, PermissiveFileSource, Version]
"""Leaf value of a partial description"""

IncompleteDescrInner = Union[
    IncompleteDescrLeaf,
    List["IncompleteDescrInner"],
    Dict[YamlKey, "IncompleteDescrInner"],
]
"""An inner node of an incomplete resource description --- YAML values and description nodes mixed."""

IncompleteDescr = Dict[str, IncompleteDescrInner]
"""An incomplete resource description --- YAML values and description nodes mixed."""


IncompleteDescrLeafView = Union[Node, YamlValueView, PermissiveFileSource, Version]
"""Non-editable leaf value of an incomplete description"""

IncompleteDescrInnerView = Union[
    IncompleteDescrLeafView,
    Sequence["IncompleteDescrInnerView"],
    Mapping[YamlKey, "IncompleteDescrInnerView"],
    # Mapping[str, YamlValueView], # not sure why an explicit Mapping entry would be needed
]
"""An inner node of a non-editable incomplete resource description --- YAML value views and Node instances mixed."""

IncompleteDescrView = Mapping[str, IncompleteDescrInnerView]
"""A non-editable incomplete resource description --- YAML mappings and Node instances mixed."""


# anything a bioimageio YAML description may be loaded from
BioimageioYamlSource = Union[
    PermissiveFileSource, ZipFile, BioimageioYamlContent, BioimageioYamlContentView
]

496 

497 

@overload
def deepcopy_yaml_value(value: BioimageioYamlContentView) -> BioimageioYamlContent: ...


@overload
def deepcopy_yaml_value(value: YamlValueView) -> YamlValue: ...


def deepcopy_yaml_value(
    value: Union[BioimageioYamlContentView, YamlValueView],
) -> Union[BioimageioYamlContent, YamlValue]:
    """Return a deep, mutable copy of a YAML value view.

    Mappings become dicts and sequences become lists; leaf values are
    returned as-is (they are immutable).
    """
    if isinstance(value, str):
        # fix: `str` is a Sequence — without this guard any string value
        # recursed infinitely (iterating a 1-char string yields itself)
        return value
    elif isinstance(value, collections.abc.Mapping):
        return {key: deepcopy_yaml_value(val) for key, val in value.items()}
    elif isinstance(value, collections.abc.Sequence):
        return [deepcopy_yaml_value(val) for val in value]
    else:
        return value

515 

516 

def deepcopy_incomplete_descr(data: IncompleteDescrView) -> IncompleteDescr:
    """Deep-copy an incomplete description view into a mutable dict."""
    copied: IncompleteDescr = {}
    for key, value in data.items():
        copied[key] = _deepcopy_incomplete_descr_impl(value)

    return copied

519 

520 

def _deepcopy_incomplete_descr_impl(
    data: IncompleteDescrInnerView,
) -> IncompleteDescrInner:
    """Recursively deep-copy one node of an incomplete description.

    The isinstance checks are order-sensitive: `Node` and `str` must be
    handled before the generic Mapping/Sequence branches.
    """
    if isinstance(data, Node):
        # description nodes are copied wholesale
        return deepcopy(data)
    elif isinstance(data, str):
        # str is a Sequence; return it before the Sequence branch recurses into characters
        return data
    elif isinstance(data, collections.abc.Mapping):
        return {k: _deepcopy_incomplete_descr_impl(v) for k, v in data.items()}
    elif isinstance(data, collections.abc.Sequence):
        return [_deepcopy_incomplete_descr_impl(v) for v in data]
    elif isinstance(
        data,
        (
            # immutable leaf types: safe to share, no copy needed
            bool,
            int,
            float,
            type(None),
            _date,
            _datetime,
            Version,
            RelativeFilePath,
            PurePath,
            HttpUrl,
            pydantic.HttpUrl,
        ),
    ):
        return data
    else:
        assert_never(data)

551 

552 

def is_yaml_leaf_value(value: Any) -> TypeGuard[YamlLeafValue]:
    """Check whether `value` is a YAML scalar (bool, date, datetime, int, float, str, or None)."""
    scalar_types = (bool, _date, _datetime, int, float, str, type(None))
    return isinstance(value, scalar_types)

555 

556 

def is_yaml_list(value: Any) -> TypeGuard[List[YamlValue]]:
    """True if `value` is a list whose items are all YAML values."""
    if not is_list(value):
        return False

    return all(is_yaml_value(item) for item in value)

559 

560 

def is_yaml_sequence(value: Any) -> TypeGuard[List[YamlValueView]]:
    """True if `value` is a sequence whose items are all YAML values."""
    if not is_sequence(value):
        return False

    return all(is_yaml_value(item) for item in value)

563 

564 

def is_yaml_dict(value: Any) -> TypeGuard[BioimageioYamlContent]:
    """True if `value` is a dict with str keys and YAML values only."""
    if not is_dict(value):
        return False

    return all(
        isinstance(key, str) and is_yaml_value(val) for key, val in value.items()
    )

569 

570 

def is_yaml_mapping(value: Any) -> TypeGuard[BioimageioYamlContentView]:
    """True if `value` is a mapping with str keys and read-only YAML values only."""
    if not is_mapping(value):
        return False

    return all(
        isinstance(key, str) and is_yaml_value_read_only(val)
        for key, val in value.items()
    )

576 

577 

def is_yaml_value(value: Any) -> TypeGuard[YamlValue]:
    """True if `value` is a (mutable) YAML value: scalar, list, or dict."""
    if is_yaml_leaf_value(value):
        return True

    return is_yaml_list(value) or is_yaml_dict(value)

580 

581 

def is_yaml_value_read_only(value: Any) -> TypeGuard[YamlValueView]:
    """True if `value` is a read-only YAML value: scalar, sequence, or mapping."""
    if is_yaml_leaf_value(value):
        return True

    return is_yaml_sequence(value) or is_yaml_mapping(value)

586 

587 

@dataclass(frozen=True, **SLOTS)
class OpenedBioimageioYaml:
    """Result of opening and parsing a bioimageio YAML source."""

    # parsed YAML content (excluded from repr to keep it readable)
    content: BioimageioYamlContent = field(repr=False)
    original_root: Union[AbsoluteDirectory, RootHttpUrl, ZipFile]
    original_source_name: Optional[str]
    original_file_name: FileName
    # raw text as read from the source (excluded from repr)
    unparsed_content: str = field(repr=False)

595 

596 

@dataclass(frozen=True, **SLOTS)
class LocalFile:
    """A file on the local file system together with its origin."""

    path: FilePath
    original_root: Union[AbsoluteDirectory, RootHttpUrl, ZipFile]
    original_file_name: FileName

602 

603 

@dataclass(frozen=True, **SLOTS)
class FileInZip:
    """A file inside a zip archive together with its origin."""

    path: ZipPath
    original_root: Union[RootHttpUrl, ZipFile]
    original_file_name: FileName

609 

610 

class HashKwargs(TypedDict):
    """Optional hash keyword arguments accepted by the file-opening helpers."""

    sha256: NotRequired[Optional[Sha256]]

613 

614 

# adapter used to coerce permissive inputs into a strict `FileSource`
_file_source_adapter: TypeAdapter[Union[HttpUrl, RelativeFilePath, FilePath]] = (
    TypeAdapter(FileSource)
)

618 

619 

def interprete_file_source(file_source: PermissiveFileSource) -> FileSource:
    """Coerce a permissive file source into a strict `FileSource`.

    Raises:
        FileNotFoundError: if the source points to a directory.
    """
    if isinstance(file_source, Path):
        if file_source.is_dir():
            raise FileNotFoundError(
                f"{file_source} is a directory, but expected a file."
            )
        return file_source

    if isinstance(file_source, HttpUrl):
        return file_source

    # reduce pydantic URLs to strings before re-validation below
    candidate = (
        str(file_source) if isinstance(file_source, pydantic.AnyUrl) else file_source
    )

    with get_validation_context().replace(perform_io_checks=False):
        validated = _file_source_adapter.validate_python(candidate)
        if isinstance(validated, Path) and validated.is_dir():
            raise FileNotFoundError(f"{validated} is a directory, but expected a file.")

    return validated

640 

641 

def extract(
    source: Union[FilePath, ZipFile, ZipPath],
    folder: Optional[DirectoryPath] = None,
    overwrite: bool = False,
) -> DirectoryPath:
    """Extract a zip archive, or a single member of it, to `folder`.

    Args:
        source: zip file path, open `ZipFile`, or a `ZipPath` naming one member.
        folder: output directory; defaults to `<archive>.unzip` next to the
            archive (or a fresh temporary directory if the archive has no filename).
        overwrite: extract even if `folder` already exists with content.

    Returns:
        The directory the content ended up in. This may differ from `folder`
        when existing content conflicts with the archive's file list.
    """
    extract_member = None
    if isinstance(source, ZipPath):
        # remember the single member to extract and continue with its archive
        extract_member = source.at
        source = source.root

    if isinstance(source, ZipFile):
        # do not close a ZipFile owned by the caller
        zip_context = nullcontext(source)
        if folder is None:
            if source.filename is None:
                folder = Path(mkdtemp())
            else:
                zip_path = Path(source.filename)
                folder = zip_path.with_suffix(zip_path.suffix + ".unzip")
    else:
        zip_context = ZipFile(source, "r")
        if folder is None:
            folder = source.with_suffix(source.suffix + ".unzip")

    if overwrite and folder.exists():
        warnings.warn(f"Overwriting existing unzipped archive at {folder}")

    with zip_context as f:
        if extract_member is not None:
            # single-member extraction; keep an existing copy unless overwriting
            extracted_file_path = folder / extract_member
            if extracted_file_path.exists() and not overwrite:
                warnings.warn(f"Found unzipped {extracted_file_path}.")
            else:
                _ = f.extract(extract_member, folder)

            return folder

        elif overwrite or not folder.exists():
            f.extractall(folder)
            return folder

        # folder exists and we may not overwrite:
        # compare its top-level content with the archive's file list
        found_content = {p.relative_to(folder).as_posix() for p in folder.glob("*")}
        expected_content = {info.filename for info in f.filelist}
        if expected_missing := expected_content - found_content:
            # derive a fresh sibling folder name by bumping a numeric
            # component in the last "_"-separated part (e.g. "x_1.zip.unzip")
            parts = folder.name.split("_")
            nr, *suffixes = parts[-1].split(".")
            if nr.isdecimal():
                nr = str(int(nr) + 1)
            else:
                nr = f"1.{nr}"

            parts[-1] = ".".join([nr, *suffixes])
            out_path_new = folder.with_name("_".join(parts))
            warnings.warn(
                f"Unzipped archive at {folder} is missing expected files"
                + f" {expected_missing}."
                + f" Unzipping to {out_path_new} instead to avoid overwriting."
            )
            # recurse with the still-open archive and the fresh folder
            return extract(f, out_path_new, overwrite=overwrite)
        else:
            warnings.warn(
                f"Found unzipped archive with all expected files at {folder}."
            )
            return folder

705 

706 

def get_reader(
    source: Union[PermissiveFileSource, FileDescr, ZipPath],
    /,
    progressbar: Union[Progressbar, Callable[[], Progressbar], bool, None] = None,
    **kwargs: Unpack[HashKwargs],
) -> BytesReader:
    """Open a file `source` (download if needed).

    Args:
        source: URL, path, relative path, `FileDescr`, or zip member to open.
        progressbar: progress bar (or factory/flag) used for downloads;
            `None` defers to the validation context.
        **kwargs: optional `sha256` to verify the opened content against.

    Raises:
        FileNotFoundError: if a local source is missing or is a directory.
        ValueError: if a given `sha256` does not match the file content.
    """
    # normalize the many accepted source types down to HttpUrl / ZipPath / Path
    if isinstance(source, FileDescr):
        if "sha256" not in kwargs:
            # adopt the descriptor's hash unless the caller provided one
            kwargs["sha256"] = source.sha256

        source = source.source
    elif isinstance(source, str):
        source = interprete_file_source(source)

    if isinstance(source, RelativeFilePath):
        source = source.absolute()
    elif isinstance(source, pydantic.AnyUrl):
        with get_validation_context().replace(perform_io_checks=False):
            source = HttpUrl(source)

    if isinstance(source, HttpUrl):
        # remote sources are handled (and possibly cached) separately
        return _open_url(source, progressbar=progressbar, **kwargs)

    if isinstance(source, ZipPath):
        if not source.exists():
            raise FileNotFoundError(source.filename)

        f = source.open(mode="rb")
        assert not isinstance(f, TextIOWrapper)
        root = source.root
    elif isinstance(source, Path):
        if source.is_dir():
            raise FileNotFoundError(f"{source} is a directory, not a file")

        if not source.exists():
            raise FileNotFoundError(source)

        f = source.open("rb")
        root = source.parent
    else:
        assert_never(source)

    # verify the local content against an expected sha256, if given
    expected_sha = kwargs.get("sha256")
    if expected_sha is None:
        sha = None
    else:
        sha = get_sha256(f)
        _ = f.seek(0)  # rewind after hashing so the caller reads from the start
        if sha != expected_sha:
            raise ValueError(
                f"SHA256 mismatch for {source}. Expected {expected_sha}, got {sha}."
            )

    return BytesReader(
        f,
        sha256=sha,
        suffix=source.suffix,
        original_file_name=source.name,
        original_root=root,
        is_zipfile=None,
    )


# module-level alias for `get_reader`
download = get_reader

772 

773 

def _open_url(
    source: HttpUrl,
    /,
    progressbar: Union[Progressbar, Callable[[], Progressbar], bool, None],
    **kwargs: Unpack[HashKwargs],
) -> BytesReader:
    """Open a remote `source`, fetching through the configured cache.

    With a known `sha256` the cached copy can be reused (keyed by content
    digest); without one the URL is always refetched.
    """
    # bypass the disk cache entirely if the validation context disables caching
    cache = (
        NoopCache[RootHttpUrl](url_hasher=UrlDigest.from_str)
        if get_validation_context().disable_cache
        else settings.disk_cache
    )
    sha = kwargs.get("sha256")
    force_refetch = True if sha is None else ContentDigest.parse(hexdigest=sha)
    # derive file name/suffix from the URL path; fall back to the hash
    # (or a hash of the URL itself) for path-less URLs
    source_path = PurePosixPath(
        source.path
        or sha
        or hashlib.sha256(str(source).encode(encoding="utf-8")).hexdigest()
    )

    reader = cache.fetch(
        source,
        fetcher=partial(_fetch_url, progressbar=progressbar),
        force_refetch=force_refetch,
    )
    return BytesReader(
        reader,
        suffix=source_path.suffix,
        sha256=sha,
        original_file_name=source_path.name,
        original_root=source.parent,
        is_zipfile=None,
    )

806 

807 

def _fetch_url(
    source: RootHttpUrl,
    *,
    progressbar: Union[Progressbar, Callable[[], Progressbar], bool, None],
):
    """Stream the content of an http(s) `source`, updating `progressbar`.

    Returns a generator of byte chunks (consumed by the cache's fetch).
    """
    if source.scheme not in ("http", "https"):
        raise NotImplementedError(source.scheme)

    if progressbar is None:
        # choose progressbar option from validation context
        progressbar = get_validation_context().progressbar

    if progressbar is None:
        # default to no progressbar in CI environments
        progressbar = not settings.CI

    if callable(progressbar):
        # a factory was given; instantiate the actual progress bar
        progressbar = progressbar()

    if isinstance(progressbar, bool) and progressbar:
        progressbar = tqdm(
            ncols=79,
            ascii=bool(sys.platform == "win32"),
            unit="B",
            unit_scale=True,
            leave=True,
        )

    # from here on `progressbar` is either False or a progress bar object
    if progressbar is not False:
        progressbar.set_description(f"Downloading {extract_file_name(source)}")

    headers: Dict[str, str] = {}
    if settings.user_agent is not None:
        headers["User-Agent"] = settings.user_agent
    elif settings.CI:
        headers["User-Agent"] = "ci"

    r = httpx.get(
        str(source),
        follow_redirects=True,
        headers=headers,
        timeout=settings.http_timeout,
    )
    _ = r.raise_for_status()

    # set progressbar.total
    total = r.headers.get("content-length")
    if total is not None and not isinstance(total, int):
        try:
            total = int(total)
        except Exception:
            total = None

    if progressbar is not False:
        if total is None:
            progressbar.total = 0
        else:
            progressbar.total = total

    def iter_content():
        # lazily yield response chunks, advancing the progress bar
        for chunk in r.iter_bytes(chunk_size=4096):
            yield chunk
            if progressbar is not False:
                _ = progressbar.update(len(chunk))

        # Make sure the progress bar gets filled even if the actual number
        # of chunks is smaller than expected. This happens when streaming
        # text files that are compressed by the server when sending (gzip).
        # Binary files don't experience this.
        # (adapted from pooch.HttpDownloader)
        if progressbar is not False:
            progressbar.reset()
            if total is not None:
                _ = progressbar.update(total)

            progressbar.close()

    return iter_content()

886 

887 

def extract_file_name(
    src: Union[
        pydantic.HttpUrl, RootHttpUrl, PurePath, RelativeFilePath, ZipPath, FileDescr
    ],
) -> FileName:
    """Derive a plain file name from any supported file source."""
    if isinstance(src, FileDescr):
        src = src.source

    if isinstance(src, ZipPath):
        return src.name or src.root.filename or "bioimageio.zip"

    if isinstance(src, RelativeFilePath):
        return src.path.name

    if isinstance(src, PurePath):
        return src.name

    url = urlparse(str(src))
    segments = url.path.split("/")
    is_zenodo_record_content = (
        url.scheme == "https"
        and url.hostname == "zenodo.org"
        and url.path.startswith("/api/records/")
        and url.path.endswith("/content")
    )
    # zenodo API content URLs end in ".../<file name>/content"
    return segments[-2] if is_zenodo_record_content else segments[-1]

913 

914 

def extract_file_descrs(
    data: IncompleteDescrView,
) -> List[FileDescr]:
    """Collect all `FileDescr` instances (and FileDescr-like mappings) found in `data`."""
    found: List[FileDescr] = []
    # parse without io checks or warning logs while walking the description
    quiet_ctx = get_validation_context().replace(
        perform_io_checks=False, log_warnings=False
    )
    with quiet_ctx:
        _extract_file_descrs_impl(data, found)

    return found

923 

924 

def _extract_file_descrs_impl(
    data: Union[IncompleteDescrView, IncompleteDescrInnerView],
    collected: List[FileDescr],
) -> None:
    """Recursively walk `data`, appending every found `FileDescr` to `collected`.

    Mappings with both 'source' and 'sha256' keys are parsed as `FileDescr`;
    on failure a second attempt is made without 'sha256' before giving up.
    """
    if isinstance(data, FileDescr):
        collected.append(data)
    elif isinstance(data, Node):
        # iterate the node's (field name, value) pairs
        for _, v in data:
            _extract_file_descrs_impl(v, collected)
    elif isinstance(data, collections.abc.Mapping):
        if "source" in data and "sha256" in data:
            try:
                fd = FileDescr.model_validate(
                    dict(source=data["source"], sha256=data["sha256"])
                )
            except Exception:
                warnings.warn(
                    "Found mapping with 'source' and 'sha256' keys, but could not parse it as a FileDescr. Ignoring `sha256`."
                )
                try:
                    fd = FileDescr.model_validate(dict(source=data["source"]))
                except Exception:
                    # fixed garbled message (was: "keys , ... evning when ignoring 'sha256'")
                    warnings.warn(
                        f"Found mapping with 'source' and 'sha256' keys, but could not parse it as a FileDescr, even when ignoring 'sha256'. Ignoring `source`: {data['source']}."
                    )
                else:
                    collected.append(fd)
            else:
                collected.append(fd)

        for v in data.values():
            _extract_file_descrs_impl(v, collected)
    elif not isinstance(data, (str, Path, RelativeFilePath)) and isinstance(
        data, collections.abc.Sequence
    ):
        # exclude str (a Sequence of characters) and path-like leaves from recursion
        for v in data:
            _extract_file_descrs_impl(v, collected)

962 

963 

# a file source or file description; used by pass-through validators below
F = TypeVar("F", bound=Union[FileSource, FileDescr])

965 

966 

def validate_suffix(
    value: F, suffix: Union[str, Sequence[str]], case_sensitive: bool
) -> F:
    """check final suffix

    Args:
        value: file source or descriptor to check (returned unchanged on success).
        suffix: allowed suffix or suffixes, each starting with '.'.
        case_sensitive: whether the suffix comparison respects case.

    Raises:
        ValueError: if `value`'s final suffix is not among `suffix`.
    """
    if isinstance(suffix, str):
        suffixes = [suffix]
    else:
        suffixes = suffix

    assert len(suffixes) > 0, "no suffix given"
    assert all(suff.startswith(".") for suff in suffixes), (
        "expected suffixes to start with '.'"
    )
    # keep the original value to return it unchanged on success
    o_value = value
    if isinstance(value, FileDescr):
        strict = value.source
    else:
        strict = interprete_file_source(value)

    if isinstance(strict, (HttpUrl, AnyUrl)):
        if strict.path is None or "." not in (path := strict.path):
            actual_suffixes = []
        else:
            if (
                strict.host == "zenodo.org"
                and path.startswith("/api/records/")
                and path.endswith("/content")
            ):
                # Zenodo API URLs have a "/content" suffix that should be ignored
                path = path[: -len("/content")]

            actual_suffixes = [f".{path.split('.')[-1]}"]

    elif isinstance(strict, PurePath):
        actual_suffixes = strict.suffixes
    elif isinstance(strict, RelativeFilePath):
        actual_suffixes = strict.path.suffixes
    else:
        assert_never(strict)

    if actual_suffixes:
        actual_suffix = actual_suffixes[-1]
    else:
        actual_suffix = "no suffix"

    # (A and B) or (C and D): mismatch under the requested case sensitivity
    if (
        case_sensitive
        and actual_suffix not in suffixes
        or not case_sensitive
        and actual_suffix.lower() not in [s.lower() for s in suffixes]
    ):
        if len(suffixes) == 1:
            raise ValueError(f"Expected suffix {suffixes[0]}, but got {actual_suffix}")
        else:
            raise ValueError(
                f"Expected a suffix from {suffixes}, but got {actual_suffix}"
            )

    return o_value

1026 

1027 

def populate_cache(sources: Sequence[Union[FileDescr, LightHttpFileDescr]]):
    """Download each remote source with a known sha256 once to warm the cache."""
    seen_urls: Set[str] = set()
    for descr in sources:
        if descr.sha256 is None:
            continue  # not caching without known SHA

        src = descr.source
        if isinstance(src, (HttpUrl, pydantic.AnyUrl)):
            url = str(src)
        elif isinstance(src, RelativeFilePath):
            absolute = src.absolute()
            if not isinstance(absolute, HttpUrl):
                continue  # not caching local paths

            url = str(absolute)
        elif isinstance(src, Path):
            continue  # not caching local paths
        else:
            assert_never(src)

        if url in seen_urls:
            continue  # skip duplicate URLs

        seen_urls.add(url)
        _ = descr.download()