Coverage for bioimageio/spec/_internal/io.py: 77%

444 statements  

coverage.py v7.8.0, created at 2025-04-02 14:21 +0000

from __future__ import annotations

import hashlib
import io
import sys
import warnings
import zipfile
from abc import abstractmethod
from contextlib import nullcontext
from dataclasses import dataclass
from datetime import date as _date
from datetime import datetime as _datetime
from pathlib import Path, PurePath
from tempfile import mktemp
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Generic,
    Iterable,
    List,
    Optional,
    Protocol,
    Sequence,
    Tuple,
    Type,
    TypedDict,
    TypeVar,
    Union,
)
from urllib.parse import urlparse, urlsplit, urlunsplit
from zipfile import ZipFile, is_zipfile

import pooch  # pyright: ignore [reportMissingTypeStubs]
import pydantic
import requests
from pydantic import (
    AnyUrl,
    DirectoryPath,
    Field,
    GetCoreSchemaHandler,
    PlainSerializer,
    PrivateAttr,
    RootModel,
    SerializationInfo,
    TypeAdapter,
    model_validator,
)
from pydantic_core import core_schema
from tqdm import tqdm
from typing_extensions import (
    Annotated,
    LiteralString,
    NotRequired,
    Self,
    TypeGuard,
    Unpack,
    assert_never,
)
from typing_extensions import TypeAliasType as _TypeAliasType

from ._settings import settings
from .io_basics import (
    ALL_BIOIMAGEIO_YAML_NAMES,
    ALTERNATIVE_BIOIMAGEIO_YAML_NAMES,
    BIOIMAGEIO_YAML,
    AbsoluteDirectory,
    AbsoluteFilePath,
    FileName,
    FilePath,
    Sha256,
    ZipPath,
)
from .node import Node
from .packaging_context import packaging_context_var
from .root_url import RootHttpUrl
from .type_guards import is_mapping, is_sequence
from .url import HttpUrl
from .validation_context import get_validation_context
from .validator_annotations import AfterValidator

if sys.version_info < (3, 10):
    SLOTS: Dict[str, bool] = {}
else:
    SLOTS = {"slots": True}


AbsolutePathT = TypeVar(
    "AbsolutePathT",
    bound=Union[HttpUrl, AbsoluteDirectory, AbsoluteFilePath, ZipPath],
)


class RelativePathBase(RootModel[PurePath], Generic[AbsolutePathT], frozen=True):
    _absolute: AbsolutePathT = PrivateAttr()

    @property
    def path(self) -> PurePath:
        return self.root

    def absolute(  # method not property analog to `pathlib.Path.absolute()`
        self,
    ) -> AbsolutePathT:
        """get the absolute path/url

        (resolved at time of initialization with the root of the ValidationContext)
        """
        return self._absolute

    def model_post_init(self, __context: Any) -> None:
        """set `_absolute` property with validation context at creation time. @private"""
        if self.root.is_absolute():
            raise ValueError(f"{self.root} is an absolute path.")

        if self.root.parts and self.root.parts[0] in ("http:", "https:"):
            raise ValueError(f"{self.root} looks like an http url.")

        self._absolute = (  # pyright: ignore[reportAttributeAccessIssue]
            self.get_absolute(get_validation_context().root)
        )
        super().model_post_init(__context)

    # @property
    # def __members(self):
    #     return (self.path,)

    # def __eq__(self, __value: object) -> bool:
    #     return type(__value) is type(self) and self.__members == __value.__members

    # def __hash__(self) -> int:
    #     return hash(self.__members)

    def __str__(self) -> str:
        return self.root.as_posix()

    def __repr__(self) -> str:
        return f"RelativePath('{self}')"

    @abstractmethod
    def get_absolute(
        self, root: Union[RootHttpUrl, AbsoluteDirectory, pydantic.AnyUrl, ZipFile]
    ) -> AbsolutePathT: ...

    def _get_absolute_impl(
        self, root: Union[RootHttpUrl, AbsoluteDirectory, pydantic.AnyUrl, ZipFile]
    ) -> Union[Path, HttpUrl, ZipPath]:
        if isinstance(root, Path):
            return (root / self.root).absolute()

        rel_path = self.root.as_posix().strip("/")
        if isinstance(root, ZipFile):
            return ZipPath(root, rel_path)

        parsed = urlsplit(str(root))
        path = list(parsed.path.strip("/").split("/"))
        if (
            parsed.netloc == "zenodo.org"
            and parsed.path.startswith("/api/records/")
            and parsed.path.endswith("/content")
        ):
            path.insert(-1, rel_path)
        else:
            path.append(rel_path)

        return HttpUrl(
            urlunsplit(
                (
                    parsed.scheme,
                    parsed.netloc,
                    "/".join(path),
                    parsed.query,
                    parsed.fragment,
                )
            )
        )

    @classmethod
    def _validate(cls, value: Union[PurePath, str]):
        if isinstance(value, str) and (
            value.startswith("https://") or value.startswith("http://")
        ):
            raise ValueError(f"{value} looks like a URL, not a relative path")

        return cls(PurePath(value))


class RelativeFilePath(
    RelativePathBase[Union[AbsoluteFilePath, HttpUrl, ZipPath]], frozen=True
):
    """A path relative to the `rdf.yaml` file (also if the RDF source is a URL)."""

    def model_post_init(self, __context: Any) -> None:
        """add validation @private"""
        if not self.root.parts:  # an empty path can only be a directory
            raise ValueError(f"{self.root} is not a valid file path.")

        super().model_post_init(__context)

    def get_absolute(
        self, root: "RootHttpUrl | Path | AnyUrl | ZipFile"
    ) -> "AbsoluteFilePath | HttpUrl | ZipPath":
        absolute = self._get_absolute_impl(root)
        if (
            isinstance(absolute, Path)
            and (context := get_validation_context()).perform_io_checks
            and str(self.root) not in context.known_files
            and not absolute.is_file()
        ):
            raise ValueError(f"{absolute} does not point to an existing file")

        return absolute


class RelativeDirectory(
    RelativePathBase[Union[AbsoluteDirectory, HttpUrl, ZipPath]], frozen=True
):
    def get_absolute(
        self, root: "RootHttpUrl | Path | AnyUrl | ZipFile"
    ) -> "AbsoluteDirectory | HttpUrl | ZipPath":
        absolute = self._get_absolute_impl(root)
        if (
            isinstance(absolute, Path)
            and get_validation_context().perform_io_checks
            and not absolute.is_dir()
        ):
            raise ValueError(f"{absolute} does not point to an existing directory")

        return absolute

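# --- Editor-added usage sketch (not part of the original module) --------------
# Illustrates how a `RelativeFilePath` is resolved against the validation
# context root at construction time. The file name "weights.pt" is hypothetical
# and, with `perform_io_checks=False`, does not need to exist on disk.
def _example_relative_file_path() -> None:  # pragma: no cover - documentation only
    with get_validation_context().replace(perform_io_checks=False):
        rel = RelativeFilePath(PurePath("weights.pt"))

    print(rel)  # posix-style relative path: "weights.pt"
    print(rel.absolute())  # resolved against the context root (path, URL, or zip)
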

FileSource = Annotated[
    Union[HttpUrl, RelativeFilePath, FilePath],
    Field(union_mode="left_to_right"),
]
PermissiveFileSource = Union[FileSource, str, pydantic.HttpUrl]

V_suffix = TypeVar("V_suffix", bound=FileSource)
path_or_url_adapter: "TypeAdapter[Union[FilePath, DirectoryPath, HttpUrl]]" = (
    TypeAdapter(Union[FilePath, DirectoryPath, HttpUrl])
)


def validate_suffix(
    value: V_suffix, suffix: Union[str, Sequence[str]], case_sensitive: bool
) -> V_suffix:
    """check final suffix"""
    if isinstance(suffix, str):
        suffixes = [suffix]
    else:
        suffixes = suffix

    assert len(suffixes) > 0, "no suffix given"
    assert all(
        suff.startswith(".") for suff in suffixes
    ), "expected suffixes to start with '.'"
    o_value = value
    strict = interprete_file_source(value)

    if isinstance(strict, (HttpUrl, AnyUrl)):
        if strict.path is None or "." not in (path := strict.path):
            actual_suffix = ""
        elif (
            strict.host == "zenodo.org"
            and path.startswith("/api/records/")
            and path.endswith("/content")
        ):
            actual_suffix = "." + path[: -len("/content")].split(".")[-1]
        else:
            actual_suffix = "." + path.split(".")[-1]

    elif isinstance(strict, PurePath):
        actual_suffix = strict.suffixes[-1]
    elif isinstance(strict, RelativeFilePath):
        actual_suffix = strict.path.suffixes[-1]
    else:
        assert_never(strict)

    if (
        case_sensitive
        and actual_suffix not in suffixes
        or not case_sensitive
        and actual_suffix.lower() not in [s.lower() for s in suffixes]
    ):
        if len(suffixes) == 1:
            raise ValueError(f"Expected suffix {suffixes[0]}, but got {actual_suffix}")
        else:
            raise ValueError(
                f"Expected a suffix from {suffixes}, but got {actual_suffix}"
            )

    return o_value


@dataclass(frozen=True, **SLOTS)
class WithSuffix:
    suffix: Union[LiteralString, Tuple[LiteralString, ...]]
    case_sensitive: bool

    def __get_pydantic_core_schema__(
        self, source: Type[Any], handler: GetCoreSchemaHandler
    ):
        if not self.suffix:
            raise ValueError("suffix may not be empty")

        schema = handler(source)
        return core_schema.no_info_after_validator_function(
            self.validate,
            schema,
        )

    def validate(self, value: FileSource) -> FileSource:
        return validate_suffix(value, self.suffix, case_sensitive=self.case_sensitive)

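# --- Editor-added usage sketch (not part of the original module) --------------
# `WithSuffix` is used as `Annotated` metadata so that pydantic runs
# `validate_suffix` after the base validation. The ".h5" suffix and the file
# names below are hypothetical.
def _example_with_suffix() -> None:  # pragma: no cover - documentation only
    adapter = TypeAdapter(
        Annotated[FileSource, WithSuffix(".h5", case_sensitive=False)]
    )
    with get_validation_context().replace(perform_io_checks=False):
        _ = adapter.validate_python("weights.h5")  # accepted
        try:
            _ = adapter.validate_python("weights.onnx")  # rejected: wrong suffix
        except pydantic.ValidationError:
            pass
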

def wo_special_file_name(src: FileSource) -> FileSource:
    if has_valid_bioimageio_yaml_name(src):
        raise ValueError(
            f"'{src}' not allowed here as its filename is reserved to identify"
            + f" '{BIOIMAGEIO_YAML}' (or equivalent) files."
        )

    return src


def _package(value: FileSource, info: SerializationInfo) -> Union[str, Path, FileName]:
    if (packaging_context := packaging_context_var.get()) is None:
        # convert to standard python obj
        # note: pydantic keeps returning Rootmodels (here `HttpUrl`) as-is, but if
        # this function returns one RootModel, paths are "further serialized" by
        # returning the 'root' attribute, which is incorrect.
        # see https://github.com/pydantic/pydantic/issues/8963
        # TODO: follow up on https://github.com/pydantic/pydantic/issues/8963
        if isinstance(value, Path):
            unpackaged = value
        elif isinstance(value, HttpUrl):
            unpackaged = value
        elif isinstance(value, RelativeFilePath):
            unpackaged = Path(value.path)
        elif isinstance(value, AnyUrl):
            unpackaged = str(value)
        else:
            assert_never(value)

        if info.mode_is_json():
            # convert to json value  # TODO: remove and let pydantic do this?
            if isinstance(unpackaged, Path):
                unpackaged = str(unpackaged)
            elif isinstance(unpackaged, str):
                pass
            else:
                assert_never(unpackaged)
        else:
            warnings.warn(
                "dumping with mode='python' is currently not fully supported for "
                + "fields that are included when packaging; returned objects are "
                + "standard python objects"
            )

        return unpackaged  # return unpackaged file source

    # package the file source:
    # add it to the current package's file sources and return its collision free file name
    if isinstance(value, RelativeFilePath):
        src = value.absolute()
    elif isinstance(value, pydantic.AnyUrl):
        src = HttpUrl(str(value))
    elif isinstance(value, HttpUrl):
        src = value
    elif isinstance(value, Path):
        src = value.resolve()
    else:
        assert_never(value)

    fname = extract_file_name(src)
    if fname == packaging_context.bioimageio_yaml_file_name:
        raise ValueError(
            f"Reserved file name '{packaging_context.bioimageio_yaml_file_name}' "
            + "not allowed for a file to be packaged"
        )

    fsrcs = packaging_context.file_sources
    assert not any(
        fname.endswith(special) for special in ALL_BIOIMAGEIO_YAML_NAMES
    ), fname
    if fname in fsrcs and fsrcs[fname] != src:
        for i in range(2, 20):
            fn, *ext = fname.split(".")
            alternative_file_name = ".".join([f"{fn}_{i}", *ext])
            if (
                alternative_file_name not in fsrcs
                or fsrcs[alternative_file_name] == src
            ):
                fname = alternative_file_name
                break
        else:
            raise ValueError(f"Too many file name clashes for {fname}")

    fsrcs[fname] = src
    return fname


include_in_package_serializer = PlainSerializer(_package, when_used="unless-none")
ImportantFileSource = Annotated[
    FileSource,
    AfterValidator(wo_special_file_name),
    include_in_package_serializer,
]
InPackageIfLocalFileSource = Union[
    Annotated[
        Union[FilePath, RelativeFilePath],
        AfterValidator(wo_special_file_name),
        include_in_package_serializer,
    ],
    Union[HttpUrl, pydantic.HttpUrl],
]


def has_valid_bioimageio_yaml_name(src: FileSource) -> bool:
    return is_valid_bioimageio_yaml_name(extract_file_name(src))


def is_valid_bioimageio_yaml_name(file_name: FileName) -> bool:
    for bioimageio_name in ALL_BIOIMAGEIO_YAML_NAMES:
        if file_name == bioimageio_name or file_name.endswith("." + bioimageio_name):
            return True

    return False


def identify_bioimageio_yaml_file_name(file_names: Iterable[FileName]) -> FileName:
    file_names = sorted(file_names)
    for bioimageio_name in ALL_BIOIMAGEIO_YAML_NAMES:
        for file_name in file_names:
            if file_name == bioimageio_name or file_name.endswith(
                "." + bioimageio_name
            ):
                return file_name

    raise ValueError(
        f"No {BIOIMAGEIO_YAML} found in {file_names}. (Looking for '{BIOIMAGEIO_YAML}'"
        + " or any of the alternative file names:"
        + f" {ALTERNATIVE_BIOIMAGEIO_YAML_NAMES}, or any file with an extension of"
        + f" those, e.g. 'anything.{BIOIMAGEIO_YAML}')."
    )


def find_bioimageio_yaml_file_name(path: Union[Path, ZipFile]) -> FileName:
    if isinstance(path, ZipFile):
        file_names = path.namelist()
    elif path.is_file():
        if not is_zipfile(path):
            return path.name

        with ZipFile(path, "r") as f:
            file_names = f.namelist()
    else:
        file_names = [p.name for p in path.glob("*")]

    return identify_bioimageio_yaml_file_name(file_names)

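# --- Editor-added usage sketch (not part of the original module) --------------
# Picks the recognized bioimageio YAML file out of a listing; the other file
# names are made up.
def _example_identify_yaml_name() -> None:  # pragma: no cover - documentation only
    names = ["weights.pt", "docs.md", f"my_model.{BIOIMAGEIO_YAML}"]
    assert identify_bioimageio_yaml_file_name(names) == f"my_model.{BIOIMAGEIO_YAML}"
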

def ensure_has_valid_bioimageio_yaml_name(src: FileSource) -> FileSource:
    if not has_valid_bioimageio_yaml_name(src):
        raise ValueError(
            f"'{src}' does not have a valid filename to identify"
            + f" '{BIOIMAGEIO_YAML}' (or equivalent) files."
        )

    return src


def ensure_is_valid_bioimageio_yaml_name(file_name: FileName) -> FileName:
    if not is_valid_bioimageio_yaml_name(file_name):
        raise ValueError(
            f"'{file_name}' is not a valid filename to identify"
            + f" '{BIOIMAGEIO_YAML}' (or equivalent) files."
        )

    return file_name


# types as loaded from YAML 1.2 (with ruyaml)
YamlLeafValue = Union[
    bool, _date, _datetime, int, float, str, None
]  # note: order relevant for deserializing
YamlKey = Union[  # YAML Arrays are cast to tuples if used as key in mappings
    YamlLeafValue, Tuple[YamlLeafValue, ...]  # (nesting is not allowed though)
]
if TYPE_CHECKING:
    YamlValue = Union[YamlLeafValue, List["YamlValue"], Dict[YamlKey, "YamlValue"]]
else:
    # for pydantic validation we need to use `TypeAliasType`,
    # see https://docs.pydantic.dev/latest/concepts/types/#named-recursive-types
    # however this results in a partially unknown type with the current pyright 1.1.388
    YamlValue = _TypeAliasType(
        "YamlValue",
        Union[YamlLeafValue, List["YamlValue"], Dict[YamlKey, "YamlValue"]],
    )
BioimageioYamlContent = Dict[str, YamlValue]
BioimageioYamlSource = Union[PermissiveFileSource, ZipFile, BioimageioYamlContent]


def is_yaml_leaf_value(value: Any) -> TypeGuard[YamlLeafValue]:
    return isinstance(value, (bool, _date, _datetime, int, float, str, type(None)))


def is_yaml_list(value: Any) -> TypeGuard[List[YamlValue]]:
    return is_sequence(value) and all(is_yaml_value(item) for item in value)


def is_yaml_mapping(value: Any) -> TypeGuard[BioimageioYamlContent]:
    return is_mapping(value) and all(
        isinstance(key, str) and is_yaml_value(val) for key, val in value.items()
    )


def is_yaml_value(value: Any) -> TypeGuard[YamlValue]:
    return is_yaml_leaf_value(value) or is_yaml_list(value) or is_yaml_mapping(value)

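# --- Editor-added usage sketch (not part of the original module) --------------
# The type guards narrow untyped, YAML-loaded data for static type checkers;
# the dictionary content below is made up.
def _example_yaml_type_guards() -> None:  # pragma: no cover - documentation only
    data: Any = {"name": "my-model", "tags": ["unet", 2]}
    if is_yaml_mapping(data):
        # `data` is narrowed to `BioimageioYamlContent` here
        assert is_yaml_value(data["tags"])
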

@dataclass
class OpenedBioimageioYaml:
    content: BioimageioYamlContent
    original_root: Union[AbsoluteDirectory, RootHttpUrl, ZipFile]
    original_file_name: FileName
    unparsed_content: str


@dataclass
class LocalFile:
    path: FilePath
    original_root: Union[AbsoluteDirectory, RootHttpUrl, ZipFile]
    original_file_name: FileName


@dataclass
class FileInZip:
    path: ZipPath
    original_root: Union[RootHttpUrl, ZipFile]
    original_file_name: FileName


class HashKwargs(TypedDict):
    sha256: NotRequired[Optional[Sha256]]


_file_source_adapter: TypeAdapter[Union[HttpUrl, RelativeFilePath, FilePath]] = (
    TypeAdapter(FileSource)
)


def interprete_file_source(file_source: PermissiveFileSource) -> FileSource:
    if isinstance(file_source, Path):
        if file_source.is_dir():
            raise FileNotFoundError(
                f"{file_source} is a directory, but expected a file."
            )
        return file_source

    if isinstance(file_source, HttpUrl):
        return file_source

    if isinstance(file_source, pydantic.AnyUrl):
        file_source = str(file_source)

    with get_validation_context().replace(perform_io_checks=False):
        strict = _file_source_adapter.validate_python(file_source)
        if isinstance(strict, Path) and strict.is_dir():
            raise FileNotFoundError(f"{strict} is a directory, but expected a file.")

    return strict

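# --- Editor-added usage sketch (not part of the original module) --------------
# Strings are narrowed to one of the strict `FileSource` types (URL vs. path);
# the URL and the file name below are hypothetical.
def _example_interprete_file_source() -> None:  # pragma: no cover - documentation only
    url_src = interprete_file_source("https://example.com/models/weights.pt")
    assert isinstance(url_src, HttpUrl)

    path_src = interprete_file_source("folder/weights.pt")
    assert isinstance(path_src, RelativeFilePath)
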

def _get_known_hash(hash_kwargs: HashKwargs):
    if "sha256" in hash_kwargs and hash_kwargs["sha256"] is not None:
        return f"sha256:{hash_kwargs['sha256']}"
    else:
        return None


def _get_unique_file_name(url: Union[HttpUrl, pydantic.HttpUrl]):
    """
    Create a unique file name based on the given URL;
    adapted from pooch.utils.unique_file_name
    """
    md5 = hashlib.md5(str(url).encode()).hexdigest()
    fname = extract_file_name(url)
    # Crop the start of the file name to fit 255 characters including the hash
    # and the :
    fname = fname[-(255 - len(md5) - 1) :]
    unique_name = f"{md5}-{fname}"
    return unique_name


class Progressbar(Protocol):
    count: int
    total: int

    def update(self, i: int): ...

    def reset(self): ...

    def close(self): ...


def extract(
    source: Union[FilePath, ZipFile, ZipPath],
    folder: Optional[DirectoryPath] = None,
    overwrite: bool = False,
) -> DirectoryPath:
    extract_member = None
    if isinstance(source, ZipPath):
        extract_member = source.at
        source = source.root

    if isinstance(source, ZipFile):
        zip_context = nullcontext(source)
        if folder is None:
            if source.filename is None:
                folder = Path(mktemp())
            else:
                zip_path = Path(source.filename)
                folder = zip_path.with_suffix(zip_path.suffix + ".unzip")
    else:
        zip_context = ZipFile(source, "r")
        if folder is None:
            folder = source.with_suffix(source.suffix + ".unzip")

    if overwrite and folder.exists():
        warnings.warn(f"Overwriting existing unzipped archive at {folder}")

    with zip_context as f:
        if extract_member is not None:
            extracted_file_path = folder / extract_member
            if extracted_file_path.exists() and not overwrite:
                warnings.warn(f"Found unzipped {extracted_file_path}.")
            else:
                _ = f.extract(extract_member, folder)

            return folder

        elif overwrite or not folder.exists():
            f.extractall(folder)
            return folder

        found_content = {p.relative_to(folder).as_posix() for p in folder.glob("*")}
        expected_content = {info.filename for info in f.filelist}
        if expected_missing := expected_content - found_content:
            parts = folder.name.split("_")
            nr, *suffixes = parts[-1].split(".")
            if nr.isdecimal():
                nr = str(int(nr) + 1)
            else:
                nr = f"1.{nr}"

            parts[-1] = ".".join([nr, *suffixes])
            out_path_new = folder.with_name("_".join(parts))
            warnings.warn(
                f"Unzipped archive at {folder} is missing expected files"
                + f" {expected_missing}."
                + f" Unzipping to {out_path_new} instead to avoid overwriting."
            )
            return extract(f, out_path_new, overwrite=overwrite)
        else:
            warnings.warn(
                f"Found unzipped archive with all expected files at {folder}."
            )
            return folder

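# --- Editor-added usage sketch (not part of the original module) --------------
# With `folder=None` a zip archive is unpacked next to itself into
# "<archive>.zip.unzip"; the archive path below is hypothetical.
def _example_extract() -> None:  # pragma: no cover - documentation only
    unzipped = extract(Path("packaged_model.zip"))
    print(unzipped)  # packaged_model.zip.unzip
    print(sorted(p.name for p in unzipped.glob("*")))
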

def resolve(
    source: Union[PermissiveFileSource, FileDescr, ZipPath],
    /,
    progressbar: Union[Progressbar, bool, None] = None,
    **kwargs: Unpack[HashKwargs],
) -> Union[LocalFile, FileInZip]:
    """Resolve file `source` (download if needed)"""

    if isinstance(source, str):
        source = interprete_file_source(source)

    if isinstance(source, RelativeFilePath):
        source = source.absolute()
        if isinstance(source, ZipPath):
            return FileInZip(source, source.root, extract_file_name(source))

    if isinstance(source, pydantic.AnyUrl):
        with get_validation_context().replace(perform_io_checks=False):
            source = HttpUrl(source)

    if isinstance(source, FileDescr):
        return source.download()
    elif isinstance(source, ZipPath):
        zip_root = source.root
        assert isinstance(zip_root, ZipFile)
        return FileInZip(
            source,
            zip_root,
            extract_file_name(source),
        )
    elif isinstance(source, Path):
        if source.is_dir():
            raise FileNotFoundError(f"{source} is a directory, not a file")

        if not source.exists():
            raise FileNotFoundError(source)

        return LocalFile(
            source,
            source.parent,
            extract_file_name(source),
        )
    elif isinstance(source, HttpUrl):
        if source.scheme not in ("http", "https"):
            raise NotImplementedError(source.scheme)

        if settings.CI:
            headers = {"User-Agent": "ci"}
            if progressbar is None:
                progressbar = False
        else:
            headers = {}
            if progressbar is None:
                progressbar = True

        if settings.user_agent is not None:
            headers["User-Agent"] = settings.user_agent

        chunk_size = 1024
        if (
            settings.cache_path
            and not get_validation_context().disable_cache
            and any(v is not None for v in kwargs.values())
        ):
            downloader = pooch.HTTPDownloader(
                headers=headers,
                progressbar=progressbar,  # pyright: ignore[reportArgumentType]
                chunk_size=chunk_size,
            )
            fname = _get_unique_file_name(source)
            _ls: Any = pooch.retrieve(
                url=str(source),
                known_hash=_get_known_hash(kwargs),
                downloader=downloader,
                fname=fname,
                path=settings.cache_path,
            )
            local_source = Path(_ls).absolute()
            return LocalFile(
                local_source,
                source.parent,
                extract_file_name(source),
            )
        else:
            # cacheless download to memory using an in memory zip file
            r = requests.get(str(source), stream=True)
            r.raise_for_status()

            zf = zipfile.ZipFile(io.BytesIO(), "w")
            fn = extract_file_name(source)
            total = int(r.headers.get("content-length", 0))

            if isinstance(progressbar, bool):
                if progressbar:
                    use_ascii = bool(sys.platform == "win32")
                    pbar = tqdm(
                        total=total,
                        ncols=79,
                        ascii=use_ascii,
                        unit="B",
                        unit_scale=True,
                        leave=True,
                        desc=f"Downloading {fn}",
                    )
                else:
                    pbar = None
            else:
                pbar = progressbar

            zp = ZipPath(zf, fn)
            with zp.open("wb") as z:
                assert not isinstance(z, io.TextIOWrapper)
                for chunk in r.iter_content(chunk_size=chunk_size):
                    n = z.write(chunk)
                    if pbar is not None:
                        _ = pbar.update(n)

            # Make sure the progress bar gets filled even if the actual number
            # of chunks is smaller than expected. This happens when streaming
            # text files that are compressed by the server when sending (gzip).
            # Binary files don't experience this.
            # (adapted from pooch.HTTPDownloader)
            if pbar is not None:
                pbar.reset()
                _ = pbar.update(total)
                pbar.close()

            return FileInZip(
                path=zp,
                original_root=source.parent,
                original_file_name=fn,
            )

    else:
        assert_never(source)


download = resolve

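# --- Editor-added usage sketch (not part of the original module) --------------
# `resolve` (aliased as `download`) turns any permissive file source into a
# `LocalFile` (cached on disk) or a `FileInZip` (cacheless in-memory download);
# the URL below is hypothetical.
def _example_resolve() -> None:  # pragma: no cover - documentation only
    local = resolve("https://example.com/models/weights.pt", progressbar=False)
    print(local.path, local.original_file_name)
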

def resolve_and_extract(
    source: Union[PermissiveFileSource, FileDescr, ZipPath],
    /,
    progressbar: Union[Progressbar, bool, None] = None,
    **kwargs: Unpack[HashKwargs],
) -> LocalFile:
    """Resolve `source` within current ValidationContext,
    download if needed and
    extract file if within zip archive.

    note: If source points to a zip file it is not extracted
    """
    local = resolve(source, progressbar=progressbar, **kwargs)
    if isinstance(local, LocalFile):
        return local

    folder = extract(local.path)

    return LocalFile(
        folder / local.path.at,
        original_root=local.original_root,
        original_file_name=local.original_file_name,
    )


class LightHttpFileDescr(Node):
    """http source with sha256 value (minimal validation)"""

    source: pydantic.HttpUrl
    """file source"""

    sha256: Sha256
    """SHA256 checksum of the source file"""

    def download(self):
        return download(self.source, sha256=self.sha256)


class FileDescr(Node):
    source: ImportantFileSource
    """∈📦 file source"""

    sha256: Optional[Sha256] = None
    """SHA256 checksum of the source file"""

    @model_validator(mode="after")
    def _validate_sha256(self) -> Self:
        if get_validation_context().perform_io_checks:
            self.validate_sha256()

        return self

    def validate_sha256(self):
        context = get_validation_context()
        if (src_str := str(self.source)) in context.known_files:
            actual_sha = context.known_files[src_str]
        else:
            local_source = download(self.source, sha256=self.sha256).path
            actual_sha = get_sha256(local_source)
            context.known_files[src_str] = actual_sha

        if actual_sha is None:
            return
        elif self.sha256 == actual_sha:
            pass
        elif self.sha256 is None or context.update_hashes:
            self.sha256 = actual_sha
        elif self.sha256 != actual_sha:
            raise ValueError(
                f"Sha256 mismatch for {self.source}. Expected {self.sha256}, got "
                + f"{actual_sha}. Update expected `sha256` or point to the matching "
                + "file."
            )

    def download(self):
        return download(self.source, sha256=self.sha256)

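# --- Editor-added usage sketch (not part of the original module) --------------
# `FileDescr` only verifies the sha256 when the validation context performs IO
# checks; the relative path below is hypothetical and is not accessed here.
def _example_file_descr() -> None:  # pragma: no cover - documentation only
    with get_validation_context().replace(perform_io_checks=False):
        descr = FileDescr(source=Path("weights.pt"), sha256=None)

    print(descr.source, descr.sha256)  # sha256 stays None until validated with IO
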

def extract_file_name(
    src: Union[pydantic.HttpUrl, HttpUrl, PurePath, RelativeFilePath, ZipPath],
) -> FileName:
    if isinstance(src, ZipPath):
        return src.name or src.root.filename or "bioimageio.zip"
    elif isinstance(src, RelativeFilePath):
        return src.path.name
    elif isinstance(src, PurePath):
        return src.name
    else:
        url = urlparse(str(src))
        if (
            url.scheme == "https"
            and url.hostname == "zenodo.org"
            and url.path.startswith("/api/records/")
            and url.path.endswith("/content")
        ):
            return url.path.split("/")[-2]
        else:
            return url.path.split("/")[-1]

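# --- Editor-added usage sketch (not part of the original module) --------------
# Zenodo "api/records/<id>/files/<name>/content" URLs map to the file name
# rather than to "content"; the record id and file name are made up.
def _example_extract_file_name() -> None:  # pragma: no cover - documentation only
    with get_validation_context().replace(perform_io_checks=False):
        url = HttpUrl(
            "https://zenodo.org/api/records/123456/files/weights.pt/content"
        )

    assert extract_file_name(url) == "weights.pt"
    assert extract_file_name(PurePath("folder/weights.pt")) == "weights.pt"
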

def get_sha256(path: Union[Path, ZipPath]) -> Sha256:
    """from https://stackoverflow.com/a/44873382"""
    if isinstance(path, ZipPath):
        # no buffered reading available
        zf = path.root
        assert isinstance(zf, ZipFile)
        data = path.read_bytes()
        assert isinstance(data, bytes)
        h = hashlib.sha256(data)
    else:
        h = hashlib.sha256()
        chunksize = 128 * 1024
        b = bytearray(chunksize)
        mv = memoryview(b)
        with open(path, "rb", buffering=0) as f:
            for n in iter(lambda: f.readinto(mv), 0):
                h.update(mv[:n])

    sha = h.hexdigest()
    assert len(sha) == 64
    return Sha256(sha)

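# --- Editor-added usage sketch (not part of the original module) --------------
# Hashes a small temporary file; the printed digest should equal
# hashlib.sha256(b"bioimageio").hexdigest().
def _example_get_sha256() -> None:  # pragma: no cover - documentation only
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        p = Path(tmp) / "data.bin"
        _ = p.write_bytes(b"bioimageio")
        print(get_sha256(p))  # 64-character hex digest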