Coverage for src / bioimageio / spec / _internal / io.py: 78%
490 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-27 14:45 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-27 14:45 +0000
1from __future__ import annotations
3import collections.abc
4import hashlib
5import sys
6import warnings
7import zipfile
8from abc import abstractmethod
9from contextlib import nullcontext
10from copy import deepcopy
11from dataclasses import dataclass, field
12from datetime import date as _date
13from datetime import datetime as _datetime
14from functools import partial
15from io import TextIOWrapper
16from pathlib import Path, PurePath, PurePosixPath
17from tempfile import mkdtemp
18from typing import (
19 TYPE_CHECKING,
20 Any,
21 Callable,
22 Dict,
23 Generic,
24 Iterable,
25 List,
26 Mapping,
27 Optional,
28 Sequence,
29 Set,
30 Tuple,
31 Type,
32 TypedDict,
33 TypeVar,
34 Union,
35 overload,
36)
37from urllib.parse import urlparse, urlsplit, urlunsplit
38from zipfile import ZipFile
40import httpx
41import pydantic
42from genericache import NoopCache
43from genericache.digest import ContentDigest, UrlDigest
44from pydantic import (
45 AnyUrl,
46 DirectoryPath,
47 Field,
48 GetCoreSchemaHandler,
49 PrivateAttr,
50 RootModel,
51 TypeAdapter,
52 model_serializer,
53 model_validator,
54)
55from pydantic_core import core_schema
56from tqdm import tqdm
57from typing_extensions import (
58 Annotated,
59 LiteralString,
60 NotRequired,
61 Self,
62 TypeGuard,
63 Unpack,
64 assert_never,
65)
66from typing_extensions import TypeAliasType as _TypeAliasType
68from ._settings import settings
69from .io_basics import (
70 ALL_BIOIMAGEIO_YAML_NAMES,
71 ALTERNATIVE_BIOIMAGEIO_YAML_NAMES,
72 BIOIMAGEIO_YAML,
73 AbsoluteDirectory,
74 AbsoluteFilePath,
75 BytesReader,
76 FileName,
77 FilePath,
78 Sha256,
79 ZipPath,
80 get_sha256,
81)
82from .node import Node
83from .progress import Progressbar
84from .root_url import RootHttpUrl
85from .type_guards import is_dict, is_list, is_mapping, is_sequence
86from .url import HttpUrl
87from .utils import SLOTS
88from .validation_context import get_validation_context
89from .version_type import Version
# Type variable for the absolute counterpart of a relative path:
# an http URL, an absolute directory/file path, or a path inside a zip archive.
AbsolutePathT = TypeVar(
    "AbsolutePathT",
    bound=Union[HttpUrl, AbsoluteDirectory, AbsoluteFilePath, ZipPath],
)
class LightHttpFileDescr(Node):
    """http source with sha256 value (minimal validation)"""

    source: pydantic.HttpUrl
    """file source"""

    sha256: Sha256
    """SHA256 checksum of the source file"""

    def get_reader(
        self,
        *,
        progressbar: Union[Progressbar, Callable[[], Progressbar], bool, None] = None,
    ) -> BytesReader:
        """open the file source (download if needed)

        Args:
            progressbar: a progressbar instance, a factory for one,
                True/False to force/suppress one, or None to use the default.
        """
        return get_reader(self.source, sha256=self.sha256, progressbar=progressbar)

    download = get_reader
    """alias for get_reader() method"""
class RelativePathBase(RootModel[PurePath], Generic[AbsolutePathT], frozen=True):
    """Base class for relative paths that resolve against the validation context root."""

    # absolute counterpart, resolved once in `model_post_init`
    _absolute: AbsolutePathT = PrivateAttr()

    @property
    def path(self) -> PurePath:
        """the wrapped relative path"""
        return self.root

    def absolute(  # method not property analog to `pathlib.Path.absolute()`
        self,
    ) -> AbsolutePathT:
        """get the absolute path/url

        (resolved at time of initialization with the root of the ValidationContext)
        """
        return self._absolute

    def model_post_init(self, __context: Any) -> None:
        """set `_absolute` property with validation context at creation time. @private"""
        if self.root.is_absolute():
            raise ValueError(f"{self.root} is an absolute path.")

        # guard against URLs accidentally parsed as paths
        if self.root.parts and self.root.parts[0] in ("http:", "https:"):
            raise ValueError(f"{self.root} looks like an http url.")

        self._absolute = (  # pyright: ignore[reportAttributeAccessIssue]
            self.get_absolute(get_validation_context().root)
        )
        super().model_post_init(__context)

    def __str__(self) -> str:
        return self.root.as_posix()

    def __repr__(self) -> str:
        return f"RelativePath('{self}')"

    @model_serializer()
    def format(self) -> str:
        """serialize as the POSIX-style relative path string"""
        return str(self)

    @abstractmethod
    def get_absolute(
        self, root: Union[RootHttpUrl, AbsoluteDirectory, pydantic.AnyUrl, ZipFile]
    ) -> AbsolutePathT: ...

    def _get_absolute_impl(
        self, root: Union[RootHttpUrl, AbsoluteDirectory, pydantic.AnyUrl, ZipFile]
    ) -> Union[Path, HttpUrl, ZipPath]:
        """resolve this relative path against `root` (dir, zip archive, or URL)"""
        if isinstance(root, Path):
            return (root / self.root).absolute()

        rel_path = self.root.as_posix().strip("/")
        if isinstance(root, ZipFile):
            return ZipPath(root, rel_path)

        parsed = urlsplit(str(root))
        path = list(parsed.path.strip("/").split("/"))
        if (
            parsed.netloc == "zenodo.org"
            and parsed.path.startswith("/api/records/")
            and parsed.path.endswith("/content")
        ):
            # zenodo API URLs end in "/content"; insert the file path before it
            path.insert(-1, rel_path)
        else:
            path.append(rel_path)

        return HttpUrl(
            urlunsplit(
                (
                    parsed.scheme,
                    parsed.netloc,
                    "/".join(path),
                    parsed.query,
                    parsed.fragment,
                )
            )
        )

    @classmethod
    def _validate(cls, value: Union[PurePath, str]):
        """reject strings that are clearly URLs, then wrap as relative path"""
        if isinstance(value, str) and (
            value.startswith("https://") or value.startswith("http://")
        ):
            raise ValueError(f"{value} looks like a URL, not a relative path")

        return cls(PurePath(value))
class RelativeFilePath(
    RelativePathBase[Union[AbsoluteFilePath, HttpUrl, ZipPath]], frozen=True
):
    """A path relative to the `rdf.yaml` file (also if the RDF source is a URL)."""

    def model_post_init(self, __context: Any) -> None:
        """add validation @private"""
        if not self.root.parts:  # an empty path can only be a directory
            raise ValueError(f"{self.root} is not a valid file path.")

        super().model_post_init(__context)

    def get_absolute(
        self, root: "RootHttpUrl | Path | AnyUrl | ZipFile"
    ) -> "AbsoluteFilePath | HttpUrl | ZipPath":
        """resolve against `root`; if io checks are enabled, require that a
        resolved local path points to an existing file (unless already known)"""
        absolute = self._get_absolute_impl(root)
        if (
            isinstance(absolute, Path)
            and (context := get_validation_context()).perform_io_checks
            and str(self.root) not in context.known_files
            and not absolute.is_file()
        ):
            raise ValueError(f"{absolute} does not point to an existing file")

        return absolute

    @property
    def suffix(self):
        """file extension of the relative path (e.g. '.yaml')"""
        return self.root.suffix
class RelativeDirectory(
    RelativePathBase[Union[AbsoluteDirectory, HttpUrl, ZipPath]], frozen=True
):
    """A directory path relative to the root of the validation context."""

    def get_absolute(
        self, root: "RootHttpUrl | Path | AnyUrl | ZipFile"
    ) -> "AbsoluteDirectory | HttpUrl | ZipPath":
        """resolve against `root`; if io checks are enabled, require that a
        resolved local path points to an existing directory"""
        absolute = self._get_absolute_impl(root)
        if (
            isinstance(absolute, Path)
            and get_validation_context().perform_io_checks
            and not absolute.is_dir()
        ):
            raise ValueError(f"{absolute} does not point to an existing directory")

        return absolute
# Strict file source: http URL, path relative to the RDF root, or absolute file
# path. `union_mode="left_to_right"` makes pydantic try the members in order.
FileSource = Annotated[
    Union[HttpUrl, RelativeFilePath, FilePath],
    Field(union_mode="left_to_right"),
]
# Like `FileSource`, but additionally accepts plain strings and pydantic URLs.
PermissiveFileSource = Union[FileSource, str, pydantic.HttpUrl]
class FileDescr(Node):
    """A file description"""

    source: FileSource
    """File source"""

    sha256: Optional[Sha256] = None
    """SHA256 hash value of the **source** file."""

    @model_validator(mode="after")
    def _validate_sha256(self) -> Self:
        """run `validate_sha256` as part of pydantic validation. @private"""
        self.validate_sha256()
        return self

    def validate_sha256(self, force_recompute: bool = False) -> None:
        """validate the sha256 hash value of the **source** file

        Args:
            force_recompute: ignore hashes cached in `context.known_files`
                and recompute the hash from the file content.

        Raises:
            ValueError: if the file is missing from a non-empty `known_files`
                while io checks are disabled, or if the computed hash does not
                match the expected `sha256`.
        """
        context = get_validation_context()
        src_str = str(self.source)
        if force_recompute:
            actual_sha = None
        else:
            # reuse a hash computed earlier within this validation context
            actual_sha = context.known_files.get(src_str)

        if actual_sha is None:
            if context.perform_io_checks or force_recompute:
                reader = get_reader(self.source, sha256=self.sha256)
                if force_recompute:
                    actual_sha = get_sha256(reader)
                else:
                    actual_sha = reader.sha256

                context.known_files[src_str] = actual_sha
            elif context.known_files and src_str not in context.known_files:
                # perform_io_checks is False, but known files were given,
                # so we expect all file references to be in there
                raise ValueError(f"File {src_str} not found in `known_files`.")

        if actual_sha is None or self.sha256 == actual_sha:
            return
        elif self.sha256 is None or context.update_hashes:
            # adopt the freshly computed hash
            self.sha256 = actual_sha
        elif self.sha256 != actual_sha:
            raise ValueError(
                f"Sha256 mismatch for {self.source}. Expected {self.sha256}, got "
                + f"{actual_sha}. Update expected `sha256` or point to the matching "
                + "file."
            )

    def get_reader(
        self,
        *,
        progressbar: Union[Progressbar, Callable[[], Progressbar], bool, None] = None,
    ):
        """open the file source (download if needed)"""
        return get_reader(self.source, progressbar=progressbar, sha256=self.sha256)

    def download(
        self,
        *,
        progressbar: Union[Progressbar, Callable[[], Progressbar], bool, None] = None,
    ):
        """alias for `.get_reader`"""
        return get_reader(self.source, progressbar=progressbar, sha256=self.sha256)

    @property
    def suffix(self) -> str:
        """file extension of the source (e.g. '.yaml')"""
        return self.source.suffix
# Module-level adapter validating a file path, directory path, or http URL.
path_or_url_adapter: "TypeAdapter[Union[FilePath, DirectoryPath, HttpUrl]]" = (
    TypeAdapter(Union[FilePath, DirectoryPath, HttpUrl])
)
@dataclass(frozen=True, **SLOTS)
class WithSuffix:
    """Pydantic annotation metadata restricting a file source to given suffix(es)."""

    suffix: Union[LiteralString, Tuple[LiteralString, ...]]  # accepted suffix(es), each starting with '.'
    case_sensitive: bool  # whether suffix comparison is case sensitive

    def __get_pydantic_core_schema__(
        self, source: Type[Any], handler: GetCoreSchemaHandler
    ):
        """attach `validate` as an after-validator to the annotated type. @private"""
        if not self.suffix:
            raise ValueError("suffix may not be empty")

        schema = handler(source)
        return core_schema.no_info_after_validator_function(
            self.validate,
            schema,
        )

    def validate(
        self, value: Union[FileSource, FileDescr]
    ) -> Union[FileSource, FileDescr]:
        """check that `value` carries one of the expected suffixes"""
        return validate_suffix(value, self.suffix, case_sensitive=self.case_sensitive)
def wo_special_file_name(src: F) -> F:
    """Return `src` unchanged unless its file name is a reserved bioimageio YAML name."""
    if not has_valid_bioimageio_yaml_name(src):
        return src

    raise ValueError(
        f"'{src}' not allowed here as its filename is reserved to identify"
        + f" '{BIOIMAGEIO_YAML}' (or equivalent) files."
    )
def has_valid_bioimageio_yaml_name(src: Union[FileSource, FileDescr]) -> bool:
    """True if the file name of `src` identifies a bioimageio YAML file."""
    file_name = extract_file_name(src)
    return is_valid_bioimageio_yaml_name(file_name)
def is_valid_bioimageio_yaml_name(file_name: FileName) -> bool:
    """True if `file_name` equals, or ends with a '.' followed by, a recognized bioimageio YAML name."""
    return any(
        file_name == candidate or file_name.endswith("." + candidate)
        for candidate in ALL_BIOIMAGEIO_YAML_NAMES
    )
def identify_bioimageio_yaml_file_name(file_names: Iterable[FileName]) -> FileName:
    """Pick the file name identifying a bioimageio YAML file from `file_names`.

    Candidates are searched in the order of `ALL_BIOIMAGEIO_YAML_NAMES`;
    within each candidate name, file names are considered in sorted order.

    Raises:
        ValueError: if no file name matches any recognized bioimageio YAML name.
    """
    file_names = sorted(file_names)
    for bioimageio_name in ALL_BIOIMAGEIO_YAML_NAMES:
        for file_name in file_names:
            if file_name == bioimageio_name or file_name.endswith(
                "." + bioimageio_name
            ):
                return file_name

    # fixed message typos: "or or" -> "or", "alterntive" -> "alternative"
    raise ValueError(
        f"No {BIOIMAGEIO_YAML} found in {file_names}. (Looking for '{BIOIMAGEIO_YAML}'"
        + " or any of the alternative file names:"
        + f" {ALTERNATIVE_BIOIMAGEIO_YAML_NAMES}, or any file with an extension of"
        + f" those, e.g. 'anything.{BIOIMAGEIO_YAML}')."
    )
def find_bioimageio_yaml_file_name(path: Union[Path, ZipFile]) -> FileName:
    """Find the bioimageio YAML file name within `path`.

    `path` may be a directory, a zip archive (open `ZipFile` or zip file on
    disk), or a single non-zip file (whose own name is returned as-is).

    Raises:
        ValueError: if no matching file name is found (via
            `identify_bioimageio_yaml_file_name`).
    """
    if isinstance(path, ZipFile):
        file_names = path.namelist()
    elif path.is_file():
        if not zipfile.is_zipfile(path):
            # a plain file is assumed to be the YAML file itself
            return path.name

        with ZipFile(path, "r") as f:
            file_names = f.namelist()
    else:
        # a directory: only its direct children are considered
        file_names = [p.name for p in path.glob("*")]

    return identify_bioimageio_yaml_file_name(
        file_names
    )  # TODO: try/except with better error message for dir
def ensure_has_valid_bioimageio_yaml_name(src: FileSource) -> FileSource:
    """Return `src` if its file name identifies a bioimageio YAML file, else raise."""
    if has_valid_bioimageio_yaml_name(src):
        return src

    raise ValueError(
        f"'{src}' does not have a valid filename to identify"
        + f" '{BIOIMAGEIO_YAML}' (or equivalent) files."
    )
def ensure_is_valid_bioimageio_yaml_name(file_name: FileName) -> FileName:
    """Return `file_name` if it identifies a bioimageio YAML file, else raise."""
    if is_valid_bioimageio_yaml_name(file_name):
        return file_name

    raise ValueError(
        f"'{file_name}' is not a valid filename to identify"
        + f" '{BIOIMAGEIO_YAML}' (or equivalent) files."
    )
# types as loaded from YAML 1.2 (with ruyaml)
YamlLeafValue = Union[
    bool, _date, _datetime, int, float, str, None
]  # note: order relevant for deserializing
YamlKey = Union[  # YAML Arrays are cast to tuples if used as key in mappings
    YamlLeafValue, Tuple[YamlLeafValue, ...]  # (nesting is not allowed though)
]
if TYPE_CHECKING:
    # plain recursive Unions are sufficient for static analysis
    YamlValue = Union[YamlLeafValue, List["YamlValue"], Dict[YamlKey, "YamlValue"]]
    YamlValueView = Union[
        YamlLeafValue, Sequence["YamlValueView"], Mapping[YamlKey, "YamlValueView"]
    ]
else:
    # for pydantic validation we need to use `TypeAliasType`,
    # see https://docs.pydantic.dev/latest/concepts/types/#named-recursive-types
    # however this results in a partially unknown type with the current pyright 1.1.388
    YamlValue = _TypeAliasType(
        "YamlValue",
        Union[YamlLeafValue, List["YamlValue"], Dict[YamlKey, "YamlValue"]],
    )
    YamlValueView = _TypeAliasType(
        "YamlValueView",
        Union[
            YamlLeafValue,
            Sequence["YamlValueView"],
            Mapping[YamlKey, "YamlValueView"],
        ],
    )

# editable and read-only views of a whole bioimageio YAML document
BioimageioYamlContent = Dict[str, YamlValue]
BioimageioYamlContentView = Mapping[str, YamlValueView]

IncompleteDescrLeaf = Union[Node, YamlValue, PermissiveFileSource, Version]
"""Leaf value of a partial description"""

IncompleteDescrInner = Union[
    IncompleteDescrLeaf,
    List["IncompleteDescrInner"],
    Dict[YamlKey, "IncompleteDescrInner"],
]
"""An inner node of an incomplete resource description --- YAML values and description nodes mixed."""

IncompleteDescr = Dict[str, IncompleteDescrInner]
"""An incomplete resource description --- YAML values and description nodes mixed."""

IncompleteDescrLeafView = Union[Node, YamlValueView, PermissiveFileSource, Version]
"""Non-editable leaf value of an incomplete description"""

IncompleteDescrInnerView = Union[
    IncompleteDescrLeafView,
    Sequence["IncompleteDescrInnerView"],
    Mapping[YamlKey, "IncompleteDescrInnerView"],
    # Mapping[str, YamlValueView], # not sure why this is explicit Mapping is needed
]
"""An inner node of a non-editable incomplete resource description --- YAML value views and Node instances mixed."""

IncompleteDescrView = Mapping[str, IncompleteDescrInnerView]
"""A non-editable incomplete resource description --- YAML mappings and Node instances mixed."""

# anything a bioimageio YAML document may be loaded from
BioimageioYamlSource = Union[
    PermissiveFileSource, ZipFile, BioimageioYamlContent, BioimageioYamlContentView
]
@overload
def deepcopy_yaml_value(value: BioimageioYamlContentView) -> BioimageioYamlContent: ...


@overload
def deepcopy_yaml_value(value: YamlValueView) -> YamlValue: ...


def deepcopy_yaml_value(
    value: Union[BioimageioYamlContentView, YamlValueView],
) -> Union[BioimageioYamlContent, YamlValue]:
    """Return an editable deep copy of a (read-only view of a) YAML value.

    Mappings are rebuilt as plain dicts and sequences as plain lists;
    leaf values (including strings) are returned as-is.
    """
    if isinstance(value, str):
        # str is a `Sequence`; without this guard a string leaf would be
        # exploded into a list of single characters by the branch below
        return value
    elif isinstance(value, collections.abc.Mapping):
        return {key: deepcopy_yaml_value(val) for key, val in value.items()}
    elif isinstance(value, collections.abc.Sequence):
        return [deepcopy_yaml_value(val) for val in value]
    else:
        return value
def deepcopy_incomplete_descr(data: IncompleteDescrView) -> IncompleteDescr:
    """Return an editable deep copy of an incomplete resource description."""
    result: IncompleteDescr = {}
    for key, value in data.items():
        result[key] = _deepcopy_incomplete_descr_impl(value)

    return result
def _deepcopy_incomplete_descr_impl(
    data: IncompleteDescrInnerView,
) -> IncompleteDescrInner:
    """Deep copy one node of an incomplete description.

    `Node` instances are deep-copied, containers are rebuilt as editable
    dicts/lists, and known (immutable) leaf types are returned as-is.
    """
    if isinstance(data, Node):
        return deepcopy(data)
    elif isinstance(data, str):
        # str is a Sequence; must be returned before the Sequence branch below
        return data
    elif isinstance(data, collections.abc.Mapping):
        return {k: _deepcopy_incomplete_descr_impl(v) for k, v in data.items()}
    elif isinstance(data, collections.abc.Sequence):
        return [_deepcopy_incomplete_descr_impl(v) for v in data]
    elif isinstance(
        data,
        (
            # leaf types that are immutable (or treated as such) and shared safely
            bool,
            int,
            float,
            type(None),
            _date,
            _datetime,
            Version,
            RelativeFilePath,
            PurePath,
            HttpUrl,
            pydantic.HttpUrl,
        ),
    ):
        return data
    else:
        assert_never(data)
def is_yaml_leaf_value(value: Any) -> TypeGuard[YamlLeafValue]:
    """Type guard for YAML leaf values (scalars, dates/datetimes, and None)."""
    leaf_types = (bool, _date, _datetime, int, float, str, type(None))
    return isinstance(value, leaf_types)
def is_yaml_list(value: Any) -> TypeGuard[List[YamlValue]]:
    """Type guard for lists whose items are all YAML values."""
    if not is_list(value):
        return False

    return all(is_yaml_value(item) for item in value)
def is_yaml_sequence(value: Any) -> TypeGuard[List[YamlValueView]]:
    """Type guard for sequences whose items are all YAML values."""
    if not is_sequence(value):
        return False

    return all(is_yaml_value(item) for item in value)
def is_yaml_dict(value: Any) -> TypeGuard[BioimageioYamlContent]:
    """Type guard for dicts with str keys and YAML values."""
    if not is_dict(value):
        return False

    return all(
        isinstance(key, str) and is_yaml_value(val) for key, val in value.items()
    )
def is_yaml_mapping(value: Any) -> TypeGuard[BioimageioYamlContentView]:
    """Type guard for mappings with str keys and (read-only) YAML values."""
    if not is_mapping(value):
        return False

    return all(
        isinstance(key, str) and is_yaml_value_read_only(val)
        for key, val in value.items()
    )
def is_yaml_value(value: Any) -> TypeGuard[YamlValue]:
    """Type guard for any editable YAML value (leaf, list, or dict)."""
    if is_yaml_leaf_value(value):
        return True

    return is_yaml_list(value) or is_yaml_dict(value)
def is_yaml_value_read_only(value: Any) -> TypeGuard[YamlValueView]:
    """Type guard for any read-only YAML value (leaf, sequence, or mapping)."""
    if is_yaml_leaf_value(value):
        return True

    return is_yaml_sequence(value) or is_yaml_mapping(value)
@dataclass(frozen=True, **SLOTS)
class OpenedBioimageioYaml:
    """A parsed bioimageio YAML document together with its origin."""

    content: BioimageioYamlContent = field(repr=False)  # parsed YAML content
    original_root: Union[AbsoluteDirectory, RootHttpUrl, ZipFile]  # root to resolve relative paths against
    original_source_name: Optional[str]  # name of the original source, if any
    original_file_name: FileName  # file name the document was read from
    unparsed_content: str = field(repr=False)  # raw YAML text
@dataclass(frozen=True, **SLOTS)
class LocalFile:
    """A file on the local file system together with its origin."""

    path: FilePath  # local file path
    original_root: Union[AbsoluteDirectory, RootHttpUrl, ZipFile]  # root the file originated from
    original_file_name: FileName  # file name at the origin
@dataclass(frozen=True, **SLOTS)
class FileInZip:
    """A file inside a zip archive together with its origin."""

    path: ZipPath  # path within the zip archive
    original_root: Union[RootHttpUrl, ZipFile]  # root the file originated from
    original_file_name: FileName  # file name at the origin
class HashKwargs(TypedDict):
    """Keyword arguments carrying an optional expected SHA256 digest."""

    sha256: NotRequired[Optional[Sha256]]
# Module-level adapter used to coerce permissive inputs into a strict `FileSource`.
_file_source_adapter: TypeAdapter[Union[HttpUrl, RelativeFilePath, FilePath]] = (
    TypeAdapter(FileSource)
)
def interprete_file_source(file_source: PermissiveFileSource) -> FileSource:
    """Coerce a permissive file source into a strict `FileSource`.

    Raises:
        FileNotFoundError: if the source is (or validates to) a directory.
    """
    if isinstance(file_source, Path):
        if file_source.is_dir():
            raise FileNotFoundError(
                f"{file_source} is a directory, but expected a file."
            )

        return file_source

    if isinstance(file_source, HttpUrl):
        return file_source

    if isinstance(file_source, pydantic.AnyUrl):
        file_source = str(file_source)

    # validate strings (and stringified URLs) without touching the file system
    with get_validation_context().replace(perform_io_checks=False):
        validated = _file_source_adapter.validate_python(file_source)

    if isinstance(validated, Path) and validated.is_dir():
        raise FileNotFoundError(f"{validated} is a directory, but expected a file.")

    return validated
def extract(
    source: Union[FilePath, ZipFile, ZipPath],
    folder: Optional[DirectoryPath] = None,
    overwrite: bool = False,
) -> DirectoryPath:
    """Extract a zip archive (or a single member of one) to `folder`.

    Args:
        source: a zip file path, an opened `ZipFile`, or a single `ZipPath` member.
        folder: output directory; defaults to `<archive>.unzip` next to the
            archive (or a fresh temporary directory if the archive has no file name).
        overwrite: re-extract even if `folder` already has content.

    Returns:
        The directory the content was extracted to. If a previous extraction
        is detected as incomplete and `overwrite` is False, extraction is
        retried into a sibling directory with an altered name.
    """
    extract_member = None
    if isinstance(source, ZipPath):
        # only this member of the enclosing archive will be extracted
        extract_member = source.at
        source = source.root

    if isinstance(source, ZipFile):
        # do not close a caller-provided ZipFile
        zip_context = nullcontext(source)
        if folder is None:
            if source.filename is None:
                folder = Path(mkdtemp())
            else:
                zip_path = Path(source.filename)
                folder = zip_path.with_suffix(zip_path.suffix + ".unzip")
    else:
        zip_context = ZipFile(source, "r")
        if folder is None:
            folder = source.with_suffix(source.suffix + ".unzip")

    if overwrite and folder.exists():
        warnings.warn(f"Overwriting existing unzipped archive at {folder}")

    with zip_context as f:
        if extract_member is not None:
            extracted_file_path = folder / extract_member
            if extracted_file_path.exists() and not overwrite:
                warnings.warn(f"Found unzipped {extracted_file_path}.")
            else:
                _ = f.extract(extract_member, folder)

            return folder

        elif overwrite or not folder.exists():
            f.extractall(folder)
            return folder

        # folder exists and overwrite is False: check completeness of the
        # previous extraction
        # NOTE(review): glob("*") only lists direct children, while
        # `f.filelist` may contain nested member paths — confirm this
        # comparison behaves as intended for archives with subdirectories.
        found_content = {p.relative_to(folder).as_posix() for p in folder.glob("*")}
        expected_content = {info.filename for info in f.filelist}
        if expected_missing := expected_content - found_content:
            # previous extraction incomplete: derive a sibling folder name
            # with an incremented counter and extract there instead
            parts = folder.name.split("_")
            nr, *suffixes = parts[-1].split(".")
            if nr.isdecimal():
                nr = str(int(nr) + 1)
            else:
                nr = f"1.{nr}"

            parts[-1] = ".".join([nr, *suffixes])
            out_path_new = folder.with_name("_".join(parts))
            warnings.warn(
                f"Unzipped archive at {folder} is missing expected files"
                + f" {expected_missing}."
                + f" Unzipping to {out_path_new} instead to avoid overwriting."
            )
            return extract(f, out_path_new, overwrite=overwrite)
        else:
            warnings.warn(
                f"Found unzipped archive with all expected files at {folder}."
            )
            return folder
def get_reader(
    source: Union[PermissiveFileSource, FileDescr, ZipPath],
    /,
    progressbar: Union[Progressbar, Callable[[], Progressbar], bool, None] = None,
    **kwargs: Unpack[HashKwargs],
) -> BytesReader:
    """Open a file `source` (download if needed)

    Args:
        source: file path, URL, file description, or zip member to open.
        progressbar: progressbar (or factory/bool/None) used for downloads.
        **kwargs: optional expected `sha256`; for local sources a mismatch
            raises a `ValueError`.
    """
    if isinstance(source, FileDescr):
        # take the expected hash from the descriptor unless explicitly given
        if "sha256" not in kwargs:
            kwargs["sha256"] = source.sha256

        source = source.source
    elif isinstance(source, str):
        source = interprete_file_source(source)

    if isinstance(source, RelativeFilePath):
        source = source.absolute()
    elif isinstance(source, pydantic.AnyUrl):
        with get_validation_context().replace(perform_io_checks=False):
            source = HttpUrl(source)

    if isinstance(source, HttpUrl):
        return _open_url(source, progressbar=progressbar, **kwargs)

    if isinstance(source, ZipPath):
        if not source.exists():
            raise FileNotFoundError(source.filename)

        f = source.open(mode="rb")
        assert not isinstance(f, TextIOWrapper)  # binary mode was requested
        root = source.root
    elif isinstance(source, Path):
        if source.is_dir():
            raise FileNotFoundError(f"{source} is a directory, not a file")

        if not source.exists():
            raise FileNotFoundError(source)

        f = source.open("rb")
        root = source.parent
    else:
        assert_never(source)

    expected_sha = kwargs.get("sha256")
    if expected_sha is None:
        sha = None
    else:
        # verify local content against the expected hash
        sha = get_sha256(f)
        _ = f.seek(0)  # rewind after hashing
        if sha != expected_sha:
            raise ValueError(
                f"SHA256 mismatch for {source}. Expected {expected_sha}, got {sha}."
            )

    return BytesReader(
        f,
        sha256=sha,
        suffix=source.suffix,
        original_file_name=source.name,
        original_root=root,
        is_zipfile=None,
    )


# public alias for `get_reader`
download = get_reader
def _open_url(
    source: HttpUrl,
    /,
    progressbar: Union[Progressbar, Callable[[], Progressbar], bool, None],
    **kwargs: Unpack[HashKwargs],
) -> BytesReader:
    """Fetch `source` via the (disk) cache and wrap the result in a `BytesReader`.

    A known `sha256` is passed to the cache as content digest; without one
    the URL is always refetched (`force_refetch=True`).
    """
    cache = (
        NoopCache[RootHttpUrl](url_hasher=UrlDigest.from_str)
        if get_validation_context().disable_cache
        else settings.disk_cache
    )
    sha = kwargs.get("sha256")
    force_refetch = True if sha is None else ContentDigest.parse(hexdigest=sha)
    # derive a pseudo path so a suffix/file name exists even for path-less URLs
    source_path = PurePosixPath(
        source.path
        or sha
        or hashlib.sha256(str(source).encode(encoding="utf-8")).hexdigest()
    )

    reader = cache.fetch(
        source,
        fetcher=partial(_fetch_url, progressbar=progressbar),
        force_refetch=force_refetch,
    )
    return BytesReader(
        reader,
        suffix=source_path.suffix,
        sha256=sha,
        original_file_name=source_path.name,
        original_root=source.parent,
        is_zipfile=None,
    )
def _fetch_url(
    source: RootHttpUrl,
    *,
    progressbar: Union[Progressbar, Callable[[], Progressbar], bool, None],
):
    """Fetch `source` over http(s), returning a generator of byte chunks
    that updates `progressbar` as it is consumed.

    Raises:
        NotImplementedError: for non-http(s) schemes.
        httpx.HTTPStatusError: for http error status codes.
    """
    if source.scheme not in ("http", "https"):
        raise NotImplementedError(source.scheme)

    if progressbar is None:
        # chose progressbar option from validation context
        progressbar = get_validation_context().progressbar

    if progressbar is None:
        # default to no progressbar in CI environments
        progressbar = not settings.CI

    if callable(progressbar):
        # a progressbar factory was given; instantiate it now
        progressbar = progressbar()

    if isinstance(progressbar, bool) and progressbar:
        # create the default tqdm progressbar
        progressbar = tqdm(
            ncols=79,
            ascii=bool(sys.platform == "win32"),
            unit="B",
            unit_scale=True,
            leave=True,
        )

    if progressbar is not False:
        progressbar.set_description(f"Downloading {extract_file_name(source)}")

    headers: Dict[str, str] = {}
    if settings.user_agent is not None:
        headers["User-Agent"] = settings.user_agent
    elif settings.CI:
        headers["User-Agent"] = "ci"

    # NOTE(review): httpx.get reads the full response body before iteration;
    # httpx.stream would stream for real — confirm this is intentional.
    r = httpx.get(
        str(source),
        follow_redirects=True,
        headers=headers,
        timeout=settings.http_timeout,
    )
    _ = r.raise_for_status()

    # set progressbar.total
    total = r.headers.get("content-length")
    if total is not None and not isinstance(total, int):
        try:
            total = int(total)
        except Exception:
            total = None

    if progressbar is not False:
        if total is None:
            progressbar.total = 0
        else:
            progressbar.total = total

    def iter_content():
        for chunk in r.iter_bytes(chunk_size=4096):
            yield chunk
            if progressbar is not False:
                _ = progressbar.update(len(chunk))

        # Make sure the progress bar gets filled even if the actual number
        # of chunks is smaller than expected. This happens when streaming
        # text files that are compressed by the server when sending (gzip).
        # Binary files don't experience this.
        # (adapted from pooch.HttpDownloader)
        if progressbar is not False:
            progressbar.reset()
            if total is not None:
                _ = progressbar.update(total)

            progressbar.close()

    return iter_content()
def extract_file_name(
    src: Union[
        pydantic.HttpUrl, RootHttpUrl, PurePath, RelativeFilePath, ZipPath, FileDescr
    ],
) -> FileName:
    """Derive a file name from a path, URL, zip member, or file description."""
    if isinstance(src, FileDescr):
        src = src.source

    if isinstance(src, ZipPath):
        return src.name or src.root.filename or "bioimageio.zip"

    if isinstance(src, RelativeFilePath):
        return src.path.name

    if isinstance(src, PurePath):
        return src.name

    url = urlparse(str(src))
    segments = url.path.split("/")
    is_zenodo_record_content = (
        url.scheme == "https"
        and url.hostname == "zenodo.org"
        and url.path.startswith("/api/records/")
        and url.path.endswith("/content")
    )
    if is_zenodo_record_content:
        # zenodo API content URLs end in ".../<file name>/content"
        return segments[-2]

    return segments[-1]
def extract_file_descrs(
    data: IncompleteDescrView,
) -> List[FileDescr]:
    """Collect all `FileDescr` instances found anywhere in `data`."""
    found: List[FileDescr] = []
    context = get_validation_context()
    with context.replace(perform_io_checks=False, log_warnings=False):
        _extract_file_descrs_impl(data, found)

    return found
def _extract_file_descrs_impl(
    data: Union[IncompleteDescrView, IncompleteDescrInnerView],
    collected: List[FileDescr],
) -> None:
    """Recursively append every `FileDescr` found in `data` to `collected`.

    Mappings with 'source' and 'sha256' keys are additionally parsed into
    `FileDescr` instances on a best-effort basis (first with, then without
    the 'sha256' value).
    """
    if isinstance(data, FileDescr):
        collected.append(data)
    elif isinstance(data, Node):
        # iterate over the node's (name, value) field pairs
        for _, v in data:
            _extract_file_descrs_impl(v, collected)
    elif isinstance(data, collections.abc.Mapping):
        if "source" in data and "sha256" in data:
            try:
                fd = FileDescr.model_validate(
                    dict(source=data["source"], sha256=data["sha256"])
                )
            except Exception:
                warnings.warn(
                    "Found mapping with 'source' and 'sha256' keys, but could not parse it as a FileDescr. Ignoring `sha256`."
                )
                # retry without the sha256 value
                try:
                    fd = FileDescr.model_validate(dict(source=data["source"]))
                except Exception:
                    # fixed message typos: quoting of 'sha256', "keys ,", "evning"
                    warnings.warn(
                        f"Found mapping with 'source' and 'sha256' keys, but could not parse it as a FileDescr, even when ignoring 'sha256'. Ignoring `source`: {data['source']}."
                    )
                else:
                    collected.append(fd)
            else:
                collected.append(fd)

        for v in data.values():
            _extract_file_descrs_impl(v, collected)
    elif not isinstance(data, (str, Path, RelativeFilePath)) and isinstance(
        data, collections.abc.Sequence
    ):
        # str/paths are Sequences, but must be treated as leaves
        for v in data:
            _extract_file_descrs_impl(v, collected)
# File-like sources that suffix validators accept and return unchanged.
F = TypeVar("F", bound=Union[FileSource, FileDescr])
def validate_suffix(
    value: F, suffix: Union[str, Sequence[str]], case_sensitive: bool
) -> F:
    """check final suffix

    Args:
        value: file source or file description to check.
        suffix: one or more accepted suffixes, each starting with '.'.
        case_sensitive: whether to compare suffixes case-sensitively.

    Returns:
        `value` unchanged.

    Raises:
        ValueError: if the final suffix of `value` is not among `suffix`.
    """
    if isinstance(suffix, str):
        suffixes = [suffix]
    else:
        suffixes = suffix

    assert len(suffixes) > 0, "no suffix given"
    assert all(suff.startswith(".") for suff in suffixes), (
        "expected suffixes to start with '.'"
    )
    o_value = value  # keep the original value to return unchanged
    if isinstance(value, FileDescr):
        strict = value.source
    else:
        strict = interprete_file_source(value)

    if isinstance(strict, (HttpUrl, AnyUrl)):
        if strict.path is None or "." not in (path := strict.path):
            actual_suffixes = []
        else:
            if (
                strict.host == "zenodo.org"
                and path.startswith("/api/records/")
                and path.endswith("/content")
            ):
                # Zenodo API URLs have a "/content" suffix that should be ignored
                path = path[: -len("/content")]

            actual_suffixes = [f".{path.split('.')[-1]}"]

    elif isinstance(strict, PurePath):
        actual_suffixes = strict.suffixes
    elif isinstance(strict, RelativeFilePath):
        actual_suffixes = strict.path.suffixes
    else:
        assert_never(strict)

    if actual_suffixes:
        actual_suffix = actual_suffixes[-1]
    else:
        actual_suffix = "no suffix"

    # precedence: (case_sensitive and miss) or (not case_sensitive and ci-miss)
    if (
        case_sensitive
        and actual_suffix not in suffixes
        or not case_sensitive
        and actual_suffix.lower() not in [s.lower() for s in suffixes]
    ):
        if len(suffixes) == 1:
            raise ValueError(f"Expected suffix {suffixes[0]}, but got {actual_suffix}")
        else:
            raise ValueError(
                f"Expected a suffix from {suffixes}, but got {actual_suffix}"
            )

    return o_value
def populate_cache(sources: Sequence[Union[FileDescr, LightHttpFileDescr]]):
    """Download every http(s) source with a known sha256 to warm the cache."""
    seen_urls: Set[str] = set()
    for src in sources:
        if src.sha256 is None:
            # without a known SHA the content cannot be cache-validated
            continue

        source = src.source
        if isinstance(source, (HttpUrl, pydantic.AnyUrl)):
            url = str(source)
        elif isinstance(source, RelativeFilePath):
            absolute = source.absolute()
            if isinstance(absolute, HttpUrl):
                url = str(absolute)
            else:
                # local paths are not cached
                continue
        elif isinstance(source, Path):
            # local paths are not cached
            continue
        else:
            assert_never(source)

        if url in seen_urls:
            # skip duplicate URLs
            continue

        seen_urls.add(url)
        _ = src.download()