Skip to content

utils ¤

Utility functions for bioimage.io specifications (mostly IO).

Classes:

Name Description
SpdxLicenseEntry
SpdxLicenses

Functions:

Name Description
empty_cache

Empty the bioimageio disk cache.

ensure_description_is_dataset
ensure_description_is_model

Raises a ValueError for invalid or non-model resources.

extract_file_name
get_bioimageio_json_schema

get the bioimageio specification as a JSON schema

get_reader

Open a file source (download if needed)

get_sha256
get_spdx_licenses

get details of the SPDX licenses known to bioimageio.spec

identify_bioimageio_yaml_file_name
interprete_file_source
is_valid_bioimageio_yaml_name
load_array

load a numpy ndarray from a .npy file

load_image

load a single image as numpy array

open_bioimageio_yaml
read_yaml
save_array

save a numpy ndarray to a .npy file

write_yaml

Attributes:

Name Type Description
download
get_file_name

download module-attribute ¤

download = get_reader

get_file_name module-attribute ¤

get_file_name = extract_file_name

SpdxLicenseEntry ¤

Bases: TypedDict


              flowchart TD
              bioimageio.spec.utils.SpdxLicenseEntry[SpdxLicenseEntry]

              

              click bioimageio.spec.utils.SpdxLicenseEntry href "" "bioimageio.spec.utils.SpdxLicenseEntry"
            

Attributes:

Name Type Description
isDeprecatedLicenseId bool
isKnownByZenodo bool
isOsiApproved bool
licenseId str
name str
reference str

isDeprecatedLicenseId instance-attribute ¤

isDeprecatedLicenseId: bool

isKnownByZenodo instance-attribute ¤

isKnownByZenodo: bool

isOsiApproved instance-attribute ¤

isOsiApproved: bool

licenseId instance-attribute ¤

licenseId: str

name instance-attribute ¤

name: str

reference instance-attribute ¤

reference: str

SpdxLicenses ¤

Bases: TypedDict


              flowchart TD
              bioimageio.spec.utils.SpdxLicenses[SpdxLicenses]

              

              click bioimageio.spec.utils.SpdxLicenses href "" "bioimageio.spec.utils.SpdxLicenses"
            

Attributes:

Name Type Description
licenseListVersion str
licenses List[SpdxLicenseEntry]
releaseDate str

licenseListVersion instance-attribute ¤

licenseListVersion: str

licenses instance-attribute ¤

licenses: List[SpdxLicenseEntry]

releaseDate instance-attribute ¤

releaseDate: str

empty_cache ¤

empty_cache()

Empty the bioimageio disk cache.

Source code in src/bioimageio/spec/utils.py
 99
100
101
102
103
104
def empty_cache():
    """Empty the bioimageio disk cache.

    Removes the directory `settings.cache_path` including its content and
    recreates it as an empty directory. A cache directory that does not exist
    (yet) is simply created.
    """
    try:
        shutil.rmtree(settings.cache_path)
    except FileNotFoundError:
        # nothing has been cached yet; the directory is created below
        pass

    settings.cache_path.mkdir(parents=True, exist_ok=True)
    logger.info("Emptied cache at {}", settings.cache_path)

ensure_description_is_dataset ¤

ensure_description_is_dataset(
    rd: Union[InvalidDescr, ResourceDescr],
) -> AnyDatasetDescr
Source code in src/bioimageio/spec/_description.py
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
def ensure_description_is_dataset(
    rd: Union[InvalidDescr, ResourceDescr],
) -> AnyDatasetDescr:
    """Return `rd` unchanged if it is a valid dataset description.

    The validation summary of `rd` is displayed before an error is raised.

    Raises:
        ValueError: for invalid or non-dataset resources
    """
    if isinstance(rd, InvalidDescr):
        rd.validation_summary.display()
        raise ValueError(f"Invalid {rd.type} description.")
    elif rd.type != "dataset":
        rd.validation_summary.display()
        message = f"Expected a dataset resource, but got resource type '{rd.type}'"
        raise ValueError(message)

    # a description of type "dataset" is not expected to be a generic one
    generic_descr_classes = (GenericDescr_v0_2, GenericDescr_v0_3)
    assert not isinstance(rd, generic_descr_classes)
    return rd

ensure_description_is_model ¤

ensure_description_is_model(
    rd: Union[InvalidDescr, ResourceDescr],
) -> AnyModelDescr

Raises:

Type Description
ValueError

for invalid or non-model resources

Source code in src/bioimageio/spec/_description.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
def ensure_description_is_model(
    rd: Union[InvalidDescr, ResourceDescr],
) -> AnyModelDescr:
    """Return `rd` unchanged if it is a valid model description.

    The validation summary of `rd` is displayed before an error is raised.

    Raises:
        ValueError: for invalid or non-model resources
    """
    if isinstance(rd, InvalidDescr):
        rd.validation_summary.display()
        raise ValueError(f"Invalid {rd.type} description")
    elif rd.type != "model":
        rd.validation_summary.display()
        message = f"Expected a model resource, but got resource type '{rd.type}'"
        raise ValueError(message)

    # a description of type "model" is not expected to be a generic one
    generic_descr_classes = (GenericDescr_v0_2, GenericDescr_v0_3)
    assert not isinstance(rd, generic_descr_classes)
    return rd

extract_file_name ¤

extract_file_name(
    src: Union[
        pydantic.HttpUrl,
        RootHttpUrl,
        PurePath,
        RelativeFilePath,
        ZipPath,
        FileDescr,
    ],
) -> FileName
Source code in src/bioimageio/spec/_internal/io.py
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
def extract_file_name(
    src: Union[
        pydantic.HttpUrl, RootHttpUrl, PurePath, RelativeFilePath, ZipPath, FileDescr
    ],
) -> FileName:
    """Derive a file name from a path-like or URL-like source.

    A `FileDescr` is reduced to its `source` first. For paths the name of the
    path is returned; for everything else the string representation is parsed
    as a URL and the last segment of the URL path is used. Zenodo API URLs of
    the form `https://zenodo.org/api/records/.../content` yield the segment
    preceding the trailing `content`.
    """
    target = src.source if isinstance(src, FileDescr) else src

    if isinstance(target, ZipPath):
        # fall back to the name of the zip archive, then to a fixed default
        return target.name or target.root.filename or "bioimageio.zip"

    if isinstance(target, RelativeFilePath):
        return target.path.name

    if isinstance(target, PurePath):
        return target.name

    parsed = urlparse(str(target))
    segments = parsed.path.split("/")
    is_zenodo_content_url = (
        parsed.scheme == "https"
        and parsed.hostname == "zenodo.org"
        and parsed.path.startswith("/api/records/")
        and parsed.path.endswith("/content")
    )
    return segments[-2] if is_zenodo_content_url else segments[-1]

get_bioimageio_json_schema ¤

get_bioimageio_json_schema() -> Dict[str, Any]

get the bioimageio specification as a JSON schema

Source code in src/bioimageio/spec/utils.py
62
63
64
65
66
67
68
69
def get_bioimageio_json_schema() -> Dict[str, Any]:
    """Load the bioimageio specification as a JSON schema.

    The schema is read from the resource `static/bioimageio_schema.json` of
    the `bioimageio.spec` package.
    """
    schema_resource = files("bioimageio.spec").joinpath(
        "static/bioimageio_schema.json"
    )
    with schema_resource.open("r", encoding="utf-8") as schema_file:
        return json.load(schema_file)

get_reader ¤

get_reader(
    source: Union[PermissiveFileSource, FileDescr, ZipPath],
    /,
    progressbar: Union[
        Progressbar, Callable[[], Progressbar], bool, None
    ] = None,
    **kwargs: Unpack[HashKwargs],
) -> BytesReader

Open a file source (download if needed)

Source code in src/bioimageio/spec/_internal/io.py
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
def get_reader(
    source: Union[PermissiveFileSource, FileDescr, ZipPath],
    /,
    progressbar: Union[Progressbar, Callable[[], Progressbar], bool, None] = None,
    **kwargs: Unpack[HashKwargs],
) -> BytesReader:
    """Open a file `source` (download if needed)

    Args:
        source: What to open. A `FileDescr` is reduced to its `source` (and its
            `sha256` is used unless `sha256` is given explicitly); a string is
            interpreted with `interprete_file_source`.
        progressbar: Only forwarded to the URL opener; not used for local files
            and zip file members.
        **kwargs: Hash arguments. For local files and zip file members a
            `sha256` that is not None is compared against the hash of the
            opened file. For URLs all keyword arguments are forwarded to the
            URL opener.

    Returns:
        A `BytesReader`. For local files and zip file members it wraps the
        opened file positioned at its start.

    Raises:
        FileNotFoundError: If a local path or zip file member does not exist,
            or if a local path is a directory.
        ValueError: If the SHA-256 of a local file or zip file member does not
            match the expected `sha256`.
    """
    if isinstance(source, FileDescr):
        if "sha256" not in kwargs:
            kwargs["sha256"] = source.sha256

        source = source.source
    elif isinstance(source, str):
        source = interprete_file_source(source)

    if isinstance(source, RelativeFilePath):
        source = source.absolute()
    elif isinstance(source, pydantic.AnyUrl):
        with get_validation_context().replace(perform_io_checks=False):
            source = HttpUrl(source)

    if isinstance(source, HttpUrl):
        return _open_url(source, progressbar=progressbar, **kwargs)

    if isinstance(source, ZipPath):
        if not source.exists():
            raise FileNotFoundError(source)

        f = source.open(mode="rb")
        assert not isinstance(f, TextIOWrapper)
        root = source.root
    elif isinstance(source, Path):
        if source.is_dir():
            raise FileNotFoundError(f"{source} is a directory, not a file")

        if not source.exists():
            raise FileNotFoundError(source)

        f = source.open("rb")
        root = source.parent
    else:
        assert_never(source)

    expected_sha = kwargs.get("sha256")
    if expected_sha is None:
        sha = None
    else:
        try:
            sha = get_sha256(f)
            # rewind so that the returned reader starts at the beginning
            _ = f.seek(0)
            if sha != expected_sha:
                raise ValueError(
                    f"SHA256 mismatch for {source}. Expected {expected_sha}, got {sha}."
                )
        except Exception:
            # the handle is not handed out on failure, so release it here
            f.close()
            raise

    return BytesReader(
        f,
        sha256=sha,
        suffix=source.suffix,
        original_file_name=source.name,
        original_root=root,
        is_zipfile=None,
    )

get_sha256 ¤

get_sha256(
    source: Union[BytesReaderP, BytesReaderIntoP, Path],
) -> Sha256
Source code in src/bioimageio/spec/_internal/io_basics.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def get_sha256(source: Union[BytesReaderP, BytesReaderIntoP, Path]) -> Sha256:
    """Compute the SHA-256 hex digest of `source`.

    Readers are consumed in chunks of 128 KiB until they are exhausted and are
    not closed. A `Path` is opened in binary mode and closed again.
    """
    chunksize = 128 * 1024
    digest = hashlib.sha256()

    if isinstance(source, BytesReaderIntoP):
        # a single buffer is reused for all reads
        view = memoryview(bytearray(chunksize))
        while True:
            n_read = source.readinto(view)
            if n_read == 0:
                break

            digest.update(view[:n_read])
    else:
        stream_ctx = (
            source.open(mode="rb") if isinstance(source, Path) else nullcontext(source)
        )
        with stream_ctx as stream:
            while True:
                chunk = stream.read(chunksize)
                if chunk == b"":
                    break

                digest.update(chunk)

    return Sha256(digest.hexdigest())

get_spdx_licenses ¤

get_spdx_licenses() -> SpdxLicenses

get details of the SPDX licenses known to bioimageio.spec

Source code in src/bioimageio/spec/utils.py
52
53
54
55
56
57
58
59
def get_spdx_licenses() -> SpdxLicenses:
    """Load details of the SPDX licenses known to bioimageio.spec.

    The data is read from the resource `static/spdx_licenses.json` of the
    `bioimageio.spec` package.
    """
    licenses_resource = files("bioimageio.spec").joinpath(
        "static/spdx_licenses.json"
    )
    with licenses_resource.open("r", encoding="utf-8") as licenses_file:
        return json.load(licenses_file)

identify_bioimageio_yaml_file_name ¤

identify_bioimageio_yaml_file_name(
    file_names: Iterable[FileName],
) -> FileName
Source code in src/bioimageio/spec/_internal/io.py
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
def identify_bioimageio_yaml_file_name(file_names: Iterable[FileName]) -> FileName:
    """Select the bioimageio YAML file name among `file_names`.

    The names in `ALL_BIOIMAGEIO_YAML_NAMES` are tried in order. For each of
    them the sorted `file_names` are searched for an exact match or for a name
    ending with a dot followed by it; the first hit is returned.

    Raises:
        ValueError: if none of `file_names` matches
    """
    candidates = sorted(file_names)
    for bioimageio_name in ALL_BIOIMAGEIO_YAML_NAMES:
        extension = "." + bioimageio_name
        for file_name in candidates:
            if file_name == bioimageio_name or file_name.endswith(extension):
                return file_name

    raise ValueError(
        f"No {BIOIMAGEIO_YAML} found in {candidates}. (Looking for '{BIOIMAGEIO_YAML}'"
        + " or any of the alternative file names:"
        + f" {ALTERNATIVE_BIOIMAGEIO_YAML_NAMES}, or any file with an extension of"
        + f" those, e.g. 'anything.{BIOIMAGEIO_YAML}')."
    )

interprete_file_source ¤

interprete_file_source(
    file_source: PermissiveFileSource,
) -> FileSource
Source code in src/bioimageio/spec/_internal/io.py
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
def interprete_file_source(file_source: PermissiveFileSource) -> FileSource:
    """Convert a permissive file source into a strict `FileSource`.

    `Path` and `HttpUrl` instances are returned as they are. Everything else
    is validated while IO checks are disabled.

    Raises:
        FileNotFoundError: if the source is a directory
    """
    if isinstance(file_source, Path):
        if not file_source.is_dir():
            return file_source

        raise FileNotFoundError(
            f"{file_source} is a directory, but expected a file."
        )

    if isinstance(file_source, HttpUrl):
        return file_source

    # other URL objects are validated via their string representation
    raw_source = (
        str(file_source) if isinstance(file_source, pydantic.AnyUrl) else file_source
    )
    with get_validation_context().replace(perform_io_checks=False):
        validated = _file_source_adapter.validate_python(raw_source)
        if isinstance(validated, Path) and validated.is_dir():
            raise FileNotFoundError(f"{validated} is a directory, but expected a file.")

    return validated

is_valid_bioimageio_yaml_name ¤

is_valid_bioimageio_yaml_name(file_name: FileName) -> bool
Source code in src/bioimageio/spec/_internal/io.py
365
366
367
368
369
370
def is_valid_bioimageio_yaml_name(file_name: FileName) -> bool:
    """Tell whether `file_name` is a recognized bioimageio YAML file name.

    A name is accepted if it equals one of `ALL_BIOIMAGEIO_YAML_NAMES` or ends
    with a dot followed by one of them.
    """
    return any(
        file_name == known_name or file_name.endswith("." + known_name)
        for known_name in ALL_BIOIMAGEIO_YAML_NAMES
    )

load_array ¤

load_array(
    source: Union[FileSource, FileDescr, ZipPath],
) -> NDArray[Any]

load a numpy ndarray from a .npy file

Source code in src/bioimageio/spec/_internal/io_utils.py
365
366
367
368
369
370
371
def load_array(source: Union[FileSource, FileDescr, ZipPath]) -> NDArray[Any]:
    """Read a numpy ndarray from a .npy file.

    `source` is opened with `get_reader`. A warning is logged if
    `settings.allow_pickle` is set, as the value is passed on to `numpy.load`.
    """
    npy_reader = get_reader(source)
    allow_pickle = settings.allow_pickle
    if allow_pickle:
        logger.warning("Loading numpy array with `allow_pickle=True`.")

    return numpy.load(npy_reader, allow_pickle=allow_pickle)

load_image ¤

load_image(
    source: Union[FileDescr, ZipPath, PermissiveFileSource],
) -> NDArray[Any]

load a single image as numpy array

Parameters:

Name Type Description Default

source ¤

Union[FileDescr, ZipPath, PermissiveFileSource]

image source

required
Source code in src/bioimageio/spec/utils.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def load_image(source: Union[FileDescr, ZipPath, PermissiveFileSource]) -> NDArray[Any]:
    """Read a single image into a numpy array.

    Sources with the suffix ".npy" are loaded with `load_array`; all other
    sources are read completely and decoded with `imread`.

    Args:
        source: image source
    """
    resolved = (
        source
        if isinstance(source, (FileDescr, ZipPath))
        else interprete_file_source(source)
    )
    if isinstance(resolved, RelativeFilePath):
        resolved = resolved.absolute()

    suffix = resolved.suffix
    if suffix == ".npy":
        image = load_array(resolved)
    else:
        raw_bytes = get_reader(resolved).read()
        image = imread(  # pyright: ignore[reportUnknownVariableType]
            raw_bytes, extension=suffix
        )

    assert is_ndarray(image)
    return image

open_bioimageio_yaml ¤

open_bioimageio_yaml(
    source: Union[PermissiveFileSource, ZipFile, ZipPath],
    /,
    **kwargs: Unpack[HashKwargs],
) -> OpenedBioimageioYaml
Source code in src/bioimageio/spec/_internal/io_utils.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
def open_bioimageio_yaml(
    source: Union[PermissiveFileSource, ZipFile, ZipPath],
    /,
    **kwargs: Unpack[HashKwargs],
) -> OpenedBioimageioYaml:
    """Open a bioimageio YAML file and parse its content.

    `source` may be

    - a string `huggingface/{user_or_org}/{repo_name}[/{version}]`, which is
      translated to a URL with `settings.huggingface_http_pattern`,
    - a `ZipFile` or a `ZipPath`,
    - a folder, in which the bioimageio YAML file is searched,
    - any other file source accepted by `interprete_file_source`, or
    - as a fallback for strings that could not be opened (and only if
      `settings.id_map` is a string containing "/"): a resource id that is
      resolved with `settings.collection_http_pattern` or the id map.

    Args:
        source: what to open (see above)
        **kwargs: hash arguments forwarded to `get_reader`

    Returns:
        The sanitized YAML content with information about its origin.

    Raises:
        FileNotFoundError: if `source` is not in the id map, but close matches
            exist (they are suggested in the message).
        Exception: if `source` cannot be opened and the id fallback does not
            apply or finds no close match, the original error is re-raised.
    """
    if (
        isinstance(source, str)
        and source.startswith("huggingface/")
        and source.count("/") >= 2
    ):
        if source.count("/") == 2:
            # huggingface/{user_or_org}/{repo_name}
            repo_id = source[len("huggingface/") :]
            branch = "main"
        else:
            # huggingface/{user_or_org}/{repo_id}/
            # huggingface/{user_or_org}/{repo_id}/version
            repo_id, version = source[len("huggingface/") :].rsplit("/", 1)
            if len(version) == 0:
                branch = "main"
            elif version[0].isdigit():
                # versions starting with a digit map to branches named "v<version>"
                branch = f"v{version}"
            else:
                branch = version

        source = HttpUrl(
            settings.huggingface_http_pattern.format(repo_id=repo_id, branch=branch)
        )

    if isinstance(source, RelativeFilePath):
        source = source.absolute()

    if isinstance(source, ZipFile):
        return _open_bioimageio_zip(source, original_source_name=str(source))
    elif isinstance(source, ZipPath):
        return _open_bioimageio_rdf_in_zip(
            source, original_root=source.root, original_source_name=str(source)
        )

    try:
        if isinstance(source, (Path, str)) and (source_dir := Path(source)).is_dir():
            # open bioimageio yaml from a folder
            src = source_dir / find_bioimageio_yaml_file_name(source_dir)
        else:
            src = interprete_file_source(source)

        reader = get_reader(src, **kwargs)

    except Exception:
        # check if `source` is a collection id
        if (
            not isinstance(source, str)
            or not isinstance(settings.id_map, str)
            or "/" not in settings.id_map
        ):
            raise

        if settings.collection_http_pattern:
            with ValidationContext(perform_io_checks=False):
                url = HttpUrl(
                    settings.collection_http_pattern.format(bioimageio_id=source)
                )

            try:
                r = httpx.get(url, follow_redirects=True)
                _ = r.raise_for_status()
                unparsed_content = r.content.decode(encoding="utf-8")
                content = _sanitize_bioimageio_yaml(read_yaml(unparsed_content))
            except Exception as e:
                # best effort only: fall through to the id map lookup below
                logger.warning("Failed to get bioimageio.yaml from {}: {}", url, e)
            else:
                logger.info("loaded {} from {}", source, url)
                original_file_name = (
                    "rdf.yaml" if url.path is None else url.path.split("/")[-1]
                )
                # NOTE(review): `original_source_name` is given as `str(...)`
                # everywhere else in this function, but as URL object here
                # -- confirm this is intended
                return OpenedBioimageioYaml(
                    content=content,
                    original_root=url.parent,
                    original_file_name=original_file_name,
                    original_source_name=url,
                    unparsed_content=unparsed_content,
                )

        id_map = get_id_map()
        if not id_map or source not in id_map:
            # an empty id map cannot provide suggestions; the original error is
            # re-raised then instead of failing with a KeyError further down
            close_matches = get_close_matches(source, id_map) if id_map else []
            if len(close_matches) == 0:
                raise

            if len(close_matches) == 1:
                did_you_mean = f" Did you mean '{close_matches[0]}'?"
            else:
                did_you_mean = f" Did you mean any of {close_matches}?"

            raise FileNotFoundError(f"'{source}' not found.{did_you_mean}")

        entry = id_map[source]
        logger.info("loading {} from {}", source, entry.source)
        reader = entry.get_reader()
        with get_validation_context().replace(perform_io_checks=False):
            src = HttpUrl(entry.source)

    if reader.is_zipfile:
        return _open_bioimageio_zip(ZipFile(reader), original_source_name=str(src))

    unparsed_content = reader.read().decode(encoding="utf-8")
    content = _sanitize_bioimageio_yaml(read_yaml(unparsed_content))

    if isinstance(src, RelativeFilePath):
        src = src.absolute()

    if isinstance(src, ZipPath):
        root = src.root
    else:
        root = src.parent

    return OpenedBioimageioYaml(
        content,
        original_root=root,
        original_source_name=str(src),
        original_file_name=extract_file_name(src),
        unparsed_content=unparsed_content,
    )

read_yaml ¤

read_yaml(
    file: Union[
        FilePath,
        ZipPath,
        IO[str],
        IO[bytes],
        BytesReader,
        str,
    ],
) -> YamlValue
Source code in src/bioimageio/spec/_internal/io_utils.py
53
54
55
56
57
58
59
60
61
62
def read_yaml(
    file: Union[FilePath, ZipPath, IO[str], IO[bytes], BytesReader, str],
) -> YamlValue:
    """Parse YAML from a path, an open stream or a string.

    `Path` and `ZipPath` inputs are read as UTF-8 text first. Any other input,
    including a plain string, is handed to the YAML loader as it is; a string
    is therefore not treated as a file path here.
    """
    is_path = isinstance(file, (ZipPath, Path))
    yaml_input = file.read_text(encoding="utf-8") if is_path else file
    return _yaml_load.load(yaml_input)

save_array ¤

save_array(
    path: Union[Path, ZipPath], array: NDArray[Any]
) -> None

save a numpy ndarray to a .npy file

Source code in src/bioimageio/spec/_internal/io_utils.py
374
375
376
377
378
def save_array(path: Union[Path, ZipPath], array: NDArray[Any]) -> None:
    """Write `array` to `path` as a .npy file (with pickling disabled)."""
    with path.open(mode="wb") as npy_file:
        # the file is opened in binary mode, so no text wrapper is expected
        assert not isinstance(npy_file, io.TextIOWrapper)
        numpy.save(npy_file, array, allow_pickle=False)

write_yaml ¤

write_yaml(
    content: Union[
        YamlValue, BioimageioYamlContentView, BaseModel
    ],
    /,
    file: Union[
        NewPath, FilePath, IO[str], IO[bytes], ZipPath
    ],
)
Source code in src/bioimageio/spec/_internal/io_utils.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def write_yaml(
    content: Union[YamlValue, BioimageioYamlContentView, BaseModel],
    /,
    file: Union[NewPath, FilePath, IO[str], IO[bytes], ZipPath],
):
    """Write `content` as YAML to `file`.

    Args:
        content: The data to write. A `BaseModel` is dumped in JSON mode first;
            anything else is passed to the YAML dumper as given.
        file: Where to write to. A `Path` is opened for writing as UTF-8 text;
            anything else is passed to the YAML dumper as given.
    """
    # convert before opening `file`: opening for writing truncates an existing
    # file, which should not happen if the conversion fails
    if isinstance(content, BaseModel):
        content = content.model_dump(mode="json")

    if isinstance(file, Path):
        cm = file.open("w", encoding="utf-8")
    else:
        cm = nullcontext(file)

    with cm as f:
        _yaml_dump.dump(content, f)