Coverage for src / bioimageio / spec / _internal / io.py: 78%
490 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 08:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 08:44 +0000
1from __future__ import annotations
3import collections.abc
4import hashlib
5import sys
6import warnings
7import zipfile
8from abc import abstractmethod
9from contextlib import nullcontext
10from copy import deepcopy
11from dataclasses import dataclass, field
12from datetime import date as _date
13from datetime import datetime as _datetime
14from functools import partial
15from io import TextIOWrapper
16from pathlib import Path, PurePath, PurePosixPath
17from tempfile import mkdtemp
18from typing import (
19 TYPE_CHECKING,
20 Any,
21 Callable,
22 Dict,
23 Generic,
24 Iterable,
25 List,
26 Mapping,
27 Optional,
28 Sequence,
29 Set,
30 Tuple,
31 Type,
32 TypedDict,
33 TypeVar,
34 Union,
35 overload,
36)
37from urllib.parse import urlparse, urlsplit, urlunsplit
38from zipfile import ZipFile
40import httpx
41import pydantic
42from genericache import NoopCache
43from genericache.digest import ContentDigest, UrlDigest
44from pydantic import (
45 AnyUrl,
46 DirectoryPath,
47 Field,
48 GetCoreSchemaHandler,
49 PrivateAttr,
50 RootModel,
51 TypeAdapter,
52 model_serializer,
53 model_validator,
54)
55from pydantic_core import core_schema
56from tqdm import tqdm
57from typing_extensions import (
58 Annotated,
59 LiteralString,
60 NotRequired,
61 Self,
62 TypeGuard,
63 Unpack,
64 assert_never,
65)
66from typing_extensions import TypeAliasType as _TypeAliasType
68from ._settings import settings
69from .io_basics import (
70 ALL_BIOIMAGEIO_YAML_NAMES,
71 ALTERNATIVE_BIOIMAGEIO_YAML_NAMES,
72 BIOIMAGEIO_YAML,
73 AbsoluteDirectory,
74 AbsoluteFilePath,
75 BytesReader,
76 FileName,
77 FilePath,
78 Sha256,
79 ZipPath,
80 get_sha256,
81)
82from .node import Node
83from .progress import ProgressbarLike
84from .root_url import RootHttpUrl
85from .type_guards import is_dict, is_list, is_mapping, is_sequence
86from .url import HttpUrl
87from .utils import SLOTS
88from .validation_context import get_validation_context
89from .version_type import Version
# TypeVar for the resolved (absolute) form of a relative path: an http URL,
# an absolute local directory/file path, or a path inside a zip archive.
AbsolutePathT = TypeVar(
    "AbsolutePathT",
    bound=Union[HttpUrl, AbsoluteDirectory, AbsoluteFilePath, ZipPath],
)
class LightHttpFileDescr(Node):
    """http source with sha256 value (minimal validation)"""

    source: pydantic.HttpUrl
    """file source"""

    sha256: Sha256
    """SHA256 checksum of the source file"""

    def get_reader(
        self,
        *,
        progressbar: Union[
            ProgressbarLike, Callable[[], ProgressbarLike], bool, None
        ] = None,
    ) -> BytesReader:
        """Open the file at `self.source`, downloading it first if necessary."""
        reader = get_reader(
            self.source, sha256=self.sha256, progressbar=progressbar
        )
        return reader

    download = get_reader
    """alias for get_reader() method"""
class RelativePathBase(RootModel[PurePath], Generic[AbsolutePathT], frozen=True):
    # Base class for relative path/url wrappers. `_absolute` caches the
    # resolved target, computed against the validation context root at
    # model creation time (see `model_post_init`).
    _absolute: AbsolutePathT = PrivateAttr()

    @property
    def path(self) -> PurePath:
        # the wrapped (relative) pure path
        return self.root

    def absolute(  # method not property analog to `pathlib.Path.absolute()`
        self,
    ) -> AbsolutePathT:
        """get the absolute path/url

        (resolved at time of initialization with the root of the ValidationContext)
        """
        return self._absolute

    def model_post_init(self, __context: Any) -> None:
        """set `_absolute` property with validation context at creation time. @private"""
        # absolute paths and url-like strings are rejected: this type models
        # strictly relative locations
        if self.root.is_absolute():
            raise ValueError(f"{self.root} is an absolute path.")

        if self.root.parts and self.root.parts[0] in ("http:", "https:"):
            raise ValueError(f"{self.root} looks like an http url.")

        self._absolute = (  # pyright: ignore[reportAttributeAccessIssue]
            self.get_absolute(get_validation_context().root)
        )
        super().model_post_init(__context)

    def __str__(self) -> str:
        return self.root.as_posix()

    def __repr__(self) -> str:
        return f"RelativePath('{self}')"

    @model_serializer()
    def format(self) -> str:
        # serialize as the POSIX string form of the relative path
        return str(self)

    @abstractmethod
    def get_absolute(
        self, root: Union[RootHttpUrl, AbsoluteDirectory, pydantic.AnyUrl, ZipFile]
    ) -> AbsolutePathT: ...

    def _get_absolute_impl(
        self, root: Union[RootHttpUrl, AbsoluteDirectory, pydantic.AnyUrl, ZipFile]
    ) -> Union[Path, HttpUrl, ZipPath]:
        # Resolve `self.root` against `root`, which may be a local directory,
        # an open zip archive, or a URL.
        if isinstance(root, Path):
            return (root / self.root).absolute()

        rel_path = self.root.as_posix().strip("/")
        if isinstance(root, ZipFile):
            return ZipPath(root, rel_path)

        parsed = urlsplit(str(root))
        path = list(parsed.path.strip("/").split("/"))
        if (
            parsed.netloc == "zenodo.org"
            and parsed.path.startswith("/api/records/")
            and parsed.path.endswith("/content")
        ):
            # Zenodo API record URLs end in a "/content" segment; insert the
            # relative path before it to keep the URL shape valid.
            path.insert(-1, rel_path)
        else:
            path.append(rel_path)

        return HttpUrl(
            urlunsplit(
                (
                    parsed.scheme,
                    parsed.netloc,
                    "/".join(path),
                    parsed.query,
                    parsed.fragment,
                )
            )
        )

    @classmethod
    def _validate(cls, value: Union[PurePath, str]):
        # reject strings that are URLs; those must not be parsed as paths
        if isinstance(value, str) and (
            value.startswith("https://") or value.startswith("http://")
        ):
            raise ValueError(f"{value} looks like a URL, not a relative path")

        return cls(PurePath(value))
class RelativeFilePath(
    RelativePathBase[Union[AbsoluteFilePath, HttpUrl, ZipPath]], frozen=True
):
    """A path relative to the `rdf.yaml` file (also if the RDF source is a URL)."""

    def model_post_init(self, __context: Any) -> None:
        """add validation @private"""
        if not self.root.parts:  # an empty path can only be a directory
            raise ValueError(f"{self.root} is not a valid file path.")

        super().model_post_init(__context)

    def get_absolute(
        self, root: "RootHttpUrl | Path | AnyUrl | ZipFile"
    ) -> "AbsoluteFilePath | HttpUrl | ZipPath":
        # Resolve relative to `root`. When IO checks are enabled and the
        # file is not already listed in the context's known files, it must
        # exist on disk.
        absolute = self._get_absolute_impl(root)
        if (
            isinstance(absolute, Path)
            and (context := get_validation_context()).perform_io_checks
            and str(self.root) not in context.known_files
            and not absolute.is_file()
        ):
            raise ValueError(f"{absolute} does not point to an existing file")

        return absolute

    @property
    def suffix(self) -> str:
        # file extension of the relative path, e.g. ".yaml"
        return self.root.suffix
class RelativeDirectory(
    RelativePathBase[Union[AbsoluteDirectory, HttpUrl, ZipPath]], frozen=True
):
    # A directory path relative to the validation context root.
    def get_absolute(
        self, root: "RootHttpUrl | Path | AnyUrl | ZipFile"
    ) -> "AbsoluteDirectory | HttpUrl | ZipPath":
        # Resolve relative to `root`; with IO checks enabled, a local result
        # must point to an existing directory.
        absolute = self._get_absolute_impl(root)
        if (
            isinstance(absolute, Path)
            and get_validation_context().perform_io_checks
            and not absolute.is_dir()
        ):
            raise ValueError(f"{absolute} does not point to an existing directory")

        return absolute
# A strict file source: tried left to right as URL, then relative path,
# then local file path (first successful validation wins).
FileSource = Annotated[
    Union[HttpUrl, RelativeFilePath, FilePath],
    Field(union_mode="left_to_right"),
]
class FileDescr(Node):
    """A file description"""

    source: FileSource
    """File source"""

    sha256: Optional[Sha256] = None
    """SHA256 hash value of the **source** file."""

    @model_validator(mode="after")
    def _validate_sha256(self) -> Self:
        # pydantic hook: verify (or record) the checksum right after validation
        self.validate_sha256()
        return self

    def validate_sha256(self, force_recompute: bool = False) -> None:
        """validate the sha256 hash value of the **source** file"""
        context = get_validation_context()
        src_str = str(self.source)
        if force_recompute:
            actual_sha = None
        else:
            # reuse a digest previously computed for this source in this context
            actual_sha = context.known_files.get(src_str)

        if actual_sha is None:
            if context.perform_io_checks or force_recompute:
                reader = get_reader(self.source, sha256=self.sha256)
                if force_recompute:
                    actual_sha = get_sha256(reader)
                else:
                    # get_reader already verified/derived the digest
                    actual_sha = reader.sha256

                context.known_files[src_str] = actual_sha
            elif context.known_files and src_str not in context.known_files:
                # perform_io_checks is False, but known files were given,
                # so we expect all file references to be in there
                raise ValueError(f"File {src_str} not found in `known_files`.")

        if actual_sha is None or self.sha256 == actual_sha:
            return
        elif self.sha256 is None or context.update_hashes:
            # adopt the freshly computed digest
            self.sha256 = actual_sha
        elif self.sha256 != actual_sha:
            raise ValueError(
                f"Sha256 mismatch for {self.source}. Expected {self.sha256}, got "
                + f"{actual_sha}. Update expected `sha256` or point to the matching "
                + "file."
            )

    def get_reader(
        self,
        *,
        progressbar: Union[
            ProgressbarLike, Callable[[], ProgressbarLike], bool, None
        ] = None,
    ):
        """open the file source (download if needed)"""
        return get_reader(self.source, progressbar=progressbar, sha256=self.sha256)

    def download(
        self,
        *,
        progressbar: Union[
            ProgressbarLike, Callable[[], ProgressbarLike], bool, None
        ] = None,
    ):
        """alias for `.get_reader`"""
        return get_reader(self.source, progressbar=progressbar, sha256=self.sha256)

    @property
    def suffix(self) -> str:
        # file extension of the source, e.g. ".yaml"
        return self.source.suffix
# Anything `get_reader`/`interprete_file_source` can normalize into a FileSource.
PermissiveFileSource = Union[FileSource, str, pydantic.HttpUrl, FileDescr, ZipPath]

# adapter used to validate a path-or-url value without a full pydantic model
path_or_url_adapter: "TypeAdapter[Union[FilePath, DirectoryPath, HttpUrl]]" = (
    TypeAdapter(Union[FilePath, DirectoryPath, HttpUrl])
)
@dataclass(frozen=True, **SLOTS)
class WithSuffix:
    """Pydantic annotation that restricts a file source to given suffix(es)."""

    suffix: Union[LiteralString, Tuple[LiteralString, ...]]
    case_sensitive: bool

    def __get_pydantic_core_schema__(
        self, source: Type[Any], handler: GetCoreSchemaHandler
    ):
        # an empty suffix specification would make the check meaningless
        if not self.suffix:
            raise ValueError("suffix may not be empty")

        return core_schema.no_info_after_validator_function(
            self.validate, handler(source)
        )

    def validate(
        self, value: Union[FileSource, FileDescr]
    ) -> Union[FileSource, FileDescr]:
        """Check that `value` carries one of the allowed suffixes."""
        return validate_suffix(value, self.suffix, case_sensitive=self.case_sensitive)
def wo_special_file_name(src: F) -> F:
    """Pass `src` through, rejecting file names reserved for bioimageio YAML files."""
    if has_valid_bioimageio_yaml_name(src):
        message = (
            f"'{src}' not allowed here as its filename is reserved to identify"
            + f" '{BIOIMAGEIO_YAML}' (or equivalent) files."
        )
        raise ValueError(message)

    return src
def has_valid_bioimageio_yaml_name(src: Union[FileSource, FileDescr]) -> bool:
    """Return True if the file name of `src` qualifies as a bioimageio YAML name."""
    name = extract_file_name(src)
    return is_valid_bioimageio_yaml_name(name)
def is_valid_bioimageio_yaml_name(file_name: FileName) -> bool:
    """Return True if `file_name` equals or ends with a known bioimageio YAML name."""
    return any(
        file_name == candidate or file_name.endswith("." + candidate)
        for candidate in ALL_BIOIMAGEIO_YAML_NAMES
    )
def identify_bioimageio_yaml_file_name(file_names: Iterable[FileName]) -> FileName:
    """Return the first file name that identifies a bioimageio YAML file.

    Candidates from `ALL_BIOIMAGEIO_YAML_NAMES` are tried in priority order;
    within each candidate, `file_names` are considered in sorted order for
    deterministic results.

    Raises:
        ValueError: if no file name matches any known bioimageio YAML name.
    """
    file_names = sorted(file_names)
    for bioimageio_name in ALL_BIOIMAGEIO_YAML_NAMES:
        for file_name in file_names:
            if file_name == bioimageio_name or file_name.endswith(
                "." + bioimageio_name
            ):
                return file_name

    # fixed message typos: "or or" -> "or", "alterntive" -> "alternative"
    raise ValueError(
        f"No {BIOIMAGEIO_YAML} found in {file_names}. (Looking for '{BIOIMAGEIO_YAML}'"
        + " or any of the alternative file names:"
        + f" {ALTERNATIVE_BIOIMAGEIO_YAML_NAMES}, or any file with an extension of"
        + f" those, e.g. 'anything.{BIOIMAGEIO_YAML}')."
    )
def find_bioimageio_yaml_file_name(path: Union[Path, ZipFile]) -> FileName:
    """Locate the bioimageio YAML file name in `path`.

    `path` may be an open zip archive, a zip file on disk, a plain file,
    or a directory.
    """
    if isinstance(path, ZipFile):
        candidates = path.namelist()
    elif path.is_file():
        if not zipfile.is_zipfile(path):
            # a plain (non-zip) file is taken to be the YAML file itself
            return path.name

        with ZipFile(path, "r") as zf:
            candidates = zf.namelist()
    else:
        candidates = [entry.name for entry in path.glob("*")]

    return identify_bioimageio_yaml_file_name(candidates)
def ensure_has_valid_bioimageio_yaml_name(src: FileSource) -> FileSource:
    """Return `src` unchanged; raise if its file name is not a bioimageio YAML name."""
    if has_valid_bioimageio_yaml_name(src):
        return src

    raise ValueError(
        f"'{src}' does not have a valid filename to identify"
        + f" '{BIOIMAGEIO_YAML}' (or equivalent) files."
    )
def ensure_is_valid_bioimageio_yaml_name(file_name: FileName) -> FileName:
    """Return `file_name` unchanged; raise if it is not a bioimageio YAML name."""
    if is_valid_bioimageio_yaml_name(file_name):
        return file_name

    raise ValueError(
        f"'{file_name}' is not a valid filename to identify"
        + f" '{BIOIMAGEIO_YAML}' (or equivalent) files."
    )
# types as loaded from YAML 1.2 (with ruyaml)
YamlLeafValue = Union[
    bool, _date, _datetime, int, float, str, None
]  # note: order relevant for deserializing
YamlKey = Union[  # YAML Arrays are cast to tuples if used as key in mappings
    YamlLeafValue, Tuple[YamlLeafValue, ...]  # (nesting is not allowed though)
]
if TYPE_CHECKING:
    # plain recursive aliases are fine for static type checking
    YamlValue = Union[YamlLeafValue, List["YamlValue"], Dict[YamlKey, "YamlValue"]]
    YamlValueView = Union[
        YamlLeafValue, Sequence["YamlValueView"], Mapping[YamlKey, "YamlValueView"]
    ]
else:
    # for pydantic validation we need to use `TypeAliasType`,
    # see https://docs.pydantic.dev/latest/concepts/types/#named-recursive-types
    # however this results in a partially unknown type with the current pyright 1.1.388
    YamlValue = _TypeAliasType(
        "YamlValue",
        Union[YamlLeafValue, List["YamlValue"], Dict[YamlKey, "YamlValue"]],
    )
    YamlValueView = _TypeAliasType(
        "YamlValueView",
        Union[
            YamlLeafValue,
            Sequence["YamlValueView"],
            Mapping[YamlKey, "YamlValueView"],
        ],
    )

# mutable vs read-only views of parsed bioimageio YAML content
BioimageioYamlContent = Dict[str, YamlValue]
BioimageioYamlContentView = Mapping[str, YamlValueView]

IncompleteDescrLeaf = Union[Node, YamlValue, PermissiveFileSource, Version]
"""Leaf value of a partial description"""

IncompleteDescrInner = Union[
    IncompleteDescrLeaf,
    List["IncompleteDescrInner"],
    Dict[YamlKey, "IncompleteDescrInner"],
]
"""An inner node of an incomplete resource description --- YAML values and description nodes mixed."""

IncompleteDescr = Dict[str, IncompleteDescrInner]
"""An incomplete resource description --- YAML values and description nodes mixed."""

IncompleteDescrLeafView = Union[Node, YamlValueView, PermissiveFileSource, Version]
"""Non-editable leaf value of an incomplete description"""

IncompleteDescrInnerView = Union[
    IncompleteDescrLeafView,
    Sequence["IncompleteDescrInnerView"],
    Mapping[YamlKey, "IncompleteDescrInnerView"],
    # Mapping[str, YamlValueView], # not sure why this is explicit Mapping is needed
]
"""An inner node of a non-editable incomplete resource description --- YAML value views and Node instances mixed."""

IncompleteDescrView = Mapping[str, IncompleteDescrInnerView]
"""A non-editable incomplete resource description --- YAML mappings and Node instances mixed."""

BioimageioYamlSource = Union[
    PermissiveFileSource, ZipFile, BioimageioYamlContent, BioimageioYamlContentView
]
@overload
def deepcopy_yaml_value(value: BioimageioYamlContentView) -> BioimageioYamlContent: ...


@overload
def deepcopy_yaml_value(value: YamlValueView) -> YamlValue: ...


def deepcopy_yaml_value(
    value: Union[BioimageioYamlContentView, YamlValueView],
) -> Union[BioimageioYamlContent, YamlValue]:
    """Deep-copy a YAML value, materializing mappings as dicts and sequences as lists.

    Leaf values (including strings) are returned as-is; they are immutable.
    """
    if isinstance(value, str):
        # str is a Sequence: without this guard a string would be exploded
        # into a list of single characters and recurse without bound on them
        # (mirrors the str check in `_deepcopy_incomplete_descr_impl`)
        return value
    elif isinstance(value, collections.abc.Mapping):
        return {key: deepcopy_yaml_value(val) for key, val in value.items()}
    elif isinstance(value, collections.abc.Sequence):
        return [deepcopy_yaml_value(val) for val in value]
    else:
        return value
def deepcopy_incomplete_descr(data: IncompleteDescrView) -> IncompleteDescr:
    """Deep-copy an incomplete description view into an editable dict."""
    result: IncompleteDescr = {}
    for key, value in data.items():
        result[key] = _deepcopy_incomplete_descr_impl(value)

    return result
def _deepcopy_incomplete_descr_impl(
    data: IncompleteDescrInnerView,
) -> IncompleteDescrInner:
    """Recursively copy one node of an incomplete description."""
    # description nodes are copied wholesale
    if isinstance(data, Node):
        return deepcopy(data)

    # str must be handled before the Sequence check: a str is a Sequence
    # of single-character strings
    if isinstance(data, str):
        return data

    if isinstance(data, collections.abc.Mapping):
        return {
            key: _deepcopy_incomplete_descr_impl(value)
            for key, value in data.items()
        }

    if isinstance(data, collections.abc.Sequence):
        return [_deepcopy_incomplete_descr_impl(value) for value in data]

    # immutable or path-like leaf values are returned without copying
    leaf_types = (
        HttpUrl,
        Path,
        PurePath,
        RelativeFilePath,
        Version,
        _date,
        _datetime,
        bool,
        float,
        int,
        pydantic.HttpUrl,
        type(None),
        ZipPath,
    )
    if isinstance(data, leaf_types):
        return data

    assert_never(data)
def is_yaml_leaf_value(value: Any) -> TypeGuard[YamlLeafValue]:
    """Return True if `value` is a scalar YAML value (bool, date, number, str, None)."""
    leaf_types = (bool, _date, _datetime, int, float, str, type(None))
    return isinstance(value, leaf_types)
def is_yaml_list(value: Any) -> TypeGuard[List[YamlValue]]:
    """Return True if `value` is a list whose items are all (strict) YAML values."""
    if not is_list(value):
        return False

    return all(is_yaml_value(item) for item in value)
def is_yaml_sequence(value: Any) -> TypeGuard[List[YamlValueView]]:
    """Return True if `value` is a sequence whose items are all YAML value views.

    Read-only counterpart to `is_yaml_list`: items are checked with
    `is_yaml_value_read_only` (consistent with `is_yaml_mapping`), so nested
    mappings/sequences need not be concrete dicts/lists.
    """
    return is_sequence(value) and all(
        is_yaml_value_read_only(item) for item in value
    )
def is_yaml_dict(value: Any) -> TypeGuard[BioimageioYamlContent]:
    """Return True if `value` is a dict with str keys and (strict) YAML values."""
    if not is_dict(value):
        return False

    return all(
        isinstance(key, str) and is_yaml_value(val) for key, val in value.items()
    )
def is_yaml_mapping(value: Any) -> TypeGuard[BioimageioYamlContentView]:
    """Return True if `value` is a mapping with str keys and YAML value views."""
    if not is_mapping(value):
        return False

    return all(
        isinstance(key, str) and is_yaml_value_read_only(val)
        for key, val in value.items()
    )
def is_yaml_value(value: Any) -> TypeGuard[YamlValue]:
    """Return True if `value` is a strict YAML value: leaf, list, or dict."""
    return any(
        check(value) for check in (is_yaml_leaf_value, is_yaml_list, is_yaml_dict)
    )
def is_yaml_value_read_only(value: Any) -> TypeGuard[YamlValueView]:
    """Return True if `value` is a YAML value view: leaf, sequence, or mapping."""
    return any(
        check(value)
        for check in (is_yaml_leaf_value, is_yaml_sequence, is_yaml_mapping)
    )
@dataclass(frozen=True, **SLOTS)
class OpenedBioimageioYaml:
    """Result of opening a bioimageio YAML file: parsed content plus provenance."""

    # parsed YAML content (excluded from repr: may be large)
    content: BioimageioYamlContent = field(repr=False)
    # where the file came from: local directory, URL root, or zip archive
    original_root: Union[AbsoluteDirectory, RootHttpUrl, ZipFile]
    # human-readable name of the original source, if any
    original_source_name: Optional[str]
    original_file_name: FileName
    # raw YAML text as read, before parsing (excluded from repr)
    unparsed_content: str = field(repr=False)
@dataclass(frozen=True, **SLOTS)
class LocalFile:
    """A file available on the local file system, with provenance."""

    path: FilePath
    # where the file originally came from
    original_root: Union[AbsoluteDirectory, RootHttpUrl, ZipFile]
    original_file_name: FileName
@dataclass(frozen=True, **SLOTS)
class FileInZip:
    """A file inside a zip archive, with provenance."""

    path: ZipPath
    # where the containing archive came from
    original_root: Union[RootHttpUrl, ZipFile]
    original_file_name: FileName
class HashKwargs(TypedDict):
    """Keyword arguments for checksum verification in `get_reader` and friends."""

    # expected SHA256 of the file; absent/None skips verification
    sha256: NotRequired[Optional[Sha256]]
# adapter to validate a value as a strict FileSource (URL, relative, or local path)
_file_source_adapter: TypeAdapter[Union[HttpUrl, RelativeFilePath, FilePath]] = (
    TypeAdapter(FileSource)
)
def interprete_file_source(
    file_source: Union[FileSource, str, pydantic.HttpUrl],
) -> FileSource:
    """Normalize `file_source` into a strict `FileSource`.

    Raises:
        FileNotFoundError: if the source resolves to a directory.
    """
    if isinstance(file_source, Path):
        if file_source.is_dir():
            raise FileNotFoundError(
                f"{file_source} is a directory, but expected a file."
            )
        return file_source

    if isinstance(file_source, HttpUrl):
        return file_source

    if isinstance(file_source, pydantic.AnyUrl):
        file_source = str(file_source)

    # validate as HttpUrl | RelativeFilePath | FilePath without touching disk
    with get_validation_context().replace(perform_io_checks=False):
        strict = _file_source_adapter.validate_python(file_source)
        if isinstance(strict, Path) and strict.is_dir():
            raise FileNotFoundError(f"{strict} is a directory, but expected a file.")

    return strict
def extract(
    source: Union[FilePath, ZipFile, ZipPath],
    folder: Optional[DirectoryPath] = None,
    overwrite: bool = False,
) -> DirectoryPath:
    """Extract `source` (a zip file, open archive, or single member) to `folder`.

    If `folder` is not given, it is derived from the archive path (suffix
    '.unzip') or a temporary directory is created. Returns the folder the
    content was extracted to (which may differ from `folder` if a stale,
    partial extraction was found; see below).
    """
    extract_member = None
    if isinstance(source, ZipPath):
        # extract only this member from its containing archive
        extract_member = source.at
        source = source.root

    if isinstance(source, ZipFile):
        # archive is already open; do not close it on exit
        zip_context = nullcontext(source)
        if folder is None:
            if source.filename is None:
                folder = Path(mkdtemp())
            else:
                zip_path = Path(source.filename)
                folder = zip_path.with_suffix(zip_path.suffix + ".unzip")
    else:
        zip_context = ZipFile(source, "r")
        if folder is None:
            folder = source.with_suffix(source.suffix + ".unzip")

    if overwrite and folder.exists():
        warnings.warn(f"Overwriting existing unzipped archive at {folder}")

    with zip_context as f:
        if extract_member is not None:
            # single-member extraction; keep an existing copy unless overwriting
            extracted_file_path = folder / extract_member
            if extracted_file_path.exists() and not overwrite:
                warnings.warn(f"Found unzipped {extracted_file_path}.")
            else:
                _ = f.extract(extract_member, folder)

            return folder

        elif overwrite or not folder.exists():
            f.extractall(folder)
            return folder

        # folder exists and we may not overwrite: verify it holds all
        # expected archive members
        found_content = {p.relative_to(folder).as_posix() for p in folder.glob("*")}
        expected_content = {info.filename for info in f.filelist}
        if expected_missing := expected_content - found_content:
            # incomplete extraction found; retry into a sibling folder with
            # an incremented numeric suffix (e.g. "x_1.unzip" -> "x_2.unzip")
            parts = folder.name.split("_")
            nr, *suffixes = parts[-1].split(".")
            if nr.isdecimal():
                nr = str(int(nr) + 1)
            else:
                nr = f"1.{nr}"

            parts[-1] = ".".join([nr, *suffixes])
            out_path_new = folder.with_name("_".join(parts))
            warnings.warn(
                f"Unzipped archive at {folder} is missing expected files"
                + f" {expected_missing}."
                + f" Unzipping to {out_path_new} instead to avoid overwriting."
            )
            return extract(f, out_path_new, overwrite=overwrite)
        else:
            warnings.warn(
                f"Found unzipped archive with all expected files at {folder}."
            )
            return folder
def get_reader(
    source: Union[PermissiveFileSource, FileDescr, ZipPath],
    /,
    progressbar: Union[
        ProgressbarLike, Callable[[], ProgressbarLike], bool, None
    ] = None,
    **kwargs: Unpack[HashKwargs],
) -> BytesReader:
    """Open a file `source` (download if needed)"""
    # normalize `source` step by step: FileDescr -> FileSource,
    # str -> strict source, relative -> absolute, plain URL -> HttpUrl
    if isinstance(source, FileDescr):
        if "sha256" not in kwargs:
            # adopt the description's checksum unless explicitly overridden
            kwargs["sha256"] = source.sha256

        source = source.source
    elif isinstance(source, str):
        source = interprete_file_source(source)

    if isinstance(source, RelativeFilePath):
        source = source.absolute()
    elif isinstance(source, pydantic.AnyUrl):
        with get_validation_context().replace(perform_io_checks=False):
            source = HttpUrl(source)

    if isinstance(source, HttpUrl):
        # remote source: delegate to the (cached) downloader
        return _open_url(source, progressbar=progressbar, **kwargs)

    if isinstance(source, ZipPath):
        if not source.exists():
            raise FileNotFoundError(source.filename)

        f = source.open(mode="rb")
        assert not isinstance(f, TextIOWrapper)
        root = source.root
    elif isinstance(source, Path):
        if source.is_dir():
            raise FileNotFoundError(f"{source} is a directory, not a file")

        if not source.exists():
            raise FileNotFoundError(source)

        f = source.open("rb")
        root = source.parent
    else:
        assert_never(source)

    # for local/zip sources verify the checksum eagerly (if one was given)
    expected_sha = kwargs.get("sha256")
    if expected_sha is None:
        sha = None
    else:
        sha = get_sha256(f)
        _ = f.seek(0)  # rewind after hashing so the caller reads from the start
        if sha != expected_sha:
            raise ValueError(
                f"SHA256 mismatch for {source}. Expected {expected_sha}, got {sha}."
            )

    return BytesReader(
        f,
        sha256=sha,
        suffix=source.suffix,
        original_file_name=source.name,
        original_root=root,
        is_zipfile=None,
    )


download = get_reader
def _open_url(
    source: HttpUrl,
    /,
    progressbar: Union[ProgressbarLike, Callable[[], ProgressbarLike], bool, None],
    **kwargs: Unpack[HashKwargs],
) -> BytesReader:
    """Fetch `source` through the (disk or noop) cache and wrap it in a reader."""
    # bypass the disk cache when the validation context disables caching
    cache = (
        NoopCache[RootHttpUrl](url_hasher=UrlDigest.from_str)
        if get_validation_context().disable_cache
        else settings.disk_cache
    )
    sha = kwargs.get("sha256")
    # without an expected digest we cannot trust a cached copy -> refetch;
    # with one, let the cache verify the content digest
    force_refetch = True if sha is None else ContentDigest.parse(hexdigest=sha)
    # derive a name/suffix for the reader; fall back to the digest or a
    # hash of the URL when the URL has no path component
    source_path = PurePosixPath(
        source.path
        or sha
        or hashlib.sha256(str(source).encode(encoding="utf-8")).hexdigest()
    )

    reader = cache.fetch(
        source,
        fetcher=partial(_fetch_url, progressbar=progressbar),
        force_refetch=force_refetch,
    )
    return BytesReader(
        reader,
        suffix=source_path.suffix,
        sha256=sha,
        original_file_name=source_path.name,
        original_root=source.parent,
        is_zipfile=None,
    )
def _fetch_url(
    source: RootHttpUrl,
    *,
    progressbar: Union[ProgressbarLike, Callable[[], ProgressbarLike], bool, None],
):
    """Download `source` via HTTP(S), yielding chunks and driving a progressbar.

    Returns a generator of byte chunks (consumed by the cache's fetcher API).
    """
    if source.scheme not in ("http", "https"):
        raise NotImplementedError(source.scheme)

    if progressbar is None:
        # choose progressbar option from validation context
        progressbar = get_validation_context().progressbar

    if progressbar is None:
        # default to no progressbar in CI environments
        progressbar = not settings.CI

    if callable(progressbar):
        # a factory was given; instantiate the progressbar now
        progressbar = progressbar()

    if isinstance(progressbar, bool) and progressbar:
        progressbar = tqdm(
            ncols=79,
            ascii=bool(sys.platform == "win32"),
            unit="B",
            unit_scale=True,
            leave=True,
        )

    # from here on `progressbar` is either False or a progressbar-like object
    if progressbar is not False:
        progressbar.set_description(f"Downloading {extract_file_name(source)}")

    headers: Dict[str, str] = {}
    if settings.user_agent is not None:
        headers["User-Agent"] = settings.user_agent
    elif settings.CI:
        headers["User-Agent"] = "ci"

    r = httpx.get(
        str(source),
        follow_redirects=True,
        headers=headers,
        timeout=settings.http_timeout,
    )
    _ = r.raise_for_status()

    # set progressbar.total from the content-length header (if parsable)
    total = r.headers.get("content-length")
    if total is not None and not isinstance(total, int):
        try:
            total = int(total)
        except Exception:
            total = None

    if progressbar is not False:
        if total is None:
            progressbar.total = 0
        else:
            progressbar.total = total

    def iter_content():
        # stream the response body, updating the progressbar per chunk
        for chunk in r.iter_bytes(chunk_size=4096):
            yield chunk
            if progressbar is not False:
                _ = progressbar.update(len(chunk))

        # Make sure the progress bar gets filled even if the actual number
        # of chunks is smaller than expected. This happens when streaming
        # text files that are compressed by the server when sending (gzip).
        # Binary files don't experience this.
        # (adapted from pooch.HttpDownloader)
        if progressbar is not False:
            progressbar.reset()
            if total is not None:
                _ = progressbar.update(total)

            progressbar.close()

    return iter_content()
def extract_file_name(
    src: Union[
        pydantic.HttpUrl, RootHttpUrl, PurePath, RelativeFilePath, ZipPath, FileDescr
    ],
) -> FileName:
    """Derive a file name from a file source or file description."""
    if isinstance(src, FileDescr):
        src = src.source

    if isinstance(src, ZipPath):
        return src.name or src.root.filename or "bioimageio.zip"

    if isinstance(src, RelativeFilePath):
        return src.path.name

    if isinstance(src, PurePath):
        return src.name

    url = urlparse(str(src))
    segments = url.path.split("/")
    is_zenodo_content_url = (
        url.scheme == "https"
        and url.hostname == "zenodo.org"
        and url.path.startswith("/api/records/")
        and url.path.endswith("/content")
    )
    # Zenodo API content URLs end in ".../<file name>/content": the actual
    # file name is the second-to-last path segment
    return segments[-2] if is_zenodo_content_url else segments[-1]
def extract_file_descrs(
    data: IncompleteDescrView,
) -> List[FileDescr]:
    """Collect all file descriptions (explicit or implied) found within `data`."""
    found: List[FileDescr] = []
    context = get_validation_context()
    with context.replace(perform_io_checks=False, log_warnings=False):
        _extract_file_descrs_impl(data, found)

    return found
def _extract_file_descrs_impl(
    data: Union[IncompleteDescrView, IncompleteDescrInnerView],
    collected: List[FileDescr],
) -> None:
    """Recursively collect file descriptions from `data` into `collected`.

    Mappings containing both 'source' and 'sha256' keys are interpreted as
    unparsed file descriptions; if parsing with the given sha256 fails, a
    second attempt without it is made before giving up with a warning.
    """
    if isinstance(data, FileDescr):
        collected.append(data)
    elif isinstance(data, Node):
        # Node iterates (field_name, value) pairs
        for _, v in data:
            _extract_file_descrs_impl(v, collected)
    elif isinstance(data, collections.abc.Mapping):
        if "source" in data and "sha256" in data:
            try:
                fd = FileDescr.model_validate(
                    dict(source=data["source"], sha256=data["sha256"])
                )
            except Exception:
                warnings.warn(
                    "Found mapping with 'source' and 'sha256' keys, but could not parse it as a FileDescr. Ignoring `sha256`."
                )
                try:
                    fd = FileDescr.model_validate(dict(source=data["source"]))
                except Exception:
                    # fixed garbled message ("evning", mismatched quoting)
                    warnings.warn(
                        f"Found mapping with 'source' and 'sha256' keys, but could not parse it as a FileDescr, even when ignoring 'sha256'. Ignoring `source`: {data['source']}."
                    )
                else:
                    collected.append(fd)
            else:
                collected.append(fd)

        # recurse into all values regardless of whether a FileDescr was found
        for v in data.values():
            _extract_file_descrs_impl(v, collected)
    elif not isinstance(data, (str, Path, RelativeFilePath)) and isinstance(
        data, collections.abc.Sequence
    ):
        # str/path-likes are Sequences too, but must be treated as leaves
        for v in data:
            _extract_file_descrs_impl(v, collected)
# TypeVar for functions that pass a file source or description through unchanged
F = TypeVar("F", bound=Union[FileSource, FileDescr])
def validate_suffix(
    value: F, suffix: Union[str, Sequence[str]], case_sensitive: bool
) -> F:
    """check final suffix"""
    if isinstance(suffix, str):
        suffixes = [suffix]
    else:
        suffixes = suffix

    assert len(suffixes) > 0, "no suffix given"
    assert all(suff.startswith(".") for suff in suffixes), (
        "expected suffixes to start with '.'"
    )
    o_value = value
    # a FileDescr wraps its source; everything else is normalized first
    if isinstance(value, FileDescr):
        strict = value.source
    else:
        strict = interprete_file_source(value)

    # determine the actual suffix(es) of the source
    if isinstance(strict, (HttpUrl, AnyUrl)):
        if strict.path is None or "." not in (path := strict.path):
            actual_suffixes = []
        else:
            if (
                strict.host == "zenodo.org"
                and path.startswith("/api/records/")
                and path.endswith("/content")
            ):
                # Zenodo API URLs have a "/content" suffix that should be ignored
                path = path[: -len("/content")]

            actual_suffixes = [f".{path.split('.')[-1]}"]

    elif isinstance(strict, PurePath):
        actual_suffixes = strict.suffixes
    elif isinstance(strict, RelativeFilePath):
        actual_suffixes = strict.path.suffixes
    else:
        assert_never(strict)

    if actual_suffixes:
        actual_suffix = actual_suffixes[-1]
    else:
        actual_suffix = "no suffix"

    # compare the final suffix (case-insensitively unless requested otherwise)
    if (
        case_sensitive
        and actual_suffix not in suffixes
        or not case_sensitive
        and actual_suffix.lower() not in [s.lower() for s in suffixes]
    ):
        if len(suffixes) == 1:
            raise ValueError(f"Expected suffix {suffixes[0]}, but got {actual_suffix}")
        else:
            raise ValueError(
                f"Expected a suffix from {suffixes}, but got {actual_suffix}"
            )

    return o_value
def populate_cache(sources: Sequence[Union[FileDescr, LightHttpFileDescr]]):
    """Download (and thereby cache) all remote `sources` that have a known sha256."""
    seen_urls: Set[str] = set()
    for descr in sources:
        if descr.sha256 is None:
            continue  # not caching without known SHA

        src = descr.source
        if isinstance(src, (HttpUrl, pydantic.AnyUrl)):
            url = str(src)
        elif isinstance(src, RelativeFilePath):
            resolved = src.absolute()
            if not isinstance(resolved, HttpUrl):
                continue  # not caching local paths
            url = str(resolved)
        elif isinstance(src, Path):
            continue  # not caching local paths
        else:
            assert_never(src)

        if url in seen_urls:
            continue  # skip duplicate URLs

        seen_urls.add(url)
        _ = descr.download()