Coverage for bioimageio/spec/_internal/io.py: 77%
444 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-02 14:21 +0000
1from __future__ import annotations
3import hashlib
4import io
5import sys
6import warnings
7import zipfile
8from abc import abstractmethod
9from contextlib import nullcontext
10from dataclasses import dataclass
11from datetime import date as _date
12from datetime import datetime as _datetime
13from pathlib import Path, PurePath
14from tempfile import mktemp
15from typing import (
16 TYPE_CHECKING,
17 Any,
18 Dict,
19 Generic,
20 Iterable,
21 List,
22 Optional,
23 Protocol,
24 Sequence,
25 Tuple,
26 Type,
27 TypedDict,
28 TypeVar,
29 Union,
30)
31from urllib.parse import urlparse, urlsplit, urlunsplit
32from zipfile import ZipFile, is_zipfile
34import pooch # pyright: ignore [reportMissingTypeStubs]
35import pydantic
36import requests
37from pydantic import (
38 AnyUrl,
39 DirectoryPath,
40 Field,
41 GetCoreSchemaHandler,
42 PlainSerializer,
43 PrivateAttr,
44 RootModel,
45 SerializationInfo,
46 TypeAdapter,
47 model_validator,
48)
49from pydantic_core import core_schema
50from tqdm import tqdm
51from typing_extensions import (
52 Annotated,
53 LiteralString,
54 NotRequired,
55 Self,
56 TypeGuard,
57 Unpack,
58 assert_never,
59)
60from typing_extensions import TypeAliasType as _TypeAliasType
62from ._settings import settings
63from .io_basics import (
64 ALL_BIOIMAGEIO_YAML_NAMES,
65 ALTERNATIVE_BIOIMAGEIO_YAML_NAMES,
66 BIOIMAGEIO_YAML,
67 AbsoluteDirectory,
68 AbsoluteFilePath,
69 FileName,
70 FilePath,
71 Sha256,
72 ZipPath,
73)
74from .node import Node
75from .packaging_context import packaging_context_var
76from .root_url import RootHttpUrl
77from .type_guards import is_mapping, is_sequence
78from .url import HttpUrl
79from .validation_context import get_validation_context
80from .validator_annotations import AfterValidator
# `dataclass(slots=True)` only exists from Python 3.10 on; on older
# interpreters we pass no extra keyword arguments to `dataclass`.
SLOTS: Dict[str, bool] = {} if sys.version_info < (3, 10) else {"slots": True}
# TypeVar for the absolute counterpart of a relative path: an http URL,
# an absolute local directory/file path, or a path inside a zip archive.
AbsolutePathT = TypeVar(
    "AbsolutePathT",
    bound=Union[HttpUrl, AbsoluteDirectory, AbsoluteFilePath, ZipPath],
)
class RelativePathBase(RootModel[PurePath], Generic[AbsolutePathT], frozen=True):
    """Base for paths given relative to a root (directory, URL, or zip archive).

    The absolute counterpart is resolved once, at creation time, against the
    root of the active `ValidationContext` (see `model_post_init`).
    """

    _absolute: AbsolutePathT = PrivateAttr()

    @property
    def path(self) -> PurePath:
        """the wrapped (relative) path"""
        return self.root

    def absolute(  # method not property analog to `pathlib.Path.absolute()`
        self,
    ) -> AbsolutePathT:
        """get the absolute path/url

        (resolved at time of initialization with the root of the ValidationContext)
        """
        return self._absolute

    def model_post_init(self, __context: Any) -> None:
        """set `_absolute` property with validation context at creation time. @private"""
        # reject inputs that cannot be a relative path
        if self.root.is_absolute():
            raise ValueError(f"{self.root} is an absolute path.")

        # a URL mistakenly parsed as a path starts with a "http:"/"https:" part
        if self.root.parts and self.root.parts[0] in ("http:", "https:"):
            raise ValueError(f"{self.root} looks like an http url.")

        self._absolute = (  # pyright: ignore[reportAttributeAccessIssue]
            self.get_absolute(get_validation_context().root)
        )
        super().model_post_init(__context)

    # @property
    # def __members(self):
    #     return (self.path,)

    # def __eq__(self, __value: object) -> bool:
    #     return type(__value) is type(self) and self.__members == __value.__members

    # def __hash__(self) -> int:
    #     return hash(self.__members)

    def __str__(self) -> str:
        return self.root.as_posix()

    def __repr__(self) -> str:
        return f"RelativePath('{self}')"

    @abstractmethod
    def get_absolute(
        self, root: Union[RootHttpUrl, AbsoluteDirectory, pydantic.AnyUrl, ZipFile]
    ) -> AbsolutePathT: ...

    def _get_absolute_impl(
        self, root: Union[RootHttpUrl, AbsoluteDirectory, pydantic.AnyUrl, ZipFile]
    ) -> Union[Path, HttpUrl, ZipPath]:
        """resolve `self.root` against `root` (shared logic for subclasses)"""
        if isinstance(root, Path):
            return (root / self.root).absolute()

        rel_path = self.root.as_posix().strip("/")
        if isinstance(root, ZipFile):
            return ZipPath(root, rel_path)

        parsed = urlsplit(str(root))
        path = list(parsed.path.strip("/").split("/"))
        if (
            parsed.netloc == "zenodo.org"
            and parsed.path.startswith("/api/records/")
            and parsed.path.endswith("/content")
        ):
            # zenodo api content urls end in "/content";
            # insert the relative path before that final segment
            path.insert(-1, rel_path)
        else:
            path.append(rel_path)

        return HttpUrl(
            urlunsplit(
                (
                    parsed.scheme,
                    parsed.netloc,
                    "/".join(path),
                    parsed.query,
                    parsed.fragment,
                )
            )
        )

    @classmethod
    def _validate(cls, value: Union[PurePath, str]):
        # catch strings that are URLs before interpreting them as a path
        if isinstance(value, str) and (
            value.startswith("https://") or value.startswith("http://")
        ):
            raise ValueError(f"{value} looks like a URL, not a relative path")

        return cls(PurePath(value))
class RelativeFilePath(
    RelativePathBase[Union[AbsoluteFilePath, HttpUrl, ZipPath]], frozen=True
):
    """A path relative to the `rdf.yaml` file (also if the RDF source is a URL)."""

    def model_post_init(self, __context: Any) -> None:
        """add validation @private"""
        if not self.root.parts:  # an empty path can only be a directory
            raise ValueError(f"{self.root} is not a valid file path.")

        super().model_post_init(__context)

    def get_absolute(
        self, root: "RootHttpUrl | Path | AnyUrl | ZipFile"
    ) -> "AbsoluteFilePath | HttpUrl | ZipPath":
        """resolve against `root`; verify file existence when IO checks are on"""
        absolute = self._get_absolute_impl(root)
        if isinstance(absolute, Path):
            context = get_validation_context()
            if (
                context.perform_io_checks
                and str(self.root) not in context.known_files
                and not absolute.is_file()
            ):
                raise ValueError(f"{absolute} does not point to an existing file")

        return absolute
class RelativeDirectory(
    RelativePathBase[Union[AbsoluteDirectory, HttpUrl, ZipPath]], frozen=True
):
    """A directory path relative to the validation context's root."""

    def get_absolute(
        self, root: "RootHttpUrl | Path | AnyUrl | ZipFile"
    ) -> "AbsoluteDirectory | HttpUrl | ZipPath":
        """resolve against `root`; verify directory existence when IO checks are on"""
        absolute = self._get_absolute_impl(root)
        if isinstance(absolute, Path):
            if get_validation_context().perform_io_checks and not absolute.is_dir():
                raise ValueError(
                    f"{absolute} does not point to an existing directory"
                )

        return absolute
# a file source given as URL, path relative to the RDF, or absolute local path
# (union members are tried left to right)
FileSource = Annotated[
    Union[HttpUrl, RelativeFilePath, FilePath],
    Field(union_mode="left_to_right"),
]
# additionally accepts plain strings and pydantic URLs
PermissiveFileSource = Union[FileSource, str, pydantic.HttpUrl]

# TypeVar used by `validate_suffix` to return the input type unchanged
V_suffix = TypeVar("V_suffix", bound=FileSource)
path_or_url_adapter: "TypeAdapter[Union[FilePath, DirectoryPath, HttpUrl]]" = (
    TypeAdapter(Union[FilePath, DirectoryPath, HttpUrl])
)
def validate_suffix(
    value: V_suffix, suffix: Union[str, Sequence[str]], case_sensitive: bool
) -> V_suffix:
    """Check that the final suffix of `value` is one of the expected `suffix`es.

    Args:
        value: file source to check; returned unchanged on success.
        suffix: one acceptable suffix or a sequence of them (each starts with '.').
        case_sensitive: whether the comparison is case-sensitive.

    Raises:
        ValueError: if the actual suffix is not among the expected ones.
    """
    if isinstance(suffix, str):
        suffixes = [suffix]
    else:
        suffixes = suffix

    assert len(suffixes) > 0, "no suffix given"
    assert all(
        suff.startswith(".") for suff in suffixes
    ), "expected suffixes to start with '.'"
    o_value = value
    strict = interprete_file_source(value)

    if isinstance(strict, (HttpUrl, AnyUrl)):
        if strict.path is None or "." not in (path := strict.path):
            actual_suffix = ""
        elif (
            strict.host == "zenodo.org"
            and path.startswith("/api/records/")
            and path.endswith("/content")
        ):
            # zenodo content urls end in "/content"; the file suffix precedes it
            actual_suffix = "." + path[: -len("/content")].split(".")[-1]
        else:
            actual_suffix = "." + path.split(".")[-1]
    elif isinstance(strict, PurePath):
        # guard against IndexError for paths without any suffix;
        # an empty actual suffix then fails the check below with a clear message
        actual_suffix = strict.suffixes[-1] if strict.suffixes else ""
    elif isinstance(strict, RelativeFilePath):
        actual_suffix = strict.path.suffixes[-1] if strict.path.suffixes else ""
    else:
        assert_never(strict)

    if case_sensitive:
        matches = actual_suffix in suffixes
    else:
        matches = actual_suffix.lower() in [s.lower() for s in suffixes]

    if not matches:
        if len(suffixes) == 1:
            raise ValueError(f"Expected suffix {suffixes[0]}, but got {actual_suffix}")
        else:
            raise ValueError(
                f"Expected a suffix from {suffixes}, but got {actual_suffix}"
            )

    return o_value
@dataclass(frozen=True, **SLOTS)
class WithSuffix:
    """Pydantic annotation validating the suffix of a `FileSource` field."""

    suffix: Union[LiteralString, Tuple[LiteralString, ...]]
    case_sensitive: bool

    def __get_pydantic_core_schema__(
        self, source: Type[Any], handler: GetCoreSchemaHandler
    ):
        if not self.suffix:
            raise ValueError("suffix may not be empty")

        return core_schema.no_info_after_validator_function(
            self.validate, handler(source)
        )

    def validate(self, value: FileSource) -> FileSource:
        """check the suffix of `value`"""
        return validate_suffix(value, self.suffix, case_sensitive=self.case_sensitive)
def wo_special_file_name(src: FileSource) -> FileSource:
    """Reject `src` if its file name is reserved for bioimageio yaml files."""
    if not has_valid_bioimageio_yaml_name(src):
        return src

    raise ValueError(
        f"'{src}' not allowed here as its filename is reserved to identify"
        + f" '{BIOIMAGEIO_YAML}' (or equivalent) files."
    )
def _package(value: FileSource, info: SerializationInfo) -> Union[str, Path, FileName]:
    """Serialize `value`; while packaging, register the file and return its name.

    Outside of a packaging context the file source is converted to a standard
    python object (or a json value when serializing to json).
    """
    if (packaging_context := packaging_context_var.get()) is None:
        # convert to standard python obj
        # note: pydantic keeps returning Rootmodels (here `HttpUrl`) as-is, but if
        # this function returns one RootModel, paths are "further serialized" by
        # returning the 'root' attribute, which is incorrect.
        # see https://github.com/pydantic/pydantic/issues/8963
        # TODO: follow up on https://github.com/pydantic/pydantic/issues/8963
        if isinstance(value, Path):
            unpackaged = value
        elif isinstance(value, HttpUrl):
            unpackaged = value
        elif isinstance(value, RelativeFilePath):
            unpackaged = Path(value.path)
        elif isinstance(value, AnyUrl):
            unpackaged = str(value)
        else:
            assert_never(value)

        if info.mode_is_json():
            # convert to json value # TODO: remove and let pydantic do this?
            if isinstance(unpackaged, Path):
                unpackaged = str(unpackaged)
            elif isinstance(unpackaged, str):
                pass
            else:
                assert_never(unpackaged)
        else:
            warnings.warn(
                "dumping with mode='python' is currently not fully supported for "
                + "fields that are included when packaging; returned objects are "
                + "standard python objects"
            )

        return unpackaged  # return unpackaged file source

    # package the file source:
    # add it to the current package's file sources and return its collision free file name
    if isinstance(value, RelativeFilePath):
        src = value.absolute()
    elif isinstance(value, pydantic.AnyUrl):
        src = HttpUrl(str(value))
    elif isinstance(value, HttpUrl):
        src = value
    elif isinstance(value, Path):
        src = value.resolve()
    else:
        assert_never(value)

    fname = extract_file_name(src)
    if fname == packaging_context.bioimageio_yaml_file_name:
        raise ValueError(
            f"Reserved file name '{packaging_context.bioimageio_yaml_file_name}' "
            + "not allowed for a file to be packaged"
        )

    fsrcs = packaging_context.file_sources
    assert not any(
        fname.endswith(special) for special in ALL_BIOIMAGEIO_YAML_NAMES
    ), fname
    if fname in fsrcs and fsrcs[fname] != src:
        # resolve file name collisions by appending a counter: name_2.ext, name_3.ext, ...
        for i in range(2, 20):
            fn, *ext = fname.split(".")
            alternative_file_name = ".".join([f"{fn}_{i}", *ext])
            if (
                alternative_file_name not in fsrcs
                or fsrcs[alternative_file_name] == src
            ):
                fname = alternative_file_name
                break
        else:
            raise ValueError(f"Too many file name clashes for {fname}")

    fsrcs[fname] = src
    return fname
# serializer that registers the file with the current package (when packaging)
include_in_package_serializer = PlainSerializer(_package, when_used="unless-none")
# a file source that is included (∈📦) when serialized in a packaging context
ImportantFileSource = Annotated[
    FileSource,
    AfterValidator(wo_special_file_name),
    include_in_package_serializer,
]
# like `ImportantFileSource`, but only local files are packaged; URLs are kept
InPackageIfLocalFileSource = Union[
    Annotated[
        Union[FilePath, RelativeFilePath],
        AfterValidator(wo_special_file_name),
        include_in_package_serializer,
    ],
    Union[HttpUrl, pydantic.HttpUrl],
]
def has_valid_bioimageio_yaml_name(src: FileSource) -> bool:
    """Check whether the file name of `src` identifies a bioimageio yaml file."""
    fname = extract_file_name(src)
    return is_valid_bioimageio_yaml_name(fname)
def is_valid_bioimageio_yaml_name(file_name: FileName) -> bool:
    """Check whether `file_name` matches any reserved bioimageio yaml name."""
    return any(
        file_name == name or file_name.endswith("." + name)
        for name in ALL_BIOIMAGEIO_YAML_NAMES
    )
def identify_bioimageio_yaml_file_name(file_names: Iterable[FileName]) -> FileName:
    """Return the first file name (in sorted order) identifying a bioimageio yaml file.

    Raises:
        ValueError: if none of `file_names` matches a known bioimageio yaml name.
    """
    file_names = sorted(file_names)
    for bioimageio_name in ALL_BIOIMAGEIO_YAML_NAMES:
        for file_name in file_names:
            if file_name == bioimageio_name or file_name.endswith(
                "." + bioimageio_name
            ):
                return file_name

    # fixed error message: removed duplicated "or" and typo "alterntive"
    raise ValueError(
        f"No {BIOIMAGEIO_YAML} found in {file_names}. (Looking for '{BIOIMAGEIO_YAML}'"
        + " or any of the alternative file names:"
        + f" {ALTERNATIVE_BIOIMAGEIO_YAML_NAMES}, or any file with an extension of"
        + f" those, e.g. 'anything.{BIOIMAGEIO_YAML}')."
    )
def find_bioimageio_yaml_file_name(path: Union[Path, ZipFile]) -> FileName:
    """Find the bioimageio yaml file name within a directory, zip, or single file."""
    if isinstance(path, ZipFile):
        candidates = path.namelist()
    elif not path.is_file():
        # a directory: inspect its direct children
        candidates = [p.name for p in path.glob("*")]
    elif not is_zipfile(path):
        # a plain file is its own candidate
        return path.name
    else:
        with ZipFile(path, "r") as zf:
            candidates = zf.namelist()

    return identify_bioimageio_yaml_file_name(candidates)
def ensure_has_valid_bioimageio_yaml_name(src: FileSource) -> FileSource:
    """Return `src` unchanged if its file name identifies a bioimageio yaml file."""
    if has_valid_bioimageio_yaml_name(src):
        return src

    raise ValueError(
        f"'{src}' does not have a valid filename to identify"
        + f" '{BIOIMAGEIO_YAML}' (or equivalent) files."
    )
def ensure_is_valid_bioimageio_yaml_name(file_name: FileName) -> FileName:
    """Return `file_name` unchanged if it identifies a bioimageio yaml file."""
    if is_valid_bioimageio_yaml_name(file_name):
        return file_name

    raise ValueError(
        f"'{file_name}' is not a valid filename to identify"
        + f" '{BIOIMAGEIO_YAML}' (or equivalent) files."
    )
# types as loaded from YAML 1.2 (with ruyaml)
YamlLeafValue = Union[
    bool, _date, _datetime, int, float, str, None
]  # note: order relevant for deserializing
YamlKey = Union[  # YAML Arrays are cast to tuples if used as key in mappings
    YamlLeafValue, Tuple[YamlLeafValue, ...]  # (nesting is not allowed though)
]
if TYPE_CHECKING:
    # a plain recursive alias suffices for static type checkers
    YamlValue = Union[YamlLeafValue, List["YamlValue"], Dict[YamlKey, "YamlValue"]]
else:
    # for pydantic validation we need to use `TypeAliasType`,
    # see https://docs.pydantic.dev/latest/concepts/types/#named-recursive-types
    # however this results in a partially unknown type with the current pyright 1.1.388
    YamlValue = _TypeAliasType(
        "YamlValue",
        Union[YamlLeafValue, List["YamlValue"], Dict[YamlKey, "YamlValue"]],
    )
# content of a bioimageio yaml file (string keys to YAML values)
BioimageioYamlContent = Dict[str, YamlValue]
# anything such content may be loaded from
BioimageioYamlSource = Union[PermissiveFileSource, ZipFile, BioimageioYamlContent]
503def is_yaml_leaf_value(value: Any) -> TypeGuard[YamlLeafValue]:
504 return isinstance(value, (bool, _date, _datetime, int, float, str, type(None)))
def is_yaml_list(value: Any) -> TypeGuard[List[YamlValue]]:
    """Check if `value` is a sequence whose items are all YAML values."""
    if not is_sequence(value):
        return False
    return all(map(is_yaml_value, value))
def is_yaml_mapping(value: Any) -> TypeGuard[BioimageioYamlContent]:
    """Check if `value` is a mapping with string keys and YAML values."""
    if not is_mapping(value):
        return False
    return all(
        isinstance(k, str) and is_yaml_value(v) for k, v in value.items()
    )
def is_yaml_value(value: Any) -> TypeGuard[YamlValue]:
    """Check if `value` is a YAML leaf, list, or mapping."""
    return any(
        check(value) for check in (is_yaml_leaf_value, is_yaml_list, is_yaml_mapping)
    )
@dataclass
class OpenedBioimageioYaml:
    """A loaded bioimageio yaml file together with its provenance."""

    content: BioimageioYamlContent  # parsed yaml content
    original_root: Union[AbsoluteDirectory, RootHttpUrl, ZipFile]  # where it was loaded from
    original_file_name: FileName  # its file name at the original location
    unparsed_content: str  # raw text of the yaml file
@dataclass
class LocalFile:
    """A file available on the local file system (e.g. after download)."""

    path: FilePath  # local path to the file
    original_root: Union[AbsoluteDirectory, RootHttpUrl, ZipFile]  # where it came from
    original_file_name: FileName  # its file name at the original location
@dataclass
class FileInZip:
    """A file stored inside a zip archive."""

    path: ZipPath  # path of the member within the archive
    original_root: Union[RootHttpUrl, ZipFile]  # where it came from
    original_file_name: FileName  # its file name at the original location
class HashKwargs(TypedDict):
    """Keyword arguments carrying an optional expected SHA-256 checksum."""

    sha256: NotRequired[Optional[Sha256]]
# adapter to validate `FileSource` values outside of a pydantic model
_file_source_adapter: TypeAdapter[Union[HttpUrl, RelativeFilePath, FilePath]] = (
    TypeAdapter(FileSource)
)
def interprete_file_source(file_source: PermissiveFileSource) -> FileSource:
    """Interpret `file_source` as a strict `FileSource` (without performing IO checks)."""
    if isinstance(file_source, HttpUrl):
        return file_source

    if isinstance(file_source, Path):
        if file_source.is_dir():
            raise FileNotFoundError(
                f"{file_source} is a directory, but expected a file."
            )
        return file_source

    if isinstance(file_source, pydantic.AnyUrl):
        file_source = str(file_source)

    with get_validation_context().replace(perform_io_checks=False):
        strict = _file_source_adapter.validate_python(file_source)
        if isinstance(strict, Path) and strict.is_dir():
            raise FileNotFoundError(f"{strict} is a directory, but expected a file.")

    return strict
574def _get_known_hash(hash_kwargs: HashKwargs):
575 if "sha256" in hash_kwargs and hash_kwargs["sha256"] is not None:
576 return f"sha256:{hash_kwargs['sha256']}"
577 else:
578 return None
def _get_unique_file_name(url: Union[HttpUrl, pydantic.HttpUrl]):
    """
    Create a unique file name based on the given URL;
    adapted from pooch.utils.unique_file_name
    """
    digest = hashlib.md5(str(url).encode()).hexdigest()
    base_name = extract_file_name(url)
    # crop the start of the file name so that "<md5>-<name>" fits 255 characters
    keep = 255 - len(digest) - 1
    return f"{digest}-{base_name[-keep:]}"
class Progressbar(Protocol):
    """Minimal progress bar interface (structurally compatible with `tqdm`)."""

    count: int
    total: int

    def update(self, i: int): ...

    def reset(self): ...

    def close(self): ...
def extract(
    source: Union[FilePath, ZipFile, ZipPath],
    folder: Optional[DirectoryPath] = None,
    overwrite: bool = False,
) -> DirectoryPath:
    """Extract a zip archive (or a single member of one) to `folder`.

    Args:
        source: zip file (path or open `ZipFile`) or a single member (`ZipPath`).
        folder: output directory; defaults to `<zip name>.unzip` next to the
            archive (or a fresh temporary path if the archive has no file name).
        overwrite: whether existing extracted content may be overwritten.

    Returns:
        the folder the content was extracted to (may differ from `folder` if a
        conflicting, incomplete extraction was found there).
    """
    extract_member = None
    if isinstance(source, ZipPath):
        # only extract this single member of the enclosing archive
        extract_member = source.at
        source = source.root

    if isinstance(source, ZipFile):
        zip_context = nullcontext(source)
        if folder is None:
            if source.filename is None:
                # in-memory archive without a backing file name
                # NOTE(review): `mktemp` is deprecated/racy — presumably acceptable
                # for generating a unique path here, but consider `mkdtemp`; confirm.
                folder = Path(mktemp())
            else:
                zip_path = Path(source.filename)
                folder = zip_path.with_suffix(zip_path.suffix + ".unzip")
    else:
        zip_context = ZipFile(source, "r")
        if folder is None:
            folder = source.with_suffix(source.suffix + ".unzip")

    if overwrite and folder.exists():
        warnings.warn(f"Overwriting existing unzipped archive at {folder}")

    with zip_context as f:
        if extract_member is not None:
            extracted_file_path = folder / extract_member
            if extracted_file_path.exists() and not overwrite:
                warnings.warn(f"Found unzipped {extracted_file_path}.")
            else:
                _ = f.extract(extract_member, folder)

            return folder

        elif overwrite or not folder.exists():
            f.extractall(folder)
            return folder

        # folder exists and must not be overwritten:
        # check whether it already contains all expected files
        found_content = {p.relative_to(folder).as_posix() for p in folder.glob("*")}
        expected_content = {info.filename for info in f.filelist}
        if expected_missing := expected_content - found_content:
            # derive a new folder name by incrementing a counter in the last
            # "_"-separated part, then retry extraction there
            parts = folder.name.split("_")
            nr, *suffixes = parts[-1].split(".")
            if nr.isdecimal():
                nr = str(int(nr) + 1)
            else:
                nr = f"1.{nr}"

            parts[-1] = ".".join([nr, *suffixes])
            out_path_new = folder.with_name("_".join(parts))
            warnings.warn(
                f"Unzipped archive at {folder} is missing expected files"
                + f" {expected_missing}."
                + f" Unzipping to {out_path_new} instead to avoid overwriting."
            )
            return extract(f, out_path_new, overwrite=overwrite)
        else:
            warnings.warn(
                f"Found unzipped archive with all expected files at {folder}."
            )
            return folder
def resolve(
    source: Union[PermissiveFileSource, FileDescr, ZipPath],
    /,
    progressbar: Union[Progressbar, bool, None] = None,
    **kwargs: Unpack[HashKwargs],
) -> Union[LocalFile, FileInZip]:
    """Resolve file `source` (download if needed)

    Args:
        source: URL, (relative) path, `FileDescr`, or zip member to resolve.
        progressbar: custom progress bar, or True/False to force a default
            `tqdm` bar on/off (defaults to off in CI, on otherwise).
        **kwargs: may specify the expected `sha256` of the file.

    Returns:
        `LocalFile` for files on disk (incl. cached downloads), `FileInZip`
        for zip members and cacheless in-memory downloads.
    """

    if isinstance(source, str):
        source = interprete_file_source(source)

    if isinstance(source, RelativeFilePath):
        source = source.absolute()
        if isinstance(source, ZipPath):
            return FileInZip(source, source.root, extract_file_name(source))

    if isinstance(source, pydantic.AnyUrl):
        with get_validation_context().replace(perform_io_checks=False):
            source = HttpUrl(source)

    if isinstance(source, FileDescr):
        return source.download()
    elif isinstance(source, ZipPath):
        zip_root = source.root
        assert isinstance(zip_root, ZipFile)
        return FileInZip(
            source,
            zip_root,
            extract_file_name(source),
        )
    elif isinstance(source, Path):
        if source.is_dir():
            raise FileNotFoundError(f"{source} is a directory, not a file")

        if not source.exists():
            raise FileNotFoundError(source)

        return LocalFile(
            source,
            source.parent,
            extract_file_name(source),
        )
    elif isinstance(source, HttpUrl):
        if source.scheme not in ("http", "https"):
            raise NotImplementedError(source.scheme)

        if settings.CI:
            headers = {"User-Agent": "ci"}
            if progressbar is None:
                progressbar = False
        else:
            headers = {}
            if progressbar is None:
                progressbar = True

        if settings.user_agent is not None:
            headers["User-Agent"] = settings.user_agent

        chunk_size = 1024
        if (
            settings.cache_path
            and not get_validation_context().disable_cache
            and any(v is not None for v in kwargs.values())
        ):
            # a hash is given and caching is enabled -> download via pooch cache
            downloader = pooch.HTTPDownloader(
                headers=headers,
                progressbar=progressbar,  # pyright: ignore[reportArgumentType]
                chunk_size=chunk_size,
            )
            fname = _get_unique_file_name(source)
            _ls: Any = pooch.retrieve(
                url=str(source),
                known_hash=_get_known_hash(kwargs),
                downloader=downloader,
                fname=fname,
                path=settings.cache_path,
            )
            local_source = Path(_ls).absolute()
            return LocalFile(
                local_source,
                source.parent,
                extract_file_name(source),
            )
        else:
            # cacheless download to memory using an in memory zip file
            r = requests.get(str(source), stream=True)
            r.raise_for_status()

            zf = zipfile.ZipFile(io.BytesIO(), "w")
            fn = extract_file_name(source)
            total = int(r.headers.get("content-length", 0))

            if isinstance(progressbar, bool):
                if progressbar:
                    use_ascii = bool(sys.platform == "win32")
                    # fix: configure a single progress bar including the
                    # description (previously a second bare `tqdm(...)` call
                    # discarded the fully configured bar and its total/units)
                    pbar = tqdm(
                        total=total,
                        ncols=79,
                        ascii=use_ascii,
                        unit="B",
                        unit_scale=True,
                        leave=True,
                        desc=f"Downloading {fn}",
                    )
                else:
                    pbar = None
            else:
                pbar = progressbar

            zp = ZipPath(zf, fn)
            with zp.open("wb") as z:
                assert not isinstance(z, io.TextIOWrapper)
                for chunk in r.iter_content(chunk_size=chunk_size):
                    n = z.write(chunk)
                    if pbar is not None:
                        _ = pbar.update(n)

            # Make sure the progress bar gets filled even if the actual number
            # of chunks is smaller than expected. This happens when streaming
            # text files that are compressed by the server when sending (gzip).
            # Binary files don't experience this.
            # (adapted from pooch.HttpDownloader)
            if pbar is not None:
                pbar.reset()
                _ = pbar.update(total)
                pbar.close()

            return FileInZip(
                path=zp,
                original_root=source.parent,
                original_file_name=fn,
            )

    else:
        assert_never(source)
# `download` is an alias of `resolve` (kept as public API)
download = resolve
def resolve_and_extract(
    source: Union[PermissiveFileSource, FileDescr, ZipPath],
    /,
    progressbar: Union[Progressbar, bool, None] = None,
    **kwargs: Unpack[HashKwargs],
) -> LocalFile:
    """Resolve `source` within current ValidationContext,
    download if needed and
    extract file if within zip archive.

    note: If source points to a zip file it is not extracted
    """
    resolved = resolve(source, progressbar=progressbar, **kwargs)
    if isinstance(resolved, LocalFile):
        return resolved

    # the file lives inside a zip archive -> unpack it next to the archive
    extraction_folder = extract(resolved.path)
    return LocalFile(
        extraction_folder / resolved.path.at,
        original_root=resolved.original_root,
        original_file_name=resolved.original_file_name,
    )
class LightHttpFileDescr(Node):
    """http source with sha256 value (minimal validation)"""

    source: pydantic.HttpUrl
    """file source"""

    sha256: Sha256
    """SHA256 checksum of the source file"""

    def download(self):
        """download the file (checked against `sha256`)"""
        return download(self.source, sha256=self.sha256)
class FileDescr(Node):
    """A file source with an optional expected SHA-256 checksum."""

    source: ImportantFileSource
    """∈📦 file source"""

    sha256: Optional[Sha256] = None
    """SHA256 checksum of the source file"""

    @model_validator(mode="after")
    def _validate_sha256(self) -> Self:
        """validate the checksum if IO checks are enabled. @private"""
        if get_validation_context().perform_io_checks:
            self.validate_sha256()

        return self

    def validate_sha256(self):
        """Validate the sha256 of `self.source` (downloading the file if needed).

        Sets `self.sha256` when it is unset or `context.update_hashes` is set;
        raises `ValueError` on a checksum mismatch otherwise.
        """
        context = get_validation_context()
        if (src_str := str(self.source)) in context.known_files:
            # reuse a previously computed checksum for this source
            actual_sha = context.known_files[src_str]
        else:
            local_source = download(self.source, sha256=self.sha256).path
            actual_sha = get_sha256(local_source)
            context.known_files[src_str] = actual_sha

        if actual_sha is None:
            return
        elif self.sha256 == actual_sha:
            pass
        elif self.sha256 is None or context.update_hashes:
            self.sha256 = actual_sha
        elif self.sha256 != actual_sha:
            raise ValueError(
                f"Sha256 mismatch for {self.source}. Expected {self.sha256}, got "
                + f"{actual_sha}. Update expected `sha256` or point to the matching "
                + "file."
            )

    def download(self):
        """download the file source (checked against `sha256` if given)"""
        return download(self.source, sha256=self.sha256)
def extract_file_name(
    src: Union[pydantic.HttpUrl, HttpUrl, PurePath, RelativeFilePath, ZipPath],
) -> FileName:
    """Extract the file name from a path, zip member, or URL."""
    if isinstance(src, ZipPath):
        return src.name or src.root.filename or "bioimageio.zip"

    if isinstance(src, RelativeFilePath):
        return src.path.name

    if isinstance(src, PurePath):
        return src.name

    parsed = urlparse(str(src))
    segments = parsed.path.split("/")
    is_zenodo_content = (
        parsed.scheme == "https"
        and parsed.hostname == "zenodo.org"
        and parsed.path.startswith("/api/records/")
        and parsed.path.endswith("/content")
    )
    # zenodo content urls end in ".../<file name>/content"
    return segments[-2] if is_zenodo_content else segments[-1]
def get_sha256(path: Union[Path, ZipPath]) -> Sha256:
    """Compute the SHA-256 hex digest of a file.

    (chunked reading adapted from https://stackoverflow.com/a/44873382)
    """
    if isinstance(path, ZipPath):
        # no buffered reading available for zip members -> read all at once
        zf = path.root
        assert isinstance(zf, ZipFile)
        data = path.read_bytes()
        assert isinstance(data, bytes)
        h = hashlib.sha256(data)
    else:
        h = hashlib.sha256()
        buffer = bytearray(128 * 1024)
        view = memoryview(buffer)
        with open(path, "rb", buffering=0) as f:
            # `readinto` returns 0 at EOF, ending the loop
            while n := f.readinto(view):
                h.update(view[:n])

    digest = h.hexdigest()
    assert len(digest) == 64
    return Sha256(digest)