Coverage for src / bioimageio / spec / _internal / io.py: 78%
490 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-31 13:09 +0000
1from __future__ import annotations
3import collections.abc
4import hashlib
5import sys
6import warnings
7import zipfile
8from abc import abstractmethod
9from contextlib import nullcontext
10from copy import deepcopy
11from dataclasses import dataclass, field
12from datetime import date as _date
13from datetime import datetime as _datetime
14from functools import partial
15from io import TextIOWrapper
16from pathlib import Path, PurePath, PurePosixPath
17from tempfile import mkdtemp
18from typing import (
19 TYPE_CHECKING,
20 Any,
21 Callable,
22 Dict,
23 Generic,
24 Iterable,
25 List,
26 Mapping,
27 Optional,
28 Sequence,
29 Set,
30 Tuple,
31 Type,
32 TypedDict,
33 TypeVar,
34 Union,
35 overload,
36)
37from urllib.parse import urlparse, urlsplit, urlunsplit
38from zipfile import ZipFile
40import httpx
41import pydantic
42from genericache import NoopCache
43from genericache.digest import ContentDigest, UrlDigest
44from pydantic import (
45 AnyUrl,
46 DirectoryPath,
47 Field,
48 GetCoreSchemaHandler,
49 PrivateAttr,
50 RootModel,
51 TypeAdapter,
52 model_serializer,
53 model_validator,
54)
55from pydantic_core import core_schema
56from tqdm import tqdm
57from typing_extensions import (
58 Annotated,
59 LiteralString,
60 NotRequired,
61 Self,
62 TypeGuard,
63 Unpack,
64 assert_never,
65)
66from typing_extensions import TypeAliasType as _TypeAliasType
68from ._settings import settings
69from .io_basics import (
70 ALL_BIOIMAGEIO_YAML_NAMES,
71 ALTERNATIVE_BIOIMAGEIO_YAML_NAMES,
72 BIOIMAGEIO_YAML,
73 AbsoluteDirectory,
74 AbsoluteFilePath,
75 BytesReader,
76 FileName,
77 FilePath,
78 Sha256,
79 ZipPath,
80 get_sha256,
81)
82from .node import Node
83from .progress import Progressbar
84from .root_url import RootHttpUrl
85from .type_guards import is_dict, is_list, is_mapping, is_sequence
86from .url import HttpUrl
87from .utils import SLOTS
88from .validation_context import get_validation_context
89from .version_type import Version
# TypeVar for the absolute counterpart of a relative path: a URL, an absolute
# local directory/file path, or a path inside a zip archive.
AbsolutePathT = TypeVar(
    "AbsolutePathT",
    bound=Union[HttpUrl, AbsoluteDirectory, AbsoluteFilePath, ZipPath],
)
class LightHttpFileDescr(Node):
    """http source with sha256 value (minimal validation)"""

    source: pydantic.HttpUrl
    """file source"""

    sha256: Sha256
    """SHA256 checksum of the source file"""

    def get_reader(
        self,
        *,
        progressbar: Union[Progressbar, Callable[[], Progressbar], bool, None] = None,
    ) -> BytesReader:
        """open the file source (download if needed)

        Args:
            progressbar: progress bar instance, factory, or on/off toggle;
                `None` defers to the validation context / CI settings.
        """
        return get_reader(self.source, sha256=self.sha256, progressbar=progressbar)

    download = get_reader
    """alias for get_reader() method"""
class RelativePathBase(RootModel[PurePath], Generic[AbsolutePathT], frozen=True):
    """Base class for paths given relative to the validation context's root."""

    # absolute counterpart, resolved once in `model_post_init`
    _absolute: AbsolutePathT = PrivateAttr()

    @property
    def path(self) -> PurePath:
        """the wrapped (relative) path"""
        return self.root

    def absolute(  # method not property analog to `pathlib.Path.absolute()`
        self,
    ) -> AbsolutePathT:
        """get the absolute path/url

        (resolved at time of initialization with the root of the ValidationContext)
        """
        return self._absolute

    def model_post_init(self, __context: Any) -> None:
        """set `_absolute` property with validation context at creation time. @private"""
        if self.root.is_absolute():
            raise ValueError(f"{self.root} is an absolute path.")

        # guard against URLs that slipped through PurePath parsing
        if self.root.parts and self.root.parts[0] in ("http:", "https:"):
            raise ValueError(f"{self.root} looks like an http url.")

        self._absolute = (  # pyright: ignore[reportAttributeAccessIssue]
            self.get_absolute(get_validation_context().root)
        )
        super().model_post_init(__context)

    def __str__(self) -> str:
        return self.root.as_posix()

    def __repr__(self) -> str:
        return f"RelativePath('{self}')"

    @model_serializer()
    def format(self) -> str:
        """serialize as a POSIX-style string"""
        return str(self)

    @abstractmethod
    def get_absolute(
        self, root: Union[RootHttpUrl, AbsoluteDirectory, pydantic.AnyUrl, ZipFile]
    ) -> AbsolutePathT: ...

    def _get_absolute_impl(
        self, root: Union[RootHttpUrl, AbsoluteDirectory, pydantic.AnyUrl, ZipFile]
    ) -> Union[Path, HttpUrl, ZipPath]:
        """Resolve against a local directory, a zip archive, or a URL root."""
        if isinstance(root, Path):
            return (root / self.root).absolute()

        rel_path = self.root.as_posix().strip("/")
        if isinstance(root, ZipFile):
            return ZipPath(root, rel_path)

        # URL root: splice the relative path into the URL's path component
        parsed = urlsplit(str(root))
        path = list(parsed.path.strip("/").split("/"))
        if (
            parsed.netloc == "zenodo.org"
            and parsed.path.startswith("/api/records/")
            and parsed.path.endswith("/content")
        ):
            # zenodo API record URLs end in "/content";
            # the file path belongs before that suffix
            path.insert(-1, rel_path)
        else:
            path.append(rel_path)

        return HttpUrl(
            urlunsplit(
                (
                    parsed.scheme,
                    parsed.netloc,
                    "/".join(path),
                    parsed.query,
                    parsed.fragment,
                )
            )
        )

    @classmethod
    def _validate(cls, value: Union[PurePath, str]):
        if isinstance(value, str) and (
            value.startswith("https://") or value.startswith("http://")
        ):
            raise ValueError(f"{value} looks like a URL, not a relative path")

        return cls(PurePath(value))
class RelativeFilePath(
    RelativePathBase[Union[AbsoluteFilePath, HttpUrl, ZipPath]], frozen=True
):
    """A path relative to the `rdf.yaml` file (also if the RDF source is a URL)."""

    def model_post_init(self, __context: Any) -> None:
        """add validation @private"""
        if not self.root.parts:  # an empty path can only be a directory
            raise ValueError(f"{self.root} is not a valid file path.")

        super().model_post_init(__context)

    def get_absolute(
        self, root: "RootHttpUrl | Path | AnyUrl | ZipFile"
    ) -> "AbsoluteFilePath | HttpUrl | ZipPath":
        """resolve against `root`

        For local results, existence is verified if the validation context
        performs IO checks and the file is not listed in `known_files`.
        """
        absolute = self._get_absolute_impl(root)
        if (
            isinstance(absolute, Path)
            and (context := get_validation_context()).perform_io_checks
            and str(self.root) not in context.known_files
            and not absolute.is_file()
        ):
            raise ValueError(f"{absolute} does not point to an existing file")

        return absolute

    @property
    def suffix(self) -> str:
        """file extension of the relative path"""
        return self.root.suffix
class RelativeDirectory(
    RelativePathBase[Union[AbsoluteDirectory, HttpUrl, ZipPath]], frozen=True
):
    """A directory path relative to the validation context's root."""

    def get_absolute(
        self, root: "RootHttpUrl | Path | AnyUrl | ZipFile"
    ) -> "AbsoluteDirectory | HttpUrl | ZipPath":
        """resolve against `root`

        For local results, existence is verified if the validation context
        performs IO checks.
        """
        absolute = self._get_absolute_impl(root)
        if (
            isinstance(absolute, Path)
            and get_validation_context().perform_io_checks
            and not absolute.is_dir()
        ):
            raise ValueError(f"{absolute} does not point to an existing directory")

        return absolute
# strict file source: URL, path relative to the RDF, or local file path
# (the union is resolved left to right)
FileSource = Annotated[
    Union[HttpUrl, RelativeFilePath, FilePath],
    Field(union_mode="left_to_right"),
]
# permissive variant additionally accepting plain strings and pydantic URLs
PermissiveFileSource = Union[FileSource, str, pydantic.HttpUrl]
class FileDescr(Node):
    """A file description"""

    source: FileSource
    """File source"""

    sha256: Optional[Sha256] = None
    """SHA256 hash value of the **source** file."""

    @model_validator(mode="after")
    def _validate_sha256(self) -> Self:
        self.validate_sha256()
        return self

    def validate_sha256(self, force_recompute: bool = False) -> None:
        """validate the sha256 hash value of the **source** file

        Opens (and potentially downloads) the source when IO checks are
        enabled or `force_recompute` is set; otherwise relies on the
        validation context's `known_files` cache.
        """
        context = get_validation_context()
        src_str = str(self.source)
        if force_recompute:
            actual_sha = None
        else:
            # reuse a hash computed earlier within this validation context
            actual_sha = context.known_files.get(src_str)

        if actual_sha is None:
            if context.perform_io_checks or force_recompute:
                reader = get_reader(self.source, sha256=self.sha256)
                if force_recompute:
                    actual_sha = get_sha256(reader)
                else:
                    # get_reader already hashed the stream; reuse its result
                    actual_sha = reader.sha256

                context.known_files[src_str] = actual_sha
            elif context.known_files and src_str not in context.known_files:
                # perform_io_checks is False, but known files were given,
                # so we expect all file references to be in there
                raise ValueError(f"File {src_str} not found in `known_files`.")

        if actual_sha is None or self.sha256 == actual_sha:
            return
        elif self.sha256 is None or context.update_hashes:
            self.sha256 = actual_sha
        elif self.sha256 != actual_sha:
            # actual_sha is known here and differs from the expected value
            raise ValueError(
                f"Sha256 mismatch for {self.source}. Expected {self.sha256}, got "
                + f"{actual_sha}. Update expected `sha256` or point to the matching "
                + "file."
            )

    def get_reader(
        self,
        *,
        progressbar: Union[Progressbar, Callable[[], Progressbar], bool, None] = None,
    ):
        """open the file source (download if needed)"""
        return get_reader(self.source, progressbar=progressbar, sha256=self.sha256)

    def download(
        self,
        *,
        progressbar: Union[Progressbar, Callable[[], Progressbar], bool, None] = None,
    ):
        """alias for `.get_reader`"""
        return get_reader(self.source, progressbar=progressbar, sha256=self.sha256)

    @property
    def suffix(self) -> str:
        """file extension of the source"""
        return self.source.suffix
# reusable adapter validating input into a file path, directory, or URL
path_or_url_adapter: "TypeAdapter[Union[FilePath, DirectoryPath, HttpUrl]]" = (
    TypeAdapter(Union[FilePath, DirectoryPath, HttpUrl])
)
@dataclass(frozen=True, **SLOTS)
class WithSuffix:
    """Pydantic annotation metadata that validates a file source's suffix."""

    suffix: Union[LiteralString, Tuple[LiteralString, ...]]  # allowed suffix(es)
    case_sensitive: bool  # whether suffix comparison is case sensitive

    def __get_pydantic_core_schema__(
        self, source: Type[Any], handler: GetCoreSchemaHandler
    ):
        if not self.suffix:
            raise ValueError("suffix may not be empty")

        schema = handler(source)
        # run `validate` after the annotated type's own validation
        return core_schema.no_info_after_validator_function(
            self.validate,
            schema,
        )

    def validate(
        self, value: Union[FileSource, FileDescr]
    ) -> Union[FileSource, FileDescr]:
        """check the suffix of `value` (see `validate_suffix`)"""
        return validate_suffix(value, self.suffix, case_sensitive=self.case_sensitive)
def wo_special_file_name(src: F) -> F:
    """Return `src` unchanged unless its filename is reserved.

    Raises:
        ValueError: if the filename identifies a bioimageio YAML file.
    """
    if not has_valid_bioimageio_yaml_name(src):
        return src

    raise ValueError(
        f"'{src}' not allowed here as its filename is reserved to identify"
        f" '{BIOIMAGEIO_YAML}' (or equivalent) files."
    )
def has_valid_bioimageio_yaml_name(src: Union[FileSource, FileDescr]) -> bool:
    """Whether the file name of `src` identifies a bioimageio YAML file."""
    file_name = extract_file_name(src)
    return is_valid_bioimageio_yaml_name(file_name)
def is_valid_bioimageio_yaml_name(file_name: FileName) -> bool:
    """Whether `file_name` matches a known bioimageio YAML name.

    A name matches if it equals a known name or ends with "." followed by
    one (e.g. "anything.bioimageio.yaml").
    """
    return any(
        file_name == candidate or file_name.endswith("." + candidate)
        for candidate in ALL_BIOIMAGEIO_YAML_NAMES
    )
def identify_bioimageio_yaml_file_name(file_names: Iterable[FileName]) -> FileName:
    """Return the first (sorted) name that identifies a bioimageio YAML file.

    Raises:
        ValueError: if no candidate matches any known bioimageio YAML name.
    """
    file_names = sorted(file_names)
    for bioimageio_name in ALL_BIOIMAGEIO_YAML_NAMES:
        for file_name in file_names:
            if file_name == bioimageio_name or file_name.endswith(
                "." + bioimageio_name
            ):
                return file_name

    # fixed garbled message: duplicated "or" and "alterntive" typo
    raise ValueError(
        f"No {BIOIMAGEIO_YAML} found in {file_names}. (Looking for '{BIOIMAGEIO_YAML}'"
        + " or any of the alternative file names:"
        + f" {ALTERNATIVE_BIOIMAGEIO_YAML_NAMES}, or any file with an extension of"
        + f" those, e.g. 'anything.{BIOIMAGEIO_YAML}')."
    )
def find_bioimageio_yaml_file_name(path: Union[Path, ZipFile]) -> FileName:
    """Find the bioimageio YAML file name in a directory, zip archive, or file."""
    if isinstance(path, ZipFile):
        candidates = path.namelist()
    elif not path.is_file():
        # a directory: consider its direct children
        candidates = [p.name for p in path.glob("*")]
    elif not zipfile.is_zipfile(path):
        # a plain file is its own candidate
        return path.name
    else:
        with ZipFile(path, "r") as f:
            candidates = f.namelist()

    return identify_bioimageio_yaml_file_name(candidates)
def ensure_has_valid_bioimageio_yaml_name(src: FileSource) -> FileSource:
    """Return `src` if its filename identifies a bioimageio YAML file, else raise."""
    if has_valid_bioimageio_yaml_name(src):
        return src

    raise ValueError(
        f"'{src}' does not have a valid filename to identify"
        f" '{BIOIMAGEIO_YAML}' (or equivalent) files."
    )
def ensure_is_valid_bioimageio_yaml_name(file_name: FileName) -> FileName:
    """Return `file_name` if it identifies a bioimageio YAML file, else raise."""
    if is_valid_bioimageio_yaml_name(file_name):
        return file_name

    raise ValueError(
        f"'{file_name}' is not a valid filename to identify"
        f" '{BIOIMAGEIO_YAML}' (or equivalent) files."
    )
# types as loaded from YAML 1.2 (with ruyaml)
YamlLeafValue = Union[
    bool, _date, _datetime, int, float, str, None
]  # note: order relevant for deserializing
YamlKey = Union[  # YAML Arrays are cast to tuples if used as key in mappings
    YamlLeafValue, Tuple[YamlLeafValue, ...]  # (nesting is not allowed though)
]

if TYPE_CHECKING:
    # plain recursive aliases suffice for static type checking
    YamlValue = Union[YamlLeafValue, List["YamlValue"], Dict[YamlKey, "YamlValue"]]
    YamlValueView = Union[
        YamlLeafValue, Sequence["YamlValueView"], Mapping[YamlKey, "YamlValueView"]
    ]
else:
    # for pydantic validation we need to use `TypeAliasType`,
    # see https://docs.pydantic.dev/latest/concepts/types/#named-recursive-types
    # however this results in a partially unknown type with the current pyright 1.1.388
    YamlValue = _TypeAliasType(
        "YamlValue",
        Union[YamlLeafValue, List["YamlValue"], Dict[YamlKey, "YamlValue"]],
    )
    YamlValueView = _TypeAliasType(
        "YamlValueView",
        Union[
            YamlLeafValue,
            Sequence["YamlValueView"],
            Mapping[YamlKey, "YamlValueView"],
        ],
    )

# mutable and read-only views of a whole YAML document (a str-keyed mapping)
BioimageioYamlContent = Dict[str, YamlValue]
BioimageioYamlContentView = Mapping[str, YamlValueView]

IncompleteDescrLeaf = Union[Node, YamlValue, PermissiveFileSource, Version]
"""Leaf value of a partial description"""

IncompleteDescrInner = Union[
    IncompleteDescrLeaf,
    List["IncompleteDescrInner"],
    Dict[YamlKey, "IncompleteDescrInner"],
]
"""An inner node of an incomplete resource description --- YAML values and description nodes mixed."""

IncompleteDescr = Dict[str, IncompleteDescrInner]
"""An incomplete resource description --- YAML values and description nodes mixed."""

IncompleteDescrLeafView = Union[Node, YamlValueView, PermissiveFileSource, Version]
"""Non-editable leaf value of an incomplete description"""

IncompleteDescrInnerView = Union[
    IncompleteDescrLeafView,
    Sequence["IncompleteDescrInnerView"],
    Mapping[YamlKey, "IncompleteDescrInnerView"],
    # Mapping[str, YamlValueView], # not sure why this is explicit Mapping is needed
]
"""A inner node of a non-editable incomplete resource description --- YAML value views and Node instances mixed."""

IncompleteDescrView = Mapping[str, IncompleteDescrInnerView]
"""A non-editable incomplete resource description --- YAML mappings and Node instances mixed."""

BioimageioYamlSource = Union[
    PermissiveFileSource, ZipFile, BioimageioYamlContent, BioimageioYamlContentView
]
@overload
def deepcopy_yaml_value(value: BioimageioYamlContentView) -> BioimageioYamlContent: ...


@overload
def deepcopy_yaml_value(value: YamlValueView) -> YamlValue: ...


def deepcopy_yaml_value(
    value: Union[BioimageioYamlContentView, YamlValueView],
) -> Union[BioimageioYamlContent, YamlValue]:
    """Deep-copy a YAML value, materializing mappings as dicts and sequences as lists.

    Leaf values (including strings) are returned as-is; they are immutable.
    """
    if isinstance(value, str):
        # str is also a Sequence: return it as a leaf instead of
        # exploding it into a list of characters
        return value
    elif isinstance(value, collections.abc.Mapping):
        return {key: deepcopy_yaml_value(val) for key, val in value.items()}
    elif isinstance(value, collections.abc.Sequence):
        return [deepcopy_yaml_value(val) for val in value]
    else:
        return value
def deepcopy_incomplete_descr(data: IncompleteDescrView) -> IncompleteDescr:
    """Deep-copy a read-only incomplete description into an editable dict."""
    copied: IncompleteDescr = {}
    for key, value in data.items():
        copied[key] = _deepcopy_incomplete_descr_impl(value)

    return copied
def _deepcopy_incomplete_descr_impl(
    data: IncompleteDescrInnerView,
) -> IncompleteDescrInner:
    """Deep-copy a single inner node of an incomplete description."""
    if isinstance(data, Node):
        # description nodes are copied wholesale
        return deepcopy(data)
    elif isinstance(data, str):
        # str is a Sequence; handle it before the Sequence branch
        return data
    elif isinstance(data, collections.abc.Mapping):
        return {k: _deepcopy_incomplete_descr_impl(v) for k, v in data.items()}
    elif isinstance(data, collections.abc.Sequence):
        return [_deepcopy_incomplete_descr_impl(v) for v in data]
    elif isinstance(
        data,
        (
            bool,
            int,
            float,
            type(None),
            _date,
            _datetime,
            Version,
            RelativeFilePath,
            PurePath,
            HttpUrl,
            pydantic.HttpUrl,
        ),
    ):
        # immutable (or treated-as-immutable) leaf values are returned as-is
        return data
    else:
        assert_never(data)
def is_yaml_leaf_value(value: Any) -> TypeGuard[YamlLeafValue]:
    """Whether `value` is a scalar YAML value (bool, date(time), number, str, or None)."""
    leaf_types = (bool, _date, _datetime, int, float, str, type(None))
    return isinstance(value, leaf_types)
def is_yaml_list(value: Any) -> TypeGuard[List[YamlValue]]:
    """Whether `value` is a list of YAML values."""
    if not is_list(value):
        return False

    return all(is_yaml_value(item) for item in value)
def is_yaml_sequence(value: Any) -> TypeGuard[List[YamlValueView]]:
    """Whether `value` is a sequence of (read-only) YAML values.

    Items are checked with `is_yaml_value_read_only` (consistent with
    `is_yaml_mapping`), since view items may be arbitrary sequences/mappings,
    not only lists/dicts.
    """
    return is_sequence(value) and all(
        is_yaml_value_read_only(item) for item in value
    )
def is_yaml_dict(value: Any) -> TypeGuard[BioimageioYamlContent]:
    """Whether `value` is a dict of str keys mapping to YAML values."""
    if not is_dict(value):
        return False

    return all(
        isinstance(key, str) and is_yaml_value(val) for key, val in value.items()
    )
def is_yaml_mapping(value: Any) -> TypeGuard[BioimageioYamlContentView]:
    """Whether `value` is a mapping of str keys to read-only YAML values."""
    if not is_mapping(value):
        return False

    return all(
        isinstance(key, str) and is_yaml_value_read_only(val)
        for key, val in value.items()
    )
def is_yaml_value(value: Any) -> TypeGuard[YamlValue]:
    """Whether `value` is a (mutable) YAML value: leaf, list, or str-keyed dict."""
    for check in (is_yaml_leaf_value, is_yaml_list, is_yaml_dict):
        if check(value):
            return True

    return False
def is_yaml_value_read_only(value: Any) -> TypeGuard[YamlValueView]:
    """Whether `value` is a (possibly read-only) YAML value view."""
    for check in (is_yaml_leaf_value, is_yaml_sequence, is_yaml_mapping):
        if check(value):
            return True

    return False
@dataclass(frozen=True, **SLOTS)
class OpenedBioimageioYaml:
    """A parsed bioimageio YAML document together with its provenance."""

    content: BioimageioYamlContent = field(repr=False)  # parsed YAML content
    original_root: Union[AbsoluteDirectory, RootHttpUrl, ZipFile]  # root to resolve relative paths against
    original_source_name: Optional[str]  # presumably the source as specified by the caller — confirm with callers
    original_file_name: FileName  # file name of the YAML document
    unparsed_content: str = field(repr=False)  # raw YAML text
@dataclass(frozen=True, **SLOTS)
class LocalFile:
    """A file on the local file system, with its original location."""

    path: FilePath  # local path to the file
    original_root: Union[AbsoluteDirectory, RootHttpUrl, ZipFile]  # where the file was obtained from
    original_file_name: FileName
@dataclass(frozen=True, **SLOTS)
class FileInZip:
    """A file inside a zip archive, with its original location."""

    path: ZipPath  # path within the zip archive
    original_root: Union[RootHttpUrl, ZipFile]  # where the archive was obtained from
    original_file_name: FileName
class HashKwargs(TypedDict):
    """Keyword arguments shared by the reader/download helpers."""

    sha256: NotRequired[Optional[Sha256]]  # expected SHA256 of the file, if known
# adapter used by `interprete_file_source` to coerce permissive input
# into a strict `FileSource`
_file_source_adapter: TypeAdapter[Union[HttpUrl, RelativeFilePath, FilePath]] = (
    TypeAdapter(FileSource)
)
def interprete_file_source(file_source: PermissiveFileSource) -> FileSource:
    """Normalize a permissive file source into a strict `FileSource`.

    `HttpUrl` and `Path` inputs pass through (a path must not be a
    directory); strings and pydantic URLs are validated into one of the
    strict `FileSource` member types.
    """
    if isinstance(file_source, HttpUrl):
        return file_source

    if isinstance(file_source, Path):
        if file_source.is_dir():
            raise FileNotFoundError(
                f"{file_source} is a directory, but expected a file."
            )
        return file_source

    if isinstance(file_source, pydantic.AnyUrl):
        file_source = str(file_source)

    # validate without IO checks; directory detection happens below
    with get_validation_context().replace(perform_io_checks=False):
        strict = _file_source_adapter.validate_python(file_source)

    if isinstance(strict, Path) and strict.is_dir():
        raise FileNotFoundError(f"{strict} is a directory, but expected a file.")

    return strict
def extract(
    source: Union[FilePath, ZipFile, ZipPath],
    folder: Optional[DirectoryPath] = None,
    overwrite: bool = False,
) -> DirectoryPath:
    """Extract a zip archive (or a single member of one) to `folder`.

    Args:
        source: path to a zip file, an open `ZipFile`, or a `ZipPath` member.
        folder: target directory; defaults to "<archive>.unzip" next to the
            archive (or a temporary directory for a `ZipFile` without filename).
        overwrite: extract even if `folder` already has content.

    Returns:
        The directory the archive (or member) was extracted to (may differ
        from `folder` if existing, mismatching content was found).
    """
    extract_member = None
    if isinstance(source, ZipPath):
        # only extract this member from its enclosing archive
        extract_member = source.at
        source = source.root

    if isinstance(source, ZipFile):
        zip_context = nullcontext(source)  # do not close a caller-provided ZipFile
        if folder is None:
            if source.filename is None:
                folder = Path(mkdtemp())
            else:
                zip_path = Path(source.filename)
                folder = zip_path.with_suffix(zip_path.suffix + ".unzip")
    else:
        zip_context = ZipFile(source, "r")
        if folder is None:
            folder = source.with_suffix(source.suffix + ".unzip")

    if overwrite and folder.exists():
        warnings.warn(f"Overwriting existing unzipped archive at {folder}")

    with zip_context as f:
        if extract_member is not None:
            extracted_file_path = folder / extract_member
            if extracted_file_path.exists() and not overwrite:
                warnings.warn(f"Found unzipped {extracted_file_path}.")
            else:
                _ = f.extract(extract_member, folder)

            return folder

        elif overwrite or not folder.exists():
            f.extractall(folder)
            return folder

        # `folder` exists and overwrite is False:
        # check whether its top-level content covers the archive's files
        found_content = {p.relative_to(folder).as_posix() for p in folder.glob("*")}
        expected_content = {info.filename for info in f.filelist}
        if expected_missing := expected_content - found_content:
            # derive a new folder name with an incremented counter in its
            # last "_"-separated part to avoid clobbering existing content
            parts = folder.name.split("_")
            nr, *suffixes = parts[-1].split(".")
            if nr.isdecimal():
                nr = str(int(nr) + 1)
            else:
                nr = f"1.{nr}"

            parts[-1] = ".".join([nr, *suffixes])
            out_path_new = folder.with_name("_".join(parts))
            warnings.warn(
                f"Unzipped archive at {folder} is missing expected files"
                + f" {expected_missing}."
                + f" Unzipping to {out_path_new} instead to avoid overwriting."
            )
            return extract(f, out_path_new, overwrite=overwrite)
        else:
            warnings.warn(
                f"Found unzipped archive with all expected files at {folder}."
            )
            return folder
def get_reader(
    source: Union[PermissiveFileSource, FileDescr, ZipPath],
    /,
    progressbar: Union[Progressbar, Callable[[], Progressbar], bool, None] = None,
    **kwargs: Unpack[HashKwargs],
) -> BytesReader:
    """Open a file `source` (download if needed)

    Args:
        source: URL, local path, zip member, string, or file description.
        progressbar: progress bar instance, factory, or on/off toggle
            (only relevant for downloads).
        **kwargs: optional expected `sha256`; verified against the opened data.

    Raises:
        FileNotFoundError: if a local/zip source does not exist or is a directory.
        ValueError: on SHA256 mismatch for local/zip sources.
    """
    if isinstance(source, FileDescr):
        # an explicitly passed sha256 takes precedence over the descriptor's
        if "sha256" not in kwargs:
            kwargs["sha256"] = source.sha256

        source = source.source
    elif isinstance(source, str):
        source = interprete_file_source(source)

    if isinstance(source, RelativeFilePath):
        source = source.absolute()
    elif isinstance(source, pydantic.AnyUrl):
        with get_validation_context().replace(perform_io_checks=False):
            source = HttpUrl(source)

    if isinstance(source, HttpUrl):
        return _open_url(source, progressbar=progressbar, **kwargs)

    if isinstance(source, ZipPath):
        if not source.exists():
            raise FileNotFoundError(source.filename)

        f = source.open(mode="rb")
        assert not isinstance(f, TextIOWrapper)
        root = source.root
    elif isinstance(source, Path):
        if source.is_dir():
            raise FileNotFoundError(f"{source} is a directory, not a file")

        if not source.exists():
            raise FileNotFoundError(source)

        f = source.open("rb")
        root = source.parent
    else:
        assert_never(source)

    expected_sha = kwargs.get("sha256")
    if expected_sha is None:
        sha = None
    else:
        # hash the stream up front, then rewind it for the caller
        sha = get_sha256(f)
        _ = f.seek(0)
        if sha != expected_sha:
            raise ValueError(
                f"SHA256 mismatch for {source}. Expected {expected_sha}, got {sha}."
            )

    return BytesReader(
        f,
        sha256=sha,
        suffix=source.suffix,
        original_file_name=source.name,
        original_root=root,
        is_zipfile=None,
    )
# module-level alias for `get_reader`
download = get_reader
def _open_url(
    source: HttpUrl,
    /,
    progressbar: Union[Progressbar, Callable[[], Progressbar], bool, None],
    **kwargs: Unpack[HashKwargs],
) -> BytesReader:
    """Fetch `source` through the (disk) cache and wrap it in a `BytesReader`."""
    cache = (
        NoopCache[RootHttpUrl](url_hasher=UrlDigest.from_str)
        if get_validation_context().disable_cache
        else settings.disk_cache
    )
    sha = kwargs.get("sha256")
    # NOTE(review): without a known digest we always refetch; otherwise the
    # expected digest is passed — assumes genericache refetches unless a
    # cached entry matches that digest; confirm against genericache docs.
    force_refetch = True if sha is None else ContentDigest.parse(hexdigest=sha)
    # fall back to the sha or a hash of the URL when the URL has no path,
    # so a name/suffix can still be derived
    source_path = PurePosixPath(
        source.path
        or sha
        or hashlib.sha256(str(source).encode(encoding="utf-8")).hexdigest()
    )

    reader = cache.fetch(
        source,
        fetcher=partial(_fetch_url, progressbar=progressbar),
        force_refetch=force_refetch,
    )
    return BytesReader(
        reader,
        suffix=source_path.suffix,
        sha256=sha,
        original_file_name=source_path.name,
        original_root=source.parent,
        is_zipfile=None,
    )
def _fetch_url(
    source: RootHttpUrl,
    *,
    progressbar: Union[Progressbar, Callable[[], Progressbar], bool, None],
):
    """Fetch `source` via HTTP(S), returning a generator of byte chunks.

    Updates `progressbar` (if enabled) while the chunks are consumed.

    Raises:
        NotImplementedError: for non-http(s) URL schemes.
    """
    if source.scheme not in ("http", "https"):
        raise NotImplementedError(source.scheme)

    if progressbar is None:
        # chose progressbar option from validation context
        progressbar = get_validation_context().progressbar

    if progressbar is None:
        # default to no progressbar in CI environments
        progressbar = not settings.CI

    if callable(progressbar):
        progressbar = progressbar()

    if isinstance(progressbar, bool) and progressbar:
        # default tqdm progress bar
        progressbar = tqdm(
            ncols=79,
            ascii=bool(sys.platform == "win32"),
            unit="B",
            unit_scale=True,
            leave=True,
        )

    if progressbar is not False:
        progressbar.set_description(f"Downloading {extract_file_name(source)}")

    headers: Dict[str, str] = {}
    if settings.user_agent is not None:
        headers["User-Agent"] = settings.user_agent
    elif settings.CI:
        headers["User-Agent"] = "ci"

    r = httpx.get(
        str(source),
        follow_redirects=True,
        headers=headers,
        timeout=settings.http_timeout,
    )
    _ = r.raise_for_status()

    # set progressbar.total
    total = r.headers.get("content-length")
    if total is not None and not isinstance(total, int):
        try:
            total = int(total)
        except Exception:
            total = None

    if progressbar is not False:
        if total is None:
            progressbar.total = 0
        else:
            progressbar.total = total

    def iter_content():
        for chunk in r.iter_bytes(chunk_size=4096):
            yield chunk
            if progressbar is not False:
                _ = progressbar.update(len(chunk))

        # Make sure the progress bar gets filled even if the actual number
        # is chunks is smaller than expected. This happens when streaming
        # text files that are compressed by the server when sending (gzip).
        # Binary files don't experience this.
        # (adapted from pooch.HttpDownloader)
        if progressbar is not False:
            progressbar.reset()
            if total is not None:
                _ = progressbar.update(total)

            progressbar.close()

    return iter_content()
def extract_file_name(
    src: Union[
        pydantic.HttpUrl, RootHttpUrl, PurePath, RelativeFilePath, ZipPath, FileDescr
    ],
) -> FileName:
    """Extract the file name from a path, URL, zip member, or file description."""
    if isinstance(src, FileDescr):
        src = src.source

    if isinstance(src, ZipPath):
        return src.name or src.root.filename or "bioimageio.zip"

    if isinstance(src, RelativeFilePath):
        return src.path.name

    if isinstance(src, PurePath):
        return src.name

    url = urlparse(str(src))
    segments = url.path.split("/")
    if (
        url.scheme == "https"
        and url.hostname == "zenodo.org"
        and url.path.startswith("/api/records/")
        and url.path.endswith("/content")
    ):
        # zenodo API record URLs end in "/content";
        # the file name is the segment before it
        return segments[-2]

    return segments[-1]
def extract_file_descrs(
    data: IncompleteDescrView,
) -> List[FileDescr]:
    """Collect all `FileDescr` objects found (recursively) in `data`."""
    found: List[FileDescr] = []
    # suppress IO checks and warnings while traversing
    ctx = get_validation_context().replace(
        perform_io_checks=False, log_warnings=False
    )
    with ctx:
        _extract_file_descrs_impl(data, found)

    return found
def _extract_file_descrs_impl(
    data: Union[IncompleteDescrView, IncompleteDescrInnerView],
    collected: List[FileDescr],
) -> None:
    """Recursively collect `FileDescr` objects from `data` into `collected`.

    Mappings with "source" and "sha256" keys are additionally interpreted as
    serialized file descriptions; unparseable ones are skipped with a warning.
    """
    if isinstance(data, FileDescr):
        collected.append(data)
    elif isinstance(data, Node):
        # iterating a pydantic node yields (field name, value) pairs
        for _, v in data:
            _extract_file_descrs_impl(v, collected)
    elif isinstance(data, collections.abc.Mapping):
        if "source" in data and "sha256" in data:
            try:
                fd = FileDescr.model_validate(
                    dict(source=data["source"], sha256=data["sha256"])
                )
            except Exception:
                # fixed garbled warning messages (typos, mismatched quotes)
                warnings.warn(
                    "Found mapping with 'source' and 'sha256' keys, but could"
                    + " not parse it as a FileDescr. Ignoring 'sha256'."
                )
                try:
                    fd = FileDescr.model_validate(dict(source=data["source"]))
                except Exception:
                    warnings.warn(
                        "Found mapping with 'source' and 'sha256' keys, but"
                        + " could not parse it as a FileDescr, even when"
                        + " ignoring 'sha256'. Ignoring"
                        + f" 'source': {data['source']}."
                    )
                else:
                    collected.append(fd)
            else:
                collected.append(fd)

        for v in data.values():
            _extract_file_descrs_impl(v, collected)
    elif not isinstance(data, (str, Path, RelativeFilePath)) and isinstance(
        data, collections.abc.Sequence
    ):
        # strings and path-likes are sequences, but must be treated as leaves
        for v in data:
            _extract_file_descrs_impl(v, collected)
# TypeVar for suffix validation helpers: a file source or file description
F = TypeVar("F", bound=Union[FileSource, FileDescr])
def validate_suffix(
    value: F, suffix: Union[str, Sequence[str]], case_sensitive: bool
) -> F:
    """check final suffix

    Args:
        value: file source or description whose final suffix is checked.
        suffix: allowed suffix or sequence of suffixes (each starting with ".").
        case_sensitive: whether to compare suffixes case-sensitively.

    Returns:
        `value` unchanged.

    Raises:
        ValueError: if the final suffix is not among the allowed ones.
    """
    if isinstance(suffix, str):
        suffixes = [suffix]
    else:
        suffixes = suffix

    assert len(suffixes) > 0, "no suffix given"
    assert all(suff.startswith(".") for suff in suffixes), (
        "expected suffixes to start with '.'"
    )
    o_value = value
    if isinstance(value, FileDescr):
        strict = value.source
    else:
        strict = interprete_file_source(value)

    if isinstance(strict, (HttpUrl, AnyUrl)):
        if strict.path is None or "." not in (path := strict.path):
            actual_suffixes = []
        else:
            if (
                strict.host == "zenodo.org"
                and path.startswith("/api/records/")
                and path.endswith("/content")
            ):
                # Zenodo API URLs have a "/content" suffix that should be ignored
                path = path[: -len("/content")]

            # only the final suffix is derivable from a URL path
            actual_suffixes = [f".{path.split('.')[-1]}"]

    elif isinstance(strict, PurePath):
        actual_suffixes = strict.suffixes
    elif isinstance(strict, RelativeFilePath):
        actual_suffixes = strict.path.suffixes
    else:
        assert_never(strict)

    if actual_suffixes:
        actual_suffix = actual_suffixes[-1]
    else:
        actual_suffix = "no suffix"

    if (
        case_sensitive
        and actual_suffix not in suffixes
        or not case_sensitive
        and actual_suffix.lower() not in [s.lower() for s in suffixes]
    ):
        if len(suffixes) == 1:
            raise ValueError(f"Expected suffix {suffixes[0]}, but got {actual_suffix}")
        else:
            raise ValueError(
                f"Expected a suffix from {suffixes}, but got {actual_suffix}"
            )

    return o_value
def populate_cache(sources: Sequence[Union[FileDescr, LightHttpFileDescr]]):
    """Download every remote source with a known sha256, deduplicating URLs.

    Sources without a sha256 and local paths are skipped.
    """

    def to_url(source: "FileSource | pydantic.HttpUrl") -> Optional[str]:
        # return the URL worth caching, or None for anything local
        if isinstance(source, (HttpUrl, pydantic.AnyUrl)):
            return str(source)
        if isinstance(source, RelativeFilePath):
            absolute = source.absolute()
            return str(absolute) if isinstance(absolute, HttpUrl) else None
        if isinstance(source, Path):
            return None
        assert_never(source)

    seen: Set[str] = set()
    for src in sources:
        if src.sha256 is None:
            continue  # not caching without known SHA

        url = to_url(src.source)
        if url is None:
            continue  # not caching local paths

        if url in seen:
            continue  # skip duplicate URLs

        seen.add(url)
        _ = src.download()