Coverage for src/bioimageio/spec/_internal/io.py: 78%
490 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:08 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:08 +0000
1from __future__ import annotations
3import collections.abc
4import hashlib
5import sys
6import warnings
7import zipfile
8from abc import abstractmethod
9from contextlib import nullcontext
10from copy import deepcopy
11from dataclasses import dataclass, field
12from datetime import date as _date
13from datetime import datetime as _datetime
14from functools import partial
15from io import TextIOWrapper
16from pathlib import Path, PurePath, PurePosixPath
17from tempfile import mkdtemp
18from typing import (
19 TYPE_CHECKING,
20 Any,
21 Callable,
22 Dict,
23 Generic,
24 Iterable,
25 List,
26 Mapping,
27 Optional,
28 Sequence,
29 Set,
30 Tuple,
31 Type,
32 TypedDict,
33 TypeVar,
34 Union,
35 overload,
36)
37from urllib.parse import urlparse, urlsplit, urlunsplit
38from zipfile import ZipFile
40import httpx
41import pydantic
42from genericache import NoopCache
43from genericache.digest import ContentDigest, UrlDigest
44from pydantic import (
45 AnyUrl,
46 DirectoryPath,
47 Field,
48 GetCoreSchemaHandler,
49 PrivateAttr,
50 RootModel,
51 TypeAdapter,
52 model_serializer,
53 model_validator,
54)
55from pydantic_core import core_schema
56from tqdm import tqdm
57from typing_extensions import (
58 Annotated,
59 LiteralString,
60 NotRequired,
61 Self,
62 TypeAlias,
63 TypeGuard,
64 Unpack,
65 assert_never,
66)
67from typing_extensions import TypeAliasType as _TypeAliasType
69from ._settings import settings
70from .io_basics import (
71 ALL_BIOIMAGEIO_YAML_NAMES,
72 ALTERNATIVE_BIOIMAGEIO_YAML_NAMES,
73 BIOIMAGEIO_YAML,
74 AbsoluteDirectory,
75 AbsoluteFilePath,
76 BytesReader,
77 FileName,
78 FilePath,
79 Sha256,
80 ZipPath,
81 get_sha256,
82)
83from .node import Node
84from .progress import ProgressbarLike
85from .root_url import RootHttpUrl
86from .type_guards import is_dict, is_list, is_mapping, is_sequence
87from .url import HttpUrl
88from .utils import SLOTS
89from .validation_context import get_validation_context
90from .version_type import Version
# Type variable for the absolute form a relative path resolves to; bound to
# an http URL, an absolute directory, an absolute file path or a zip path.
AbsolutePathT = TypeVar(
    "AbsolutePathT",
    bound=Union[HttpUrl, AbsoluteDirectory, AbsoluteFilePath, ZipPath],
)
class LightHttpFileDescr(Node):
    """http source with sha256 value (minimal validation)"""

    source: pydantic.HttpUrl
    """file source"""

    sha256: Sha256
    """SHA256 checksum of the source file"""

    def get_reader(
        self,
        *,
        progressbar: Union[
            ProgressbarLike, Callable[[], ProgressbarLike], bool, None
        ] = None,
    ) -> BytesReader:
        """Open the file source (download if needed).

        Forwards `source`, `sha256` and `progressbar` to the module-level
        `get_reader` function and returns its `BytesReader`.
        """
        url, digest = self.source, self.sha256
        return get_reader(url, progressbar=progressbar, sha256=digest)

    download = get_reader
    """alias for get_reader() method"""
# Base model for paths given relative to the root of the validation context;
# subclasses decide how (and with which checks) the absolute form is derived.
class RelativePathBase(RootModel[PurePath], Generic[AbsolutePathT], frozen=True):
    _absolute: AbsolutePathT = PrivateAttr()

    @property
    def path(self) -> PurePath:
        # the wrapped relative path
        return self.root

    def absolute(  # method not property analog to `pathlib.Path.absolute()`
        self,
    ) -> AbsolutePathT:
        """Return the absolute path/url.

        (It was resolved once, at initialization, against the root of the
        validation context that was active at that time.)
        """
        return self._absolute

    def model_post_init(self, __context: Any) -> None:
        """Reject absolute/url-like values and resolve `_absolute`. @private"""
        relative = self.root
        if relative.is_absolute():
            raise ValueError(f"{self.root} is an absolute path.")

        # a url parsed as a path has its scheme (e.g. 'https:') as first part
        if relative.parts[:1] in (("http:",), ("https:",)):
            raise ValueError(f"{self.root} looks like an http url.")

        context_root = get_validation_context().root
        self._absolute = (  # pyright: ignore[reportAttributeAccessIssue]
            self.get_absolute(context_root)
        )
        super().model_post_init(__context)

    def __str__(self) -> str:
        return self.root.as_posix()

    def __repr__(self) -> str:
        return f"RelativePath('{self}')"

    @model_serializer()
    def format(self) -> str:
        # serialize as the posix string of the relative path
        return str(self)

    @abstractmethod
    def get_absolute(
        self, root: Union[RootHttpUrl, AbsoluteDirectory, pydantic.AnyUrl, ZipFile]
    ) -> AbsolutePathT: ...

    def _get_absolute_impl(
        self, root: Union[RootHttpUrl, AbsoluteDirectory, pydantic.AnyUrl, ZipFile]
    ) -> Union[Path, HttpUrl, ZipPath]:
        """Join this relative path onto `root` (a directory, zip file or url)."""
        if isinstance(root, Path):
            return (root / self.root).absolute()

        rel_path = self.root.as_posix().strip("/")
        if isinstance(root, ZipFile):
            return ZipPath(root, rel_path)

        scheme, netloc, url_path, query, fragment = urlsplit(str(root))
        segments = url_path.strip("/").split("/")
        is_zenodo_content = (
            netloc == "zenodo.org"
            and url_path.startswith("/api/records/")
            and url_path.endswith("/content")
        )
        # for zenodo API urls the relative path goes before the trailing
        # 'content' segment; otherwise it is appended at the end
        position = len(segments) - 1 if is_zenodo_content else len(segments)
        segments.insert(position, rel_path)

        joined = urlunsplit((scheme, netloc, "/".join(segments), query, fragment))
        return HttpUrl(joined)

    @classmethod
    def _validate(cls, value: Union[PurePath, str]):
        if isinstance(value, str) and value.startswith(("https://", "http://")):
            raise ValueError(f"{value} looks like a URL, not a relative path")

        return cls(PurePath(value))
class RelativeFilePath(
    RelativePathBase[Union[AbsoluteFilePath, HttpUrl, ZipPath]], frozen=True
):
    """A path relative to the `rdf.yaml` file (also if the RDF source is a URL)."""

    def model_post_init(self, __context: Any) -> None:
        """add validation @private"""
        has_parts = bool(self.root.parts)
        if not has_parts:  # an empty path can only be a directory
            raise ValueError(f"{self.root} is not a valid file path.")

        super().model_post_init(__context)

    def get_absolute(
        self, root: "RootHttpUrl | Path | AnyUrl | ZipFile"
    ) -> "AbsoluteFilePath | HttpUrl | ZipPath":
        """Resolve against `root`; local results may be checked for existence.

        The existence check only runs for local paths, when io checks are
        enabled and the relative path is not listed in `known_files`.
        """
        absolute = self._get_absolute_impl(root)
        if not isinstance(absolute, Path):
            return absolute

        context = get_validation_context()
        must_exist = (
            context.perform_io_checks and str(self.root) not in context.known_files
        )
        if must_exist and not absolute.is_file():
            raise ValueError(f"{absolute} does not point to an existing file")

        return absolute

    @property
    def suffix(self):
        # file suffix of the relative path
        return self.root.suffix
# Relative path that is expected to resolve to a directory.
class RelativeDirectory(
    RelativePathBase[Union[AbsoluteDirectory, HttpUrl, ZipPath]], frozen=True
):
    def get_absolute(
        self, root: "RootHttpUrl | Path | AnyUrl | ZipFile"
    ) -> "AbsoluteDirectory | HttpUrl | ZipPath":
        """Resolve against `root`; with io checks a local result must be a directory."""
        absolute = self._get_absolute_impl(root)
        if not isinstance(absolute, Path):
            return absolute

        if get_validation_context().perform_io_checks and not absolute.is_dir():
            raise ValueError(f"{absolute} does not point to an existing directory")

        return absolute
# A file source: an http URL, a path relative to the validation root, or a
# local file path. `union_mode="left_to_right"` makes pydantic try the union
# members in the order they are listed.
FileSource = Annotated[
    Union[HttpUrl, RelativeFilePath, FilePath],
    Field(union_mode="left_to_right"),
]
class FileDescr(Node):
    """A file description"""

    source: FileSource
    """File source"""

    sha256: Optional[Sha256] = None
    """SHA256 hash value of the **source** file."""

    @model_validator(mode="after")
    def _validate_sha256(self) -> Self:
        self.validate_sha256()
        return self

    def validate_sha256(self, force_recompute: bool = False) -> None:
        """Validate the sha256 hash value of the **source** file.

        The actual hash is taken from the validation context's `known_files`
        or, if io checks are enabled or `force_recompute` is set, obtained by
        opening the source. A missing `sha256` (or any `sha256` when the
        context has `update_hashes` set) is overwritten with the actual value.

        Raises:
            ValueError: if the hashes differ, or if io checks are disabled,
                `known_files` is non-empty and does not contain the source.
        """
        context = get_validation_context()
        src_str = str(self.source)
        # a forced recompute ignores whatever `known_files` has on record
        actual_sha = None if force_recompute else context.known_files.get(src_str)

        if actual_sha is None:
            if context.perform_io_checks or force_recompute:
                reader = get_reader(self.source, sha256=self.sha256)
                actual_sha = get_sha256(reader) if force_recompute else reader.sha256
                context.known_files[src_str] = actual_sha
            elif context.known_files and src_str not in context.known_files:
                # perform_io_checks is False, but known files were given,
                # so we expect all file references to be in there
                raise ValueError(f"File {src_str} not found in `known_files`.")

        if actual_sha is None or self.sha256 == actual_sha:
            return

        if self.sha256 is None or context.update_hashes:
            self.sha256 = actual_sha
            return

        if self.sha256 != actual_sha:
            raise ValueError(
                f"Sha256 mismatch for {self.source}. Expected {self.sha256}, got "
                + f"{actual_sha}. Update expected `sha256` or point to the matching "
                + "file."
            )

    def get_reader(
        self,
        *,
        progressbar: Union[
            ProgressbarLike, Callable[[], ProgressbarLike], bool, None
        ] = None,
    ):
        """Open the file source (download if needed), passing on `sha256`."""
        location, digest = self.source, self.sha256
        return get_reader(location, progressbar=progressbar, sha256=digest)

    def download(
        self,
        *,
        progressbar: Union[
            ProgressbarLike, Callable[[], ProgressbarLike], bool, None
        ] = None,
    ):
        """alias for `.get_reader`"""
        location, digest = self.source, self.sha256
        return get_reader(location, progressbar=progressbar, sha256=digest)

    @property
    def suffix(self) -> str:
        # suffix as reported by the underlying source
        return self.source.suffix
# Everything accepted where a file source is expected: a strict `FileSource`
# plus plain strings, pydantic http URLs, file descriptions and zip paths.
PermissiveFileSource: TypeAlias = Union[
    FileSource, str, pydantic.HttpUrl, FileDescr, ZipPath
]


# Module-level pydantic adapter validating a value as an existing file path,
# an existing directory path or an http URL.
path_or_url_adapter: "TypeAdapter[Union[FilePath, DirectoryPath, HttpUrl]]" = (
    TypeAdapter(Union[FilePath, DirectoryPath, HttpUrl])
)
@dataclass(frozen=True, **SLOTS)
class WithSuffix:
    """Annotation marker that adds a file-suffix check after validation."""

    suffix: Union[LiteralString, Tuple[LiteralString, ...]]
    case_sensitive: bool

    def __get_pydantic_core_schema__(
        self, source: Type[Any], handler: GetCoreSchemaHandler
    ):
        """Wrap the schema of `source` with `self.validate` as after-validator."""
        if not self.suffix:
            raise ValueError("suffix may not be empty")

        inner_schema = handler(source)
        return core_schema.no_info_after_validator_function(
            self.validate, inner_schema
        )

    def validate(
        self, value: Union[FileSource, FileDescr]
    ) -> Union[FileSource, FileDescr]:
        """Check `value` against the configured suffix(es) via `validate_suffix`."""
        allowed, strict_case = self.suffix, self.case_sensitive
        return validate_suffix(value, allowed, case_sensitive=strict_case)
def wo_special_file_name(src: F) -> F:
    """Return `src` unchanged unless its file name is a reserved bioimageio YAML name.

    Raises:
        ValueError: if the file name of `src` identifies a bioimageio YAML file.
    """
    if not has_valid_bioimageio_yaml_name(src):
        return src

    raise ValueError(
        f"'{src}' not allowed here as its filename is reserved to identify"
        + f" '{BIOIMAGEIO_YAML}' (or equivalent) files."
    )
def has_valid_bioimageio_yaml_name(src: Union[FileSource, FileDescr]) -> bool:
    """Tell whether the file name extracted from `src` is a bioimageio YAML name."""
    file_name = extract_file_name(src)
    return is_valid_bioimageio_yaml_name(file_name)
def is_valid_bioimageio_yaml_name(file_name: FileName) -> bool:
    """Tell whether `file_name` equals, or ends with '.' plus, a known YAML name."""
    return any(
        file_name == known or file_name.endswith("." + known)
        for known in ALL_BIOIMAGEIO_YAML_NAMES
    )
def identify_bioimageio_yaml_file_name(file_names: Iterable[FileName]) -> FileName:
    """Pick the bioimageio YAML file name out of `file_names`.

    Known names are tried in the order of `ALL_BIOIMAGEIO_YAML_NAMES`; for each
    one the first match among the sorted candidates wins. A candidate matches
    if it equals the known name or ends with '.' followed by it.

    Raises:
        ValueError: if no candidate matches any known name.
    """
    file_names = sorted(file_names)
    for bioimageio_name in ALL_BIOIMAGEIO_YAML_NAMES:
        dotted = "." + bioimageio_name
        match = next(
            (
                candidate
                for candidate in file_names
                if candidate == bioimageio_name or candidate.endswith(dotted)
            ),
            None,
        )
        if match is not None:
            return match

    raise ValueError(
        f"No {BIOIMAGEIO_YAML} found in {file_names}. (Looking for '{BIOIMAGEIO_YAML}'"
        + " or or any of the alterntive file names:"
        + f" {ALTERNATIVE_BIOIMAGEIO_YAML_NAMES}, or any file with an extension of"
        + f" those, e.g. 'anything.{BIOIMAGEIO_YAML}')."
    )
def find_bioimageio_yaml_file_name(path: Union[Path, ZipFile]) -> FileName:
    """Find the bioimageio YAML file name for a zip file, file or directory.

    A plain (non-zip) file is returned by its own name; for zip archives and
    directories the member/entry names are searched.
    """
    if isinstance(path, ZipFile):
        return identify_bioimageio_yaml_file_name(path.namelist())

    if not path.is_file():
        # a directory: only its direct entries are considered
        entries = [entry.name for entry in path.glob("*")]
        return identify_bioimageio_yaml_file_name(entries)

    if not zipfile.is_zipfile(path):
        return path.name

    with ZipFile(path, "r") as archive:
        members = archive.namelist()

    return identify_bioimageio_yaml_file_name(members)
def ensure_has_valid_bioimageio_yaml_name(src: FileSource) -> FileSource:
    """Return `src` if its file name identifies a bioimageio YAML file.

    Raises:
        ValueError: otherwise.
    """
    if has_valid_bioimageio_yaml_name(src):
        return src

    raise ValueError(
        f"'{src}' does not have a valid filename to identify"
        + f" '{BIOIMAGEIO_YAML}' (or equivalent) files."
    )
def ensure_is_valid_bioimageio_yaml_name(file_name: FileName) -> FileName:
    """Return `file_name` if it identifies a bioimageio YAML file.

    Raises:
        ValueError: otherwise.
    """
    if is_valid_bioimageio_yaml_name(file_name):
        return file_name

    raise ValueError(
        f"'{file_name}' is not a valid filename to identify"
        + f" '{BIOIMAGEIO_YAML}' (or equivalent) files."
    )
# types as loaded from YAML 1.2 (with ruyaml)
YamlLeafValue: TypeAlias = Union[
    bool, _date, _datetime, int, float, str, None
]  # note: order relevant for deserializing
YamlKey: TypeAlias = Union[  # YAML Arrays are cast to tuples if used as key in mappings
    YamlLeafValue, Tuple[YamlLeafValue, ...]  # (nesting is not allowed though)
]
# Recursive YAML value types: `YamlValue` is the editable form (list/dict),
# `YamlValueView` the read-only form (Sequence/Mapping). Type checkers get
# plain recursive unions, the runtime gets named `TypeAliasType` aliases.
if TYPE_CHECKING:
    YamlValue: TypeAlias = Union[
        YamlLeafValue, List["YamlValue"], Dict[YamlKey, "YamlValue"]
    ]
    YamlValueView: TypeAlias = Union[
        YamlLeafValue, Sequence["YamlValueView"], Mapping[YamlKey, "YamlValueView"]
    ]
else:
    # for pydantic validation we need to use `TypeAliasType`,
    # see https://docs.pydantic.dev/latest/concepts/types/#named-recursive-types
    # however this results in a partially unknown type with the current pyright 1.1.388
    YamlValue: TypeAlias = _TypeAliasType(
        "YamlValue",
        Union[YamlLeafValue, List["YamlValue"], Dict[YamlKey, "YamlValue"]],
    )
    YamlValueView: TypeAlias = _TypeAliasType(
        "YamlValueView",
        Union[
            YamlLeafValue,
            Sequence["YamlValueView"],
            Mapping[YamlKey, "YamlValueView"],
        ],
    )


# Top-level content of a bioimageio YAML file: string keys mapping to YAML
# values (editable dict form and read-only mapping form).
BioimageioYamlContent = Dict[str, YamlValue]
BioimageioYamlContentView = Mapping[str, YamlValueView]

IncompleteDescrLeaf = Union[Node, YamlValue, PermissiveFileSource, Version]
"""Leaf value of a partial description"""

IncompleteDescrInner = Union[
    IncompleteDescrLeaf,
    List["IncompleteDescrInner"],
    Dict[YamlKey, "IncompleteDescrInner"],
]
"""An inner node of an incomplete resource description --- YAML values and description nodes mixed."""

IncompleteDescr = Dict[str, IncompleteDescrInner]
"""An incomplete resource description --- YAML values and description nodes mixed."""


IncompleteDescrLeafView = Union[Node, YamlValueView, PermissiveFileSource, Version]
"""Non-editable leaf value of an incomplete description"""

IncompleteDescrInnerView = Union[
    IncompleteDescrLeafView,
    Sequence["IncompleteDescrInnerView"],
    Mapping[YamlKey, "IncompleteDescrInnerView"],
    # Mapping[str, YamlValueView],  # not sure why this explicit Mapping would be needed
]
"""A inner node of a non-editable incomplete resource description --- YAML value views and Node instances mixed."""

IncompleteDescrView = Mapping[str, IncompleteDescrInnerView]
"""A non-editable incomplete resource description --- YAML mappings and Node instances mixed."""


# Anything a bioimageio YAML can be loaded from: a file source, an opened zip
# file, or already loaded content (editable or read-only).
BioimageioYamlSource = Union[
    PermissiveFileSource, ZipFile, BioimageioYamlContent, BioimageioYamlContentView
]
@overload
def deepcopy_yaml_value(value: BioimageioYamlContentView) -> BioimageioYamlContent: ...


@overload
def deepcopy_yaml_value(value: YamlValueView) -> YamlValue: ...


def deepcopy_yaml_value(
    value: Union[BioimageioYamlContentView, YamlValueView],
) -> Union[BioimageioYamlContent, YamlValue]:
    """Recursively copy a (read-only) YAML value into its editable form.

    Mappings become new dicts and sequences become new lists; every other
    value (including strings) is returned as is.

    Args:
        value: YAML content or any YAML value view.

    Returns:
        A structure of fresh dicts/lists that shares only leaf values with
        `value`.
    """
    # `str` is itself a `collections.abc.Sequence` whose items are strings
    # again; it has to be treated as a leaf or the recursion never terminates.
    if isinstance(value, str):
        return value
    elif isinstance(value, collections.abc.Mapping):
        return {key: deepcopy_yaml_value(val) for key, val in value.items()}
    elif isinstance(value, collections.abc.Sequence):
        return [deepcopy_yaml_value(val) for val in value]
    else:
        return value
def deepcopy_incomplete_descr(data: IncompleteDescrView) -> IncompleteDescr:
    """Copy an incomplete description into a new, editable dict."""
    copied: IncompleteDescr = {}
    for key, inner in data.items():
        copied[key] = _deepcopy_incomplete_descr_impl(inner)

    return copied
def _deepcopy_incomplete_descr_impl(
    data: IncompleteDescrInnerView,
) -> IncompleteDescrInner:
    """Copy one inner value of an incomplete description.

    Nodes are deep-copied, mappings and sequences are rebuilt as dict/list
    with recursively copied items, and the remaining leaf types are returned
    unchanged.
    """
    if isinstance(data, Node):
        return deepcopy(data)

    # strings are sequences too, so they must be handled before the
    # generic sequence branch
    if isinstance(data, str):
        return data

    if isinstance(data, collections.abc.Mapping):
        return {
            key: _deepcopy_incomplete_descr_impl(item) for key, item in data.items()
        }

    if isinstance(data, collections.abc.Sequence):
        return [_deepcopy_incomplete_descr_impl(item) for item in data]

    passthrough_types = (
        HttpUrl,
        Path,
        PurePath,
        RelativeFilePath,
        Version,
        _date,
        _datetime,
        bool,
        float,
        int,
        pydantic.HttpUrl,
        type(None),
        ZipPath,
    )
    if isinstance(data, passthrough_types):
        return data

    assert_never(data)
def is_yaml_leaf_value(value: Any) -> TypeGuard[YamlLeafValue]:
    """Tell whether `value` is None, a bool, number, string, date or datetime."""
    scalar_types = (bool, _date, _datetime, int, float, str)
    return value is None or isinstance(value, scalar_types)
def is_yaml_list(value: Any) -> TypeGuard[List[YamlValue]]:
    """Tell whether `value` is a list whose items are all YAML values."""
    if not is_list(value):
        return False

    return all(map(is_yaml_value, value))
def is_yaml_sequence(value: Any) -> TypeGuard[List[YamlValueView]]:
    """Tell whether `value` is a sequence whose items are all YAML value views.

    Items are checked with `is_yaml_value_read_only` (not `is_yaml_value`), so
    nested read-only containers, e.g. a tuple inside a tuple, are accepted;
    this matches how `is_yaml_mapping` checks its values.
    """
    return is_sequence(value) and all(
        is_yaml_value_read_only(item) for item in value
    )
def is_yaml_dict(value: Any) -> TypeGuard[BioimageioYamlContent]:
    """Tell whether `value` is a dict with string keys and YAML values."""
    if not is_dict(value):
        return False

    for key, val in value.items():
        if not (isinstance(key, str) and is_yaml_value(val)):
            return False

    return True
def is_yaml_mapping(value: Any) -> TypeGuard[BioimageioYamlContentView]:
    """Tell whether `value` is a mapping with string keys and YAML value views."""
    if not is_mapping(value):
        return False

    for key, val in value.items():
        if not (isinstance(key, str) and is_yaml_value_read_only(val)):
            return False

    return True
def is_yaml_value(value: Any) -> TypeGuard[YamlValue]:
    """Tell whether `value` is a YAML leaf, a YAML list or a YAML dict."""
    checks = (is_yaml_leaf_value, is_yaml_list, is_yaml_dict)
    # the generator keeps the checks lazy: later ones only run if needed
    return any(check(value) for check in checks)
def is_yaml_value_read_only(value: Any) -> TypeGuard[YamlValueView]:
    """Tell whether `value` is a YAML leaf, a YAML sequence or a YAML mapping."""
    checks = (is_yaml_leaf_value, is_yaml_sequence, is_yaml_mapping)
    # the generator keeps the checks lazy: later ones only run if needed
    return any(check(value) for check in checks)
@dataclass(frozen=True, **SLOTS)
class OpenedBioimageioYaml:
    """Immutable record of an opened bioimageio YAML and where it came from."""

    # YAML content as a dict; excluded from the generated repr
    content: BioimageioYamlContent = field(repr=False)
    # directory, root url or zip file the YAML originated from
    original_root: Union[AbsoluteDirectory, RootHttpUrl, ZipFile]
    # presumably a display name of the original source -- TODO confirm
    original_source_name: Optional[str]
    original_file_name: FileName
    # presumably the raw YAML text before parsing -- TODO confirm;
    # excluded from the generated repr
    unparsed_content: str = field(repr=False)
@dataclass(frozen=True, **SLOTS)
class LocalFile:
    """Immutable record of a local file together with its original root and name."""

    path: FilePath
    # directory, root url or zip file the file originated from
    original_root: Union[AbsoluteDirectory, RootHttpUrl, ZipFile]
    original_file_name: FileName
@dataclass(frozen=True, **SLOTS)
class FileInZip:
    """Immutable record of a zip archive member with its original root and name."""

    path: ZipPath
    # root url or zip file the member originated from
    original_root: Union[RootHttpUrl, ZipFile]
    original_file_name: FileName
# Optional keyword arguments carrying an expected hash; used to type the
# `**kwargs` of `get_reader` and `_open_url`.
class HashKwargs(TypedDict):
    sha256: NotRequired[Optional[Sha256]]


# Module-level pydantic adapter used to validate arbitrary input as `FileSource`.
_file_source_adapter: TypeAdapter[Union[HttpUrl, RelativeFilePath, FilePath]] = (
    TypeAdapter(FileSource)
)
def interprete_file_source(
    file_source: Union[FileSource, str, pydantic.HttpUrl],
) -> FileSource:
    """Turn a permissive file source into a strict `FileSource`.

    Paths and `HttpUrl` instances are returned as they are; other pydantic
    urls are converted to `str` first and, like plain strings, validated with
    io checks disabled.

    Raises:
        FileNotFoundError: if the source turns out to be a directory.
    """

    def _reject_directory(candidate: Path) -> None:
        # a file source must never point to a directory
        if candidate.is_dir():
            raise FileNotFoundError(
                f"{candidate} is a directory, but expected a file."
            )

    if isinstance(file_source, Path):
        _reject_directory(file_source)
        return file_source

    if isinstance(file_source, HttpUrl):
        return file_source

    if isinstance(file_source, pydantic.AnyUrl):
        file_source = str(file_source)

    with get_validation_context().replace(perform_io_checks=False):
        strict = _file_source_adapter.validate_python(file_source)
        if isinstance(strict, Path):
            _reject_directory(strict)

    return strict
def extract(
    source: Union[FilePath, ZipFile, ZipPath],
    folder: Optional[DirectoryPath] = None,
    overwrite: bool = False,
) -> DirectoryPath:
    """Extract a zip archive (or a single member of it) into a folder.

    Args:
        source: path to a zip file, an opened `ZipFile`, or a `ZipPath`
            pointing at a single member to extract.
        folder: target folder. Defaults to the archive path with '.unzip'
            appended to its suffix, or a fresh temporary directory if an
            opened `ZipFile` has no file name.
        overwrite: extract even if the target already exists.

    Returns:
        The folder the content was extracted to. This differs from `folder`
        if an existing folder lacks expected files: the archive is then
        extracted to a sibling folder with an incremented name instead.
    """
    extract_member = None
    if isinstance(source, ZipPath):
        extract_member = source.at
        source = source.root

    if isinstance(source, ZipFile):
        # the caller owns an already opened zip file; do not close it here
        zip_context = nullcontext(source)
        if folder is None:
            if source.filename is None:
                folder = Path(mkdtemp())
            else:
                zip_path = Path(source.filename)
                folder = zip_path.with_suffix(zip_path.suffix + ".unzip")
    else:
        zip_context = ZipFile(source, "r")
        if folder is None:
            folder = source.with_suffix(source.suffix + ".unzip")

    if overwrite and folder.exists():
        warnings.warn(f"Overwriting existing unzipped archive at {folder}")

    with zip_context as f:
        if extract_member is not None:
            extracted_file_path = folder / extract_member
            if extracted_file_path.exists() and not overwrite:
                warnings.warn(f"Found unzipped {extracted_file_path}.")
            else:
                _ = f.extract(extract_member, folder)

            return folder

        elif overwrite or not folder.exists():
            f.extractall(folder)
            return folder

        # Compare like with like: archive member names are relative paths of
        # any depth (directory entries end with '/'), so the folder has to be
        # walked recursively and trailing slashes dropped. A top-level-only
        # listing would report every nested member as missing.
        found_content = {p.relative_to(folder).as_posix() for p in folder.rglob("*")}
        expected_content = {info.filename.rstrip("/") for info in f.filelist}
        expected_content.discard("")
        if expected_missing := expected_content - found_content:
            # derive a sibling folder name by incrementing (or prepending) a
            # number in the last '_'-separated part of the folder name
            parts = folder.name.split("_")
            nr, *suffixes = parts[-1].split(".")
            if nr.isdecimal():
                nr = str(int(nr) + 1)
            else:
                nr = f"1.{nr}"

            parts[-1] = ".".join([nr, *suffixes])
            out_path_new = folder.with_name("_".join(parts))
            warnings.warn(
                f"Unzipped archive at {folder} is missing expected files"
                + f" {expected_missing}."
                + f" Unzipping to {out_path_new} instead to avoid overwriting."
            )
            return extract(f, out_path_new, overwrite=overwrite)
        else:
            warnings.warn(
                f"Found unzipped archive with all expected files at {folder}."
            )
            return folder
def get_reader(
    source: Union[PermissiveFileSource, FileDescr, ZipPath],
    /,
    progressbar: Union[
        ProgressbarLike, Callable[[], ProgressbarLike], bool, None
    ] = None,
    **kwargs: Unpack[HashKwargs],
) -> BytesReader:
    """Open a file `source` (download if needed)

    Args:
        source: file description, url, (relative) path, zip path or string.
            For a `FileDescr` its `sha256` is used unless one is given
            explicitly.
        progressbar: forwarded to the download for http sources.
        **kwargs: optional `sha256` with the expected hash.

    Returns:
        A `BytesReader` over the source content.

    Raises:
        FileNotFoundError: if a local or zip path does not exist or is a
            directory.
        ValueError: if a local or zip file does not match the expected
            `sha256`.
    """
    if isinstance(source, FileDescr):
        if "sha256" not in kwargs:
            kwargs["sha256"] = source.sha256

        source = source.source
    elif isinstance(source, str):
        source = interprete_file_source(source)

    if isinstance(source, RelativeFilePath):
        source = source.absolute()
    elif isinstance(source, pydantic.AnyUrl):
        with get_validation_context().replace(perform_io_checks=False):
            source = HttpUrl(source)

    if isinstance(source, HttpUrl):
        return _open_url(source, progressbar=progressbar, **kwargs)

    if isinstance(source, ZipPath):
        if not source.exists():
            raise FileNotFoundError(source.filename)

        f = source.open(mode="rb")
        assert not isinstance(f, TextIOWrapper)
        root = source.root
    elif isinstance(source, Path):
        if source.is_dir():
            raise FileNotFoundError(f"{source} is a directory, not a file")

        if not source.exists():
            raise FileNotFoundError(source)

        f = source.open("rb")
        root = source.parent
    else:
        assert_never(source)

    expected_sha = kwargs.get("sha256")
    if expected_sha is None:
        sha = None
    else:
        try:
            sha = get_sha256(f)
            _ = f.seek(0)  # rewind so the returned reader starts at the beginning
            if sha != expected_sha:
                raise ValueError(
                    f"SHA256 mismatch for {source}. Expected {expected_sha}, got {sha}."
                )
        except Exception:
            # the handle never reaches the caller; close it instead of leaking it
            f.close()
            raise

    return BytesReader(
        f,
        sha256=sha,
        suffix=source.suffix,
        original_file_name=source.name,
        original_root=root,
        is_zipfile=None,
    )


download = get_reader
def _open_url(
    source: HttpUrl,
    /,
    progressbar: Union[ProgressbarLike, Callable[[], ProgressbarLike], bool, None],
    **kwargs: Unpack[HashKwargs],
) -> BytesReader:
    """Fetch `source` through the configured cache and wrap it in a `BytesReader`.

    A no-op cache is used when the validation context disables caching,
    otherwise the disk cache from the settings. Without an expected `sha256`
    a refetch is forced; with one, its parsed digest is passed as
    `force_refetch` instead.
    """
    if get_validation_context().disable_cache:
        cache = NoopCache[RootHttpUrl](url_hasher=UrlDigest.from_str)
    else:
        cache = settings.disk_cache

    sha = kwargs.get("sha256")
    if sha is None:
        force_refetch = True
    else:
        force_refetch = ContentDigest.parse(hexdigest=sha)

    # name source for suffix/file name: the url path, else the expected hash,
    # else a hash of the url itself
    name_source = (
        source.path
        or sha
        or hashlib.sha256(str(source).encode(encoding="utf-8")).hexdigest()
    )
    source_path = PurePosixPath(name_source)

    fetcher = partial(_fetch_url, progressbar=progressbar)
    reader = cache.fetch(source, fetcher=fetcher, force_refetch=force_refetch)
    return BytesReader(
        reader,
        suffix=source_path.suffix,
        sha256=sha,
        original_file_name=source_path.name,
        original_root=source.parent,
        is_zipfile=None,
    )
def _fetch_url(
    source: RootHttpUrl,
    *,
    progressbar: Union[ProgressbarLike, Callable[[], ProgressbarLike], bool, None],
):
    """Request `source` over http(s) and return an iterator over its bytes.

    The progressbar is resolved in this order: the given argument, the
    validation context, and finally "on unless running in CI". A callable is
    called to create the bar and `True` creates a default `tqdm` bar.

    Raises:
        NotImplementedError: for url schemes other than http and https.
    """
    if source.scheme not in ("http", "https"):
        raise NotImplementedError(source.scheme)

    if progressbar is None:
        # chose progressbar option from validation context
        progressbar = get_validation_context().progressbar

    if progressbar is None:
        # default to no progressbar in CI environments
        progressbar = not settings.CI

    if callable(progressbar):
        progressbar = progressbar()

    if progressbar is True:
        progressbar = tqdm(
            ncols=79,
            ascii=bool(sys.platform == "win32"),
            unit="B",
            unit_scale=True,
            leave=True,
        )

    show_progress = progressbar is not False
    if show_progress:
        progressbar.set_description(f"Downloading {extract_file_name(source)}")

    headers: Dict[str, str] = {}
    if settings.user_agent is not None:
        headers["User-Agent"] = settings.user_agent
    elif settings.CI:
        headers["User-Agent"] = "ci"

    r = httpx.get(
        str(source),
        follow_redirects=True,
        headers=headers,
        timeout=settings.http_timeout,
    )
    _ = r.raise_for_status()

    # set progressbar.total
    total = r.headers.get("content-length")
    if total is not None and not isinstance(total, int):
        try:
            total = int(total)
        except Exception:
            total = None

    if show_progress:
        progressbar.total = 0 if total is None else total

    def iter_content():
        for chunk in r.iter_bytes(chunk_size=4096):
            yield chunk
            if show_progress:
                _ = progressbar.update(len(chunk))

        # Make sure the progress bar gets filled even if the actual number
        # of chunks is smaller than expected. This happens when streaming
        # text files that are compressed by the server when sending (gzip).
        # Binary files don't experience this.
        # (adapted from pooch.HttpDownloader)
        if show_progress:
            progressbar.reset()
            if total is not None:
                _ = progressbar.update(total)

            progressbar.close()

    return iter_content()
def extract_file_name(
    src: Union[
        pydantic.HttpUrl, RootHttpUrl, PurePath, RelativeFilePath, ZipPath, FileDescr
    ],
) -> FileName:
    """Return the file name part of a path, zip path, url or file description.

    For https zenodo API urls ending in '/content' the second to last path
    segment is the file name; for other urls it is the last one.
    """
    if isinstance(src, FileDescr):
        src = src.source

    if isinstance(src, ZipPath):
        return src.name or src.root.filename or "bioimageio.zip"

    if isinstance(src, RelativeFilePath):
        return src.path.name

    if isinstance(src, PurePath):
        return src.name

    url = urlparse(str(src))
    is_zenodo_content = (
        url.scheme == "https"
        and url.hostname == "zenodo.org"
        and url.path.startswith("/api/records/")
        and url.path.endswith("/content")
    )
    segments = url.path.split("/")
    return segments[-2] if is_zenodo_content else segments[-1]
def extract_file_descrs(
    data: IncompleteDescrView,
) -> List[FileDescr]:
    """Collect all file descriptions found anywhere inside `data`.

    The traversal runs with io checks and warning logging disabled in the
    validation context.
    """
    found: List[FileDescr] = []
    quiet_context = get_validation_context().replace(
        perform_io_checks=False, log_warnings=False
    )
    with quiet_context:
        _extract_file_descrs_impl(data, found)

    return found
def _extract_file_descrs_impl(
    data: Union[IncompleteDescrView, IncompleteDescrInnerView],
    collected: List[FileDescr],
) -> None:
    """Recursively append every file description within `data` to `collected`.

    `FileDescr` instances are collected directly; mappings that have both a
    'source' and a 'sha256' key are parsed as `FileDescr` (retrying without
    the hash if that fails). Nodes, mappings and non-string, non-path
    sequences are searched recursively.
    """
    if isinstance(data, FileDescr):
        collected.append(data)
        return

    if isinstance(data, Node):
        for _, field_value in data:
            _extract_file_descrs_impl(field_value, collected)
        return

    if isinstance(data, collections.abc.Mapping):
        if "source" in data and "sha256" in data:
            fd = None
            try:
                fd = FileDescr.model_validate(
                    dict(source=data["source"], sha256=data["sha256"])
                )
            except Exception:
                warnings.warn(
                    "Found mapping with 'source' and 'sha256' keys, but could not parse it as a FileDescr. Ignoring `sha256`."
                )
                # second attempt: keep the source, drop the hash
                try:
                    fd = FileDescr.model_validate(dict(source=data["source"]))
                except Exception:
                    warnings.warn(
                        f"Found mapping with 'source' and `sha256' keys , but could not parse it as a FileDescr, evning when ignoring 'sha256'. Ignoring `source`: {data['source']}."
                    )

            if fd is not None:
                collected.append(fd)

        # the mapping's values are searched regardless of the parse outcome
        for item in data.values():
            _extract_file_descrs_impl(item, collected)
        return

    is_text_or_path = isinstance(data, (str, Path, RelativeFilePath))
    if not is_text_or_path and isinstance(data, collections.abc.Sequence):
        for item in data:
            _extract_file_descrs_impl(item, collected)
# Type variable for functions that hand back the file source or file
# description they were given.
F = TypeVar("F", bound=Union[FileSource, FileDescr])
def validate_suffix(
    value: F, suffix: Union[str, Sequence[str]], case_sensitive: bool
) -> F:
    """Check the final suffix of `value` against the allowed suffix(es).

    Args:
        value: file source or file description to check.
        suffix: one suffix or several, each starting with '.'.
        case_sensitive: whether upper/lower case has to match.

    Returns:
        `value`, unchanged.

    Raises:
        ValueError: if the final suffix is not among the allowed ones.
    """
    suffixes = [suffix] if isinstance(suffix, str) else suffix

    assert len(suffixes) > 0, "no suffix given"
    assert all(suff.startswith(".") for suff in suffixes), (
        "expected suffixes to start with '.'"
    )

    if isinstance(value, FileDescr):
        strict = value.source
    else:
        strict = interprete_file_source(value)

    if isinstance(strict, (HttpUrl, AnyUrl)):
        if strict.path is None or "." not in (path := strict.path):
            actual_suffixes = []
        else:
            is_zenodo_content = (
                strict.host == "zenodo.org"
                and path.startswith("/api/records/")
                and path.endswith("/content")
            )
            if is_zenodo_content:
                # Zenodo API URLs have a "/content" suffix that should be ignored
                path = path[: -len("/content")]

            actual_suffixes = [f".{path.split('.')[-1]}"]

    elif isinstance(strict, PurePath):
        actual_suffixes = strict.suffixes
    elif isinstance(strict, RelativeFilePath):
        actual_suffixes = strict.path.suffixes
    else:
        assert_never(strict)

    actual_suffix = actual_suffixes[-1] if actual_suffixes else "no suffix"

    if case_sensitive:
        matches = actual_suffix in suffixes
    else:
        matches = actual_suffix.lower() in [s.lower() for s in suffixes]

    if not matches:
        if len(suffixes) == 1:
            raise ValueError(f"Expected suffix {suffixes[0]}, but got {actual_suffix}")

        raise ValueError(
            f"Expected a suffix from {suffixes}, but got {actual_suffix}"
        )

    return value
def populate_cache(sources: Sequence[Union[FileDescr, LightHttpFileDescr]]):
    """Download every distinct remote source that has a known SHA256.

    Sources without a hash and sources resolving to local paths are skipped;
    each url is downloaded at most once.
    """
    seen: Set[str] = set()
    for src in sources:
        if src.sha256 is None:
            continue  # not caching without known SHA

        location = src.source
        if isinstance(location, (HttpUrl, pydantic.AnyUrl)):
            url = str(location)
        elif isinstance(location, RelativeFilePath):
            absolute = location.absolute()
            if not isinstance(absolute, HttpUrl):
                continue  # not caching local paths

            url = str(absolute)
        elif isinstance(location, Path):
            continue  # not caching local paths
        else:
            assert_never(location)

        if url in seen:
            continue  # skip duplicate URLs

        seen.add(url)
        _ = src.download()