bioimageio.spec
1""" 2.. include:: ../../README.md 3""" 4 5from . import ( 6 application, 7 common, 8 conda_env, 9 dataset, 10 generic, 11 model, 12 pretty_validation_errors, 13 summary, 14 utils, 15) 16from ._description import ( 17 LatestResourceDescr, 18 ResourceDescr, 19 SpecificResourceDescr, 20 build_description, 21 dump_description, 22 validate_format, 23) 24from ._internal import settings 25from ._internal.common_nodes import InvalidDescr 26from ._internal.constants import VERSION 27from ._internal.validation_context import ValidationContext 28from ._io import ( 29 load_dataset_description, 30 load_description, 31 load_description_and_validate_format_only, 32 load_model_description, 33 save_bioimageio_yaml_only, 34) 35from ._package import ( 36 get_resource_package_content, 37 save_bioimageio_package, 38 save_bioimageio_package_as_folder, 39 save_bioimageio_package_to_stream, 40) 41from .application import AnyApplicationDescr, ApplicationDescr 42from .dataset import AnyDatasetDescr, DatasetDescr 43from .generic import AnyGenericDescr, GenericDescr 44from .model import AnyModelDescr, ModelDescr 45from .notebook import AnyNotebookDescr, NotebookDescr 46from .pretty_validation_errors import enable_pretty_validation_errors_in_ipynb 47from .summary import ValidationSummary 48 49__version__ = VERSION 50 51__all__ = [ 52 "__version__", 53 "AnyApplicationDescr", 54 "AnyDatasetDescr", 55 "AnyGenericDescr", 56 "AnyModelDescr", 57 "AnyNotebookDescr", 58 "application", 59 "ApplicationDescr", 60 "build_description", 61 "common", 62 "conda_env", 63 "dataset", 64 "DatasetDescr", 65 "dump_description", 66 "enable_pretty_validation_errors_in_ipynb", 67 "generic", 68 "GenericDescr", 69 "get_resource_package_content", 70 "InvalidDescr", 71 "LatestResourceDescr", 72 "load_dataset_description", 73 "load_description_and_validate_format_only", 74 "load_description", 75 "load_model_description", 76 "model", 77 "ModelDescr", 78 "NotebookDescr", 79 "pretty_validation_errors", 80 "ResourceDescr", 81 "save_bioimageio_package_as_folder", 82 "save_bioimageio_package_to_stream", 83 "save_bioimageio_package", 84 "save_bioimageio_yaml_only", 85 "settings", 86 "SpecificResourceDescr", 87 "summary", 88 "utils", 89 "validate_format", 90 "ValidationContext", 91 "ValidationSummary", 92]
32class ApplicationDescr(GenericDescrBase, title="bioimage.io application specification"): 33 """Bioimage.io description of an application.""" 34 35 type: Literal["application"] = "application" 36 37 id: Optional[ApplicationId] = None 38 """bioimage.io-wide unique resource identifier 39 assigned by bioimage.io; version **un**specific.""" 40 41 parent: Optional[ApplicationId] = None 42 """The description from which this one is derived""" 43 44 source: Annotated[ 45 Optional[ImportantFileSource], 46 Field(description="URL or path to the source of the application"), 47 ] = None 48 """The primary source of the application"""
Bioimage.io description of an application.
bioimage.io-wide unique resource identifier assigned by bioimage.io; version unspecific.
The description from which this one is derived
The primary source of the application
124 def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None: 125 """We need to both initialize private attributes and call the user-defined model_post_init 126 method. 127 """ 128 init_private_attributes(self, context) 129 original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
130def build_description( 131 content: BioimageioYamlContent, 132 /, 133 *, 134 context: Optional[ValidationContext] = None, 135 format_version: Union[FormatVersionPlaceholder, str] = DISCOVER, 136) -> Union[ResourceDescr, InvalidDescr]: 137 """build a bioimage.io resource description from an RDF's content. 138 139 Use `load_description` if you want to build a resource description from an rdf.yaml 140 or bioimage.io zip-package. 141 142 Args: 143 content: loaded rdf.yaml file (loaded with YAML, not bioimageio.spec) 144 context: validation context to use during validation 145 format_version: (optional) use this argument to load the resource and 146 convert its metadata to a higher format_version 147 148 Returns: 149 An object holding all metadata of the bioimage.io resource 150 151 """ 152 153 return build_description_impl( 154 content, 155 context=context, 156 format_version=format_version, 157 get_rd_class=_get_rd_class, 158 )
build a bioimage.io resource description from an RDF's content.
Use load_description if you want to build a resource description from an rdf.yaml or bioimage.io zip-package.
Arguments:
- content: loaded rdf.yaml file (loaded with YAML, not bioimageio.spec)
- context: validation context to use during validation
- format_version: (optional) use this argument to load the resource and convert its metadata to a higher format_version
Returns:
An object holding all metadata of the bioimage.io resource
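For illustration, a minimal sketch of building a description from already parsed rdf.yaml content (the file path and the use of PyYAML are assumptions; any YAML parser that yields a plain dict works):

import yaml  # PyYAML, used here only to parse the raw rdf.yaml (assumption)
from bioimageio.spec import InvalidDescr, build_description

with open("rdf.yaml", "r", encoding="utf-8") as f:  # placeholder path
    content = yaml.safe_load(f)

rd = build_description(content)
if isinstance(rd, InvalidDescr):
    print(rd.validation_summary)  # inspect what went wrong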
39class DatasetDescr(GenericDescrBase, title="bioimage.io dataset specification"): 40 """A bioimage.io dataset resource description file (dataset RDF) describes a dataset relevant to bioimage 41 processing. 42 """ 43 44 type: Literal["dataset"] = "dataset" 45 46 id: Optional[DatasetId] = None 47 """bioimage.io-wide unique resource identifier 48 assigned by bioimage.io; version **un**specific.""" 49 50 parent: Optional[DatasetId] = None 51 """The description from which this one is derived""" 52 53 source: Optional[HttpUrl] = None 54 """"URL to the source of the dataset.""" 55 56 @model_validator(mode="before") 57 @classmethod 58 def _convert(cls, data: Dict[str, Any], /) -> Dict[str, Any]: 59 if ( 60 data.get("type") == "dataset" 61 and isinstance(fv := data.get("format_version"), str) 62 and fv.startswith("0.2.") 63 ): 64 old = DatasetDescr02.load(data) 65 if isinstance(old, InvalidDescr): 66 return data 67 68 return cast( 69 Dict[str, Any], 70 (cls if TYPE_CHECKING else dict)( 71 attachments=( 72 [] 73 if old.attachments is None 74 else [FileDescr(source=f) for f in old.attachments.files] 75 ), 76 authors=[ 77 _author_conv.convert_as_dict(a) for a in old.authors 78 ], # pyright: ignore[reportArgumentType] 79 badges=old.badges, 80 cite=[ 81 {"text": c.text, "doi": c.doi, "url": c.url} for c in old.cite 82 ], # pyright: ignore[reportArgumentType] 83 config=old.config, 84 covers=old.covers, 85 description=old.description, 86 documentation=cast(DocumentationSource, old.documentation), 87 format_version="0.3.0", 88 git_repo=old.git_repo, # pyright: ignore[reportArgumentType] 89 icon=old.icon, 90 id=None if old.id is None else DatasetId(old.id), 91 license=old.license, # type: ignore 92 links=old.links, 93 maintainers=[ 94 _maintainer_conv.convert_as_dict(m) for m in old.maintainers 95 ], # pyright: ignore[reportArgumentType] 96 name=old.name, 97 source=old.source, 98 tags=old.tags, 99 type=old.type, 100 uploader=old.uploader, 101 version=old.version, 102 **(old.model_extra or {}), 103 ), 104 ) 105 106 return data
A bioimage.io dataset resource description file (dataset RDF) describes a dataset relevant to bioimage processing.
bioimage.io-wide unique resource identifier assigned by bioimage.io; version unspecific.
The description from which this one is derived
124 def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None: 125 """We need to both initialize private attributes and call the user-defined model_post_init 126 method. 127 """ 128 init_private_attributes(self, context) 129 original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
65def dump_description( 66 rd: Union[ResourceDescr, InvalidDescr], exclude_unset: bool = True 67) -> BioimageioYamlContent: 68 """Converts a resource to a dictionary containing only simple types that can directly be serialzed to YAML.""" 69 return rd.model_dump(mode="json", exclude_unset=exclude_unset)
Converts a resource to a dictionary containing only simple types that can directly be serialized to YAML.
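A small usage sketch (the source path is a placeholder): load a description and convert it back to plain, YAML-serializable content.

from bioimageio.spec import dump_description, load_description

rd = load_description("rdf.yaml")  # placeholder source
raw = dump_description(rd)  # dict of simple types (str, int, list, dict, ...)
assert isinstance(raw, dict)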
72 def enable_pretty_validation_errors_in_ipynb(): 73 """A modestly hacky way to display prettified validaiton error messages and traceback 74 in interactive Python notebooks""" 75 ipy = get_ipython() 76 if ipy is not None: 77 ipy.set_custom_exc((ValidationError,), _custom_exception_handler)
A modestly hacky way to display prettified validation error messages and tracebacks in interactive Python notebooks
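Typical usage is a single call after importing the package inside a notebook; outside of IPython it does nothing:

from bioimageio.spec import enable_pretty_validation_errors_in_ipynb

enable_pretty_validation_errors_in_ipynb()  # installs a custom handler for ValidationError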
410class GenericDescr( 411 GenericDescrBase, extra="ignore", title="bioimage.io generic specification" 412): 413 """Specification of the fields used in a generic bioimage.io-compliant resource description file (RDF). 414 415 An RDF is a YAML file that describes a resource such as a model, a dataset, or a notebook. 416 Note that those resources are described with a type-specific RDF. 417 Use this generic resource description, if none of the known specific types matches your resource. 418 """ 419 420 type: Annotated[str, LowerCase] = Field("generic", frozen=True) 421 """The resource type assigns a broad category to the resource.""" 422 423 id: Optional[ResourceId] = None 424 """bioimage.io-wide unique resource identifier 425 assigned by bioimage.io; version **un**specific.""" 426 427 parent: Optional[ResourceId] = None 428 """The description from which this one is derived""" 429 430 source: Optional[HttpUrl] = None 431 """The primary source of the resource""" 432 433 @field_validator("type", mode="after") 434 @classmethod 435 def check_specific_types(cls, value: str) -> str: 436 if value in KNOWN_SPECIFIC_RESOURCE_TYPES: 437 raise ValueError( 438 f"Use the {value} description instead of this generic description for" 439 + f" your '{value}' resource." 440 ) 441 442 return value
Specification of the fields used in a generic bioimage.io-compliant resource description file (RDF).
An RDF is a YAML file that describes a resource such as a model, a dataset, or a notebook. Note that those resources are described with a type-specific RDF. Use this generic resource description if none of the known specific types matches your resource.
The resource type assigns a broad category to the resource.
bioimage.io-wide unique resource identifier assigned by bioimage.io; version unspecific.
The description from which this one is derived
433 @field_validator("type", mode="after") 434 @classmethod 435 def check_specific_types(cls, value: str) -> str: 436 if value in KNOWN_SPECIFIC_RESOURCE_TYPES: 437 raise ValueError( 438 f"Use the {value} description instead of this generic description for" 439 + f" your '{value}' resource." 440 ) 441 442 return value
124 def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None: 125 """We need to both initialize private attributes and call the user-defined model_post_init 126 method. 127 """ 128 init_private_attributes(self, context) 129 original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
32def get_resource_package_content( 33 rd: ResourceDescr, 34 /, 35 *, 36 bioimageio_yaml_file_name: FileName = BIOIMAGEIO_YAML, 37 weights_priority_order: Optional[Sequence[WeightsFormat]] = None, # model only 38) -> Dict[FileName, Union[HttpUrl, AbsoluteFilePath, BioimageioYamlContent, ZipPath]]: 39 """ 40 Args: 41 rd: resource description 42 bioimageio_yaml_file_name: RDF file name 43 # for model resources only: 44 weights_priority_order: If given, only the first weights format present in the model is included. 45 If none of the prioritized weights formats is found a ValueError is raised. 46 """ 47 os_friendly_name = get_os_friendly_file_name(rd.name) 48 bioimageio_yaml_file_name = bioimageio_yaml_file_name.format( 49 name=os_friendly_name, type=rd.type 50 ) 51 52 bioimageio_yaml_file_name = ensure_is_valid_bioimageio_yaml_name( 53 bioimageio_yaml_file_name 54 ) 55 content: Dict[FileName, Union[HttpUrl, AbsoluteFilePath, ZipPath]] = {} 56 with PackagingContext( 57 bioimageio_yaml_file_name=bioimageio_yaml_file_name, 58 file_sources=content, 59 weights_priority_order=weights_priority_order, 60 ): 61 rdf_content: BioimageioYamlContent = rd.model_dump( 62 mode="json", exclude_unset=True 63 ) 64 65 _ = rdf_content.pop("rdf_source", None) 66 67 return {**content, bioimageio_yaml_file_name: rdf_content}
Arguments:
- rd: resource description
- bioimageio_yaml_file_name: RDF file name
- weights_priority_order: (model resources only) If given, only the first weights format present in the model is included. If none of the prioritized weights formats is found, a ValueError is raised.
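A sketch of inspecting the files a package would contain (the source path and the chosen weights format are placeholders; weights_priority_order only applies to model resources):

from bioimageio.spec import get_resource_package_content, load_model_description

model = load_model_description("rdf.yaml")  # placeholder source
content = get_resource_package_content(model, weights_priority_order=["onnx"])
for file_name, src in content.items():
    print(file_name, type(src).__name__)  # file sources plus the rdf.yaml content itself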
514class InvalidDescr( 515 ResourceDescrBase, 516 extra="allow", 517 title="An invalid resource description", 518): 519 """A representation of an invalid resource description""" 520 521 type: Any = "unknown" 522 format_version: Any = "unknown" 523 fields_to_set_explicitly: ClassVar[FrozenSet[LiteralString]] = frozenset()
A representation of an invalid resource description
Set these fields explicitly with their default value if they are not set, such that they are always included even when dumping with 'exclude_unset'
124 def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None: 125 """We need to both initialize private attributes and call the user-defined model_post_init 126 method. 127 """ 128 init_private_attributes(self, context) 129 original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
98def load_dataset_description( 99 source: Union[PermissiveFileSource, ZipFile], 100 /, 101 *, 102 format_version: Union[Literal["discover"], Literal["latest"], str] = DISCOVER, 103 perform_io_checks: bool = settings.perform_io_checks, 104 known_files: Optional[Dict[str, Sha256]] = None, 105) -> AnyDatasetDescr: 106 """same as `load_description`, but addtionally ensures that the loaded 107 description is valid and of type 'dataset'. 108 """ 109 rd = load_description( 110 source, 111 format_version=format_version, 112 perform_io_checks=perform_io_checks, 113 known_files=known_files, 114 ) 115 return ensure_description_is_dataset(rd)
same as load_description, but additionally ensures that the loaded description is valid and of type 'dataset'.
137def load_description_and_validate_format_only( 138 source: Union[PermissiveFileSource, ZipFile], 139 /, 140 *, 141 format_version: Union[Literal["discover"], Literal["latest"], str] = DISCOVER, 142 perform_io_checks: bool = settings.perform_io_checks, 143 known_files: Optional[Dict[str, Sha256]] = None, 144) -> ValidationSummary: 145 """load a bioimage.io resource description 146 147 Args: 148 source: Path or URL to an rdf.yaml or a bioimage.io package 149 (zip-file with rdf.yaml in it). 150 format_version: (optional) Use this argument to load the resource and 151 convert its metadata to a higher format_version. 152 perform_io_checks: Wether or not to perform validation that requires file io, 153 e.g. downloading a remote files. The existence of local 154 absolute file paths is still being checked. 155 known_files: Allows to bypass download and hashing of referenced files 156 (even if perform_io_checks is True). 157 158 Returns: 159 Validation summary of the bioimage.io resource found at `source`. 160 161 """ 162 rd = load_description( 163 source, 164 format_version=format_version, 165 perform_io_checks=perform_io_checks, 166 known_files=known_files, 167 ) 168 assert rd.validation_summary is not None 169 return rd.validation_summary
load a bioimage.io resource description
Arguments:
- source: Path or URL to an rdf.yaml or a bioimage.io package (zip-file with rdf.yaml in it).
- format_version: (optional) Use this argument to load the resource and convert its metadata to a higher format_version.
- perform_io_checks: Whether or not to perform validation that requires file IO, e.g. downloading remote files. The existence of local absolute file paths is still checked.
- known_files: Allows bypassing download and hashing of referenced files (even if perform_io_checks is True).
Returns:
Validation summary of the bioimage.io resource found at source.
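For example (placeholder source), only the validation summary is returned, which is convenient for pure format checks:

from bioimageio.spec import load_description_and_validate_format_only

summary = load_description_and_validate_format_only("package.bioimageio.zip")  # placeholder
print(summary)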
29def load_description( 30 source: Union[PermissiveFileSource, ZipFile], 31 /, 32 *, 33 format_version: Union[Literal["discover"], Literal["latest"], str] = DISCOVER, 34 perform_io_checks: bool = settings.perform_io_checks, 35 known_files: Optional[Dict[str, Sha256]] = None, 36) -> Union[ResourceDescr, InvalidDescr]: 37 """load a bioimage.io resource description 38 39 Args: 40 source: Path or URL to an rdf.yaml or a bioimage.io package 41 (zip-file with rdf.yaml in it). 42 format_version: (optional) Use this argument to load the resource and 43 convert its metadata to a higher format_version. 44 perform_io_checks: Wether or not to perform validation that requires file io, 45 e.g. downloading a remote files. The existence of local 46 absolute file paths is still being checked. 47 known_files: Allows to bypass download and hashing of referenced files 48 (even if perform_io_checks is True). 49 50 Returns: 51 An object holding all metadata of the bioimage.io resource 52 53 """ 54 if isinstance(source, ResourceDescrBase): 55 name = getattr(source, "name", f"{str(source)[:10]}...") 56 logger.warning("returning already loaded description '{}' as is", name) 57 return source # pyright: ignore[reportReturnType] 58 59 opened = open_bioimageio_yaml(source) 60 61 context = validation_context_var.get().replace( 62 root=opened.original_root, 63 file_name=opened.original_file_name, 64 perform_io_checks=perform_io_checks, 65 known_files=known_files, 66 ) 67 68 return build_description( 69 opened.content, 70 context=context, 71 format_version=format_version, 72 )
load a bioimage.io resource description
Arguments:
- source: Path or URL to an rdf.yaml or a bioimage.io package (zip-file with rdf.yaml in it).
- format_version: (optional) Use this argument to load the resource and convert its metadata to a higher format_version.
- perform_io_checks: Whether or not to perform validation that requires file IO, e.g. downloading remote files. The existence of local absolute file paths is still checked.
- known_files: Allows bypassing download and hashing of referenced files (even if perform_io_checks is True).
Returns:
An object holding all metadata of the bioimage.io resource
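A minimal sketch (the URL is a placeholder for any rdf.yaml or zip-package source):

from bioimageio.spec import InvalidDescr, load_description

rd = load_description("https://example.com/rdf.yaml")  # placeholder source
if isinstance(rd, InvalidDescr):
    print(rd.validation_summary)
else:
    print(rd.type, rd.name)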
75def load_model_description( 76 source: Union[PermissiveFileSource, ZipFile], 77 /, 78 *, 79 format_version: Union[Literal["discover"], Literal["latest"], str] = DISCOVER, 80 perform_io_checks: bool = settings.perform_io_checks, 81 known_files: Optional[Dict[str, Sha256]] = None, 82) -> AnyModelDescr: 83 """same as `load_description`, but addtionally ensures that the loaded 84 description is valid and of type 'model'. 85 86 Raises: 87 ValueError: for invalid or non-model resources 88 """ 89 rd = load_description( 90 source, 91 format_version=format_version, 92 perform_io_checks=perform_io_checks, 93 known_files=known_files, 94 ) 95 return ensure_description_is_model(rd)
same as load_description, but additionally ensures that the loaded description is valid and of type 'model'.
Raises:
- ValueError: for invalid or non-model resources
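Usage sketch (placeholder source); unlike load_description, invalid or non-model resources raise instead of being returned:

from bioimageio.spec import load_model_description

try:
    model = load_model_description("model.bioimageio.zip")  # placeholder source
except ValueError as e:
    print(f"not a valid model description: {e}")
else:
    print(model.name, model.format_version)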
2062class ModelDescr(GenericModelDescrBase, title="bioimage.io model specification"): 2063 """Specification of the fields used in a bioimage.io-compliant RDF to describe AI models with pretrained weights. 2064 These fields are typically stored in a YAML file which we call a model resource description file (model RDF). 2065 """ 2066 2067 format_version: Literal["0.5.3"] = "0.5.3" 2068 """Version of the bioimage.io model description specification used. 2069 When creating a new model always use the latest micro/patch version described here. 2070 The `format_version` is important for any consumer software to understand how to parse the fields. 2071 """ 2072 2073 type: Literal["model"] = "model" 2074 """Specialized resource type 'model'""" 2075 2076 id: Optional[ModelId] = None 2077 """bioimage.io-wide unique resource identifier 2078 assigned by bioimage.io; version **un**specific.""" 2079 2080 authors: NotEmpty[List[Author]] 2081 """The authors are the creators of the model RDF and the primary points of contact.""" 2082 2083 documentation: Annotated[ 2084 DocumentationSource, 2085 Field( 2086 examples=[ 2087 "https://raw.githubusercontent.com/bioimage-io/spec-bioimage-io/main/example_descriptions/models/unet2d_nuclei_broad/README.md", 2088 "README.md", 2089 ], 2090 ), 2091 ] 2092 """∈📦 URL or relative path to a markdown file with additional documentation. 2093 The recommended documentation file name is `README.md`. An `.md` suffix is mandatory. 2094 The documentation should include a '#[#] Validation' (sub)section 2095 with details on how to quantitatively validate the model on unseen data.""" 2096 2097 @field_validator("documentation", mode="after") 2098 @classmethod 2099 def _validate_documentation(cls, value: DocumentationSource) -> DocumentationSource: 2100 if not validation_context_var.get().perform_io_checks: 2101 return value 2102 2103 doc_path = download(value).path 2104 doc_content = doc_path.read_text(encoding="utf-8") 2105 assert isinstance(doc_content, str) 2106 if not re.match("#.*[vV]alidation", doc_content): 2107 issue_warning( 2108 "No '# Validation' (sub)section found in {value}.", 2109 value=value, 2110 field="documentation", 2111 ) 2112 2113 return value 2114 2115 inputs: NotEmpty[Sequence[InputTensorDescr]] 2116 """Describes the input tensors expected by this model.""" 2117 2118 @field_validator("inputs", mode="after") 2119 @classmethod 2120 def _validate_input_axes( 2121 cls, inputs: Sequence[InputTensorDescr] 2122 ) -> Sequence[InputTensorDescr]: 2123 input_size_refs = cls._get_axes_with_independent_size(inputs) 2124 2125 for i, ipt in enumerate(inputs): 2126 valid_independent_refs: Dict[ 2127 Tuple[TensorId, AxisId], 2128 Tuple[TensorDescr, AnyAxis, Union[int, ParameterizedSize]], 2129 ] = { 2130 **{ 2131 (ipt.id, a.id): (ipt, a, a.size) 2132 for a in ipt.axes 2133 if not isinstance(a, BatchAxis) 2134 and isinstance(a.size, (int, ParameterizedSize)) 2135 }, 2136 **input_size_refs, 2137 } 2138 for a, ax in enumerate(ipt.axes): 2139 cls._validate_axis( 2140 "inputs", 2141 i=i, 2142 tensor_id=ipt.id, 2143 a=a, 2144 axis=ax, 2145 valid_independent_refs=valid_independent_refs, 2146 ) 2147 return inputs 2148 2149 @staticmethod 2150 def _validate_axis( 2151 field_name: str, 2152 i: int, 2153 tensor_id: TensorId, 2154 a: int, 2155 axis: AnyAxis, 2156 valid_independent_refs: Dict[ 2157 Tuple[TensorId, AxisId], 2158 Tuple[TensorDescr, AnyAxis, Union[int, ParameterizedSize]], 2159 ], 2160 ): 2161 if isinstance(axis, BatchAxis) or isinstance( 2162 axis.size, (int, ParameterizedSize, 
DataDependentSize) 2163 ): 2164 return 2165 elif not isinstance(axis.size, SizeReference): 2166 assert_never(axis.size) 2167 2168 # validate axis.size SizeReference 2169 ref = (axis.size.tensor_id, axis.size.axis_id) 2170 if ref not in valid_independent_refs: 2171 raise ValueError( 2172 "Invalid tensor axis reference at" 2173 + f" {field_name}[{i}].axes[{a}].size: {axis.size}." 2174 ) 2175 if ref == (tensor_id, axis.id): 2176 raise ValueError( 2177 "Self-referencing not allowed for" 2178 + f" {field_name}[{i}].axes[{a}].size: {axis.size}" 2179 ) 2180 if axis.type == "channel": 2181 if valid_independent_refs[ref][1].type != "channel": 2182 raise ValueError( 2183 "A channel axis' size may only reference another fixed size" 2184 + " channel axis." 2185 ) 2186 if isinstance(axis.channel_names, str) and "{i}" in axis.channel_names: 2187 ref_size = valid_independent_refs[ref][2] 2188 assert isinstance(ref_size, int), ( 2189 "channel axis ref (another channel axis) has to specify fixed" 2190 + " size" 2191 ) 2192 generated_channel_names = [ 2193 Identifier(axis.channel_names.format(i=i)) 2194 for i in range(1, ref_size + 1) 2195 ] 2196 axis.channel_names = generated_channel_names 2197 2198 if (ax_unit := getattr(axis, "unit", None)) != ( 2199 ref_unit := getattr(valid_independent_refs[ref][1], "unit", None) 2200 ): 2201 raise ValueError( 2202 "The units of an axis and its reference axis need to match, but" 2203 + f" '{ax_unit}' != '{ref_unit}'." 2204 ) 2205 ref_axis = valid_independent_refs[ref][1] 2206 if isinstance(ref_axis, BatchAxis): 2207 raise ValueError( 2208 f"Invalid reference axis '{ref_axis.id}' for {tensor_id}.{axis.id}" 2209 + " (a batch axis is not allowed as reference)." 2210 ) 2211 2212 if isinstance(axis, WithHalo): 2213 min_size = axis.size.get_size(axis, ref_axis, n=0) 2214 if (min_size - 2 * axis.halo) < 1: 2215 raise ValueError( 2216 f"axis {axis.id} with minimum size {min_size} is too small for halo" 2217 + f" {axis.halo}." 2218 ) 2219 2220 input_halo = axis.halo * axis.scale / ref_axis.scale 2221 if input_halo != int(input_halo) or input_halo % 2 == 1: 2222 raise ValueError( 2223 f"input_halo {input_halo} (output_halo {axis.halo} *" 2224 + f" output_scale {axis.scale} / input_scale {ref_axis.scale})" 2225 + f" is not an even integer for {tensor_id}.{axis.id}." 2226 ) 2227 2228 @model_validator(mode="after") 2229 def _validate_test_tensors(self) -> Self: 2230 if not validation_context_var.get().perform_io_checks: 2231 return self 2232 2233 test_arrays = [ 2234 load_array(descr.test_tensor.download().path) 2235 for descr in chain(self.inputs, self.outputs) 2236 ] 2237 tensors = { 2238 descr.id: (descr, array) 2239 for descr, array in zip(chain(self.inputs, self.outputs), test_arrays) 2240 } 2241 validate_tensors(tensors, tensor_origin="test_tensor") 2242 return self 2243 2244 @model_validator(mode="after") 2245 def _validate_tensor_references_in_proc_kwargs(self, info: ValidationInfo) -> Self: 2246 ipt_refs = {t.id for t in self.inputs} 2247 out_refs = {t.id for t in self.outputs} 2248 for ipt in self.inputs: 2249 for p in ipt.preprocessing: 2250 ref = p.kwargs.get("reference_tensor") 2251 if ref is None: 2252 continue 2253 if ref not in ipt_refs: 2254 raise ValueError( 2255 f"`reference_tensor` '{ref}' not found. Valid input tensor" 2256 + f" references are: {ipt_refs}." 
2257 ) 2258 2259 for out in self.outputs: 2260 for p in out.postprocessing: 2261 ref = p.kwargs.get("reference_tensor") 2262 if ref is None: 2263 continue 2264 2265 if ref not in ipt_refs and ref not in out_refs: 2266 raise ValueError( 2267 f"`reference_tensor` '{ref}' not found. Valid tensor references" 2268 + f" are: {ipt_refs | out_refs}." 2269 ) 2270 2271 return self 2272 2273 # TODO: use validate funcs in validate_test_tensors 2274 # def validate_inputs(self, input_tensors: Mapping[TensorId, NDArray[Any]]) -> Mapping[TensorId, NDArray[Any]]: 2275 2276 name: Annotated[ 2277 Annotated[ 2278 str, RestrictCharacters(string.ascii_letters + string.digits + "_- ()") 2279 ], 2280 MinLen(5), 2281 MaxLen(128), 2282 warn(MaxLen(64), "Name longer than 64 characters.", INFO), 2283 ] 2284 """A human-readable name of this model. 2285 It should be no longer than 64 characters 2286 and may only contain letter, number, underscore, minus, parentheses and spaces. 2287 We recommend to chose a name that refers to the model's task and image modality. 2288 """ 2289 2290 outputs: NotEmpty[Sequence[OutputTensorDescr]] 2291 """Describes the output tensors.""" 2292 2293 @field_validator("outputs", mode="after") 2294 @classmethod 2295 def _validate_tensor_ids( 2296 cls, outputs: Sequence[OutputTensorDescr], info: ValidationInfo 2297 ) -> Sequence[OutputTensorDescr]: 2298 tensor_ids = [ 2299 t.id for t in info.data.get("inputs", []) + info.data.get("outputs", []) 2300 ] 2301 duplicate_tensor_ids: List[str] = [] 2302 seen: Set[str] = set() 2303 for t in tensor_ids: 2304 if t in seen: 2305 duplicate_tensor_ids.append(t) 2306 2307 seen.add(t) 2308 2309 if duplicate_tensor_ids: 2310 raise ValueError(f"Duplicate tensor ids: {duplicate_tensor_ids}") 2311 2312 return outputs 2313 2314 @staticmethod 2315 def _get_axes_with_parameterized_size( 2316 io: Union[Sequence[InputTensorDescr], Sequence[OutputTensorDescr]], 2317 ): 2318 return { 2319 f"{t.id}.{a.id}": (t, a, a.size) 2320 for t in io 2321 for a in t.axes 2322 if not isinstance(a, BatchAxis) and isinstance(a.size, ParameterizedSize) 2323 } 2324 2325 @staticmethod 2326 def _get_axes_with_independent_size( 2327 io: Union[Sequence[InputTensorDescr], Sequence[OutputTensorDescr]], 2328 ): 2329 return { 2330 (t.id, a.id): (t, a, a.size) 2331 for t in io 2332 for a in t.axes 2333 if not isinstance(a, BatchAxis) 2334 and isinstance(a.size, (int, ParameterizedSize)) 2335 } 2336 2337 @field_validator("outputs", mode="after") 2338 @classmethod 2339 def _validate_output_axes( 2340 cls, outputs: List[OutputTensorDescr], info: ValidationInfo 2341 ) -> List[OutputTensorDescr]: 2342 input_size_refs = cls._get_axes_with_independent_size( 2343 info.data.get("inputs", []) 2344 ) 2345 output_size_refs = cls._get_axes_with_independent_size(outputs) 2346 2347 for i, out in enumerate(outputs): 2348 valid_independent_refs: Dict[ 2349 Tuple[TensorId, AxisId], 2350 Tuple[TensorDescr, AnyAxis, Union[int, ParameterizedSize]], 2351 ] = { 2352 **{ 2353 (out.id, a.id): (out, a, a.size) 2354 for a in out.axes 2355 if not isinstance(a, BatchAxis) 2356 and isinstance(a.size, (int, ParameterizedSize)) 2357 }, 2358 **input_size_refs, 2359 **output_size_refs, 2360 } 2361 for a, ax in enumerate(out.axes): 2362 cls._validate_axis( 2363 "outputs", 2364 i, 2365 out.id, 2366 a, 2367 ax, 2368 valid_independent_refs=valid_independent_refs, 2369 ) 2370 2371 return outputs 2372 2373 packaged_by: List[Author] = Field(default_factory=list) 2374 """The persons that have packaged and uploaded this model. 
2375 Only required if those persons differ from the `authors`.""" 2376 2377 parent: Optional[LinkedModel] = None 2378 """The model from which this model is derived, e.g. by fine-tuning the weights.""" 2379 2380 # todo: add parent self check once we have `id` 2381 # @model_validator(mode="after") 2382 # def validate_parent_is_not_self(self) -> Self: 2383 # if self.parent is not None and self.parent == self.id: 2384 # raise ValueError("The model may not reference itself as parent model") 2385 2386 # return self 2387 2388 run_mode: Annotated[ 2389 Optional[RunMode], 2390 warn(None, "Run mode '{value}' has limited support across consumer softwares."), 2391 ] = None 2392 """Custom run mode for this model: for more complex prediction procedures like test time 2393 data augmentation that currently cannot be expressed in the specification. 2394 No standard run modes are defined yet.""" 2395 2396 timestamp: Datetime = Datetime(datetime.now()) 2397 """Timestamp in [ISO 8601](#https://en.wikipedia.org/wiki/ISO_8601) format 2398 with a few restrictions listed [here](https://docs.python.org/3/library/datetime.html#datetime.datetime.fromisoformat). 2399 (In Python a datetime object is valid, too).""" 2400 2401 training_data: Annotated[ 2402 Union[None, LinkedDataset, DatasetDescr, DatasetDescr02], 2403 Field(union_mode="left_to_right"), 2404 ] = None 2405 """The dataset used to train this model""" 2406 2407 weights: Annotated[WeightsDescr, WrapSerializer(package_weights)] 2408 """The weights for this model. 2409 Weights can be given for different formats, but should otherwise be equivalent. 2410 The available weight formats determine which consumers can use this model.""" 2411 2412 @model_validator(mode="after") 2413 def _add_default_cover(self) -> Self: 2414 if not validation_context_var.get().perform_io_checks or self.covers: 2415 return self 2416 2417 try: 2418 generated_covers = generate_covers( 2419 [(t, load_array(t.test_tensor.download().path)) for t in self.inputs], 2420 [(t, load_array(t.test_tensor.download().path)) for t in self.outputs], 2421 ) 2422 except Exception as e: 2423 issue_warning( 2424 "Failed to generate cover image(s): {e}", 2425 value=self.covers, 2426 msg_context=dict(e=e), 2427 field="covers", 2428 ) 2429 else: 2430 self.covers.extend(generated_covers) 2431 2432 return self 2433 2434 def get_input_test_arrays(self) -> List[NDArray[Any]]: 2435 data = [load_array(ipt.test_tensor.download().path) for ipt in self.inputs] 2436 assert all(isinstance(d, np.ndarray) for d in data) 2437 return data 2438 2439 def get_output_test_arrays(self) -> List[NDArray[Any]]: 2440 data = [load_array(out.test_tensor.download().path) for out in self.outputs] 2441 assert all(isinstance(d, np.ndarray) for d in data) 2442 return data 2443 2444 @staticmethod 2445 def get_batch_size(tensor_sizes: Mapping[TensorId, Mapping[AxisId, int]]) -> int: 2446 batch_size = 1 2447 tensor_with_batchsize: Optional[TensorId] = None 2448 for tid in tensor_sizes: 2449 for aid, s in tensor_sizes[tid].items(): 2450 if aid != BATCH_AXIS_ID or s == 1 or s == batch_size: 2451 continue 2452 2453 if batch_size != 1: 2454 assert tensor_with_batchsize is not None 2455 raise ValueError( 2456 f"batch size mismatch for tensors '{tensor_with_batchsize}' ({batch_size}) and '{tid}' ({s})" 2457 ) 2458 2459 batch_size = s 2460 tensor_with_batchsize = tid 2461 2462 return batch_size 2463 2464 def get_output_tensor_sizes( 2465 self, input_sizes: Mapping[TensorId, Mapping[AxisId, int]] 2466 ) -> Dict[TensorId, Dict[AxisId, Union[int, 
_DataDepSize]]]: 2467 """Returns the tensor output sizes for given **input_sizes**. 2468 Only if **input_sizes** has a valid input shape, the tensor output size is exact. 2469 Otherwise it might be larger than the actual (valid) output""" 2470 batch_size = self.get_batch_size(input_sizes) 2471 ns = self.get_ns(input_sizes) 2472 2473 tensor_sizes = self.get_tensor_sizes(ns, batch_size=batch_size) 2474 return tensor_sizes.outputs 2475 2476 def get_ns(self, input_sizes: Mapping[TensorId, Mapping[AxisId, int]]): 2477 """get parameter `n` for each parameterized axis 2478 such that the valid input size is >= the given input size""" 2479 ret: Dict[Tuple[TensorId, AxisId], ParameterizedSize_N] = {} 2480 axes = {t.id: {a.id: a for a in t.axes} for t in self.inputs} 2481 for tid in input_sizes: 2482 for aid, s in input_sizes[tid].items(): 2483 size_descr = axes[tid][aid].size 2484 if isinstance(size_descr, ParameterizedSize): 2485 ret[(tid, aid)] = size_descr.get_n(s) 2486 elif size_descr is None or isinstance(size_descr, (int, SizeReference)): 2487 pass 2488 else: 2489 assert_never(size_descr) 2490 2491 return ret 2492 2493 def get_tensor_sizes( 2494 self, ns: Mapping[Tuple[TensorId, AxisId], ParameterizedSize_N], batch_size: int 2495 ) -> _TensorSizes: 2496 axis_sizes = self.get_axis_sizes(ns, batch_size=batch_size) 2497 return _TensorSizes( 2498 { 2499 t: { 2500 aa: axis_sizes.inputs[(tt, aa)] 2501 for tt, aa in axis_sizes.inputs 2502 if tt == t 2503 } 2504 for t in {tt for tt, _ in axis_sizes.inputs} 2505 }, 2506 { 2507 t: { 2508 aa: axis_sizes.outputs[(tt, aa)] 2509 for tt, aa in axis_sizes.outputs 2510 if tt == t 2511 } 2512 for t in {tt for tt, _ in axis_sizes.outputs} 2513 }, 2514 ) 2515 2516 def get_axis_sizes( 2517 self, 2518 ns: Mapping[Tuple[TensorId, AxisId], ParameterizedSize_N], 2519 batch_size: Optional[int] = None, 2520 *, 2521 max_input_shape: Optional[Mapping[Tuple[TensorId, AxisId], int]] = None, 2522 ) -> _AxisSizes: 2523 """Determine input and output block shape for scale factors **ns** 2524 of parameterized input sizes. 2525 2526 Args: 2527 ns: Scale factor `n` for each axis (keyed by (tensor_id, axis_id)) 2528 that is parameterized as `size = min + n * step`. 2529 batch_size: The desired size of the batch dimension. 2530 If given **batch_size** overwrites any batch size present in 2531 **max_input_shape**. Default 1. 2532 max_input_shape: Limits the derived block shapes. 2533 Each axis for which the input size, parameterized by `n`, is larger 2534 than **max_input_shape** is set to the minimal value `n_min` for which 2535 this is still true. 2536 Use this for small input samples or large values of **ns**. 2537 Or simply whenever you know the full input shape. 2538 2539 Returns: 2540 Resolved axis sizes for model inputs and outputs. 
2541 """ 2542 max_input_shape = max_input_shape or {} 2543 if batch_size is None: 2544 for (_t_id, a_id), s in max_input_shape.items(): 2545 if a_id == BATCH_AXIS_ID: 2546 batch_size = s 2547 break 2548 else: 2549 batch_size = 1 2550 2551 all_axes = { 2552 t.id: {a.id: a for a in t.axes} for t in chain(self.inputs, self.outputs) 2553 } 2554 2555 inputs: Dict[Tuple[TensorId, AxisId], int] = {} 2556 outputs: Dict[Tuple[TensorId, AxisId], Union[int, _DataDepSize]] = {} 2557 2558 def get_axis_size(a: Union[InputAxis, OutputAxis]): 2559 if isinstance(a, BatchAxis): 2560 if (t_descr.id, a.id) in ns: 2561 logger.warning( 2562 "Ignoring unexpected size increment factor (n) for batch axis" 2563 + " of tensor '{}'.", 2564 t_descr.id, 2565 ) 2566 return batch_size 2567 elif isinstance(a.size, int): 2568 if (t_descr.id, a.id) in ns: 2569 logger.warning( 2570 "Ignoring unexpected size increment factor (n) for fixed size" 2571 + " axis '{}' of tensor '{}'.", 2572 a.id, 2573 t_descr.id, 2574 ) 2575 return a.size 2576 elif isinstance(a.size, ParameterizedSize): 2577 if (t_descr.id, a.id) not in ns: 2578 raise ValueError( 2579 "Size increment factor (n) missing for parametrized axis" 2580 + f" '{a.id}' of tensor '{t_descr.id}'." 2581 ) 2582 n = ns[(t_descr.id, a.id)] 2583 s_max = max_input_shape.get((t_descr.id, a.id)) 2584 if s_max is not None: 2585 n = min(n, a.size.get_n(s_max)) 2586 2587 return a.size.get_size(n) 2588 2589 elif isinstance(a.size, SizeReference): 2590 if (t_descr.id, a.id) in ns: 2591 logger.warning( 2592 "Ignoring unexpected size increment factor (n) for axis '{}'" 2593 + " of tensor '{}' with size reference.", 2594 a.id, 2595 t_descr.id, 2596 ) 2597 assert not isinstance(a, BatchAxis) 2598 ref_axis = all_axes[a.size.tensor_id][a.size.axis_id] 2599 assert not isinstance(ref_axis, BatchAxis) 2600 ref_key = (a.size.tensor_id, a.size.axis_id) 2601 ref_size = inputs.get(ref_key, outputs.get(ref_key)) 2602 assert ref_size is not None, ref_key 2603 assert not isinstance(ref_size, _DataDepSize), ref_key 2604 return a.size.get_size( 2605 axis=a, 2606 ref_axis=ref_axis, 2607 ref_size=ref_size, 2608 ) 2609 elif isinstance(a.size, DataDependentSize): 2610 if (t_descr.id, a.id) in ns: 2611 logger.warning( 2612 "Ignoring unexpected increment factor (n) for data dependent" 2613 + " size axis '{}' of tensor '{}'.", 2614 a.id, 2615 t_descr.id, 2616 ) 2617 return _DataDepSize(a.size.min, a.size.max) 2618 else: 2619 assert_never(a.size) 2620 2621 # first resolve all , but the `SizeReference` input sizes 2622 for t_descr in self.inputs: 2623 for a in t_descr.axes: 2624 if not isinstance(a.size, SizeReference): 2625 s = get_axis_size(a) 2626 assert not isinstance(s, _DataDepSize) 2627 inputs[t_descr.id, a.id] = s 2628 2629 # resolve all other input axis sizes 2630 for t_descr in self.inputs: 2631 for a in t_descr.axes: 2632 if isinstance(a.size, SizeReference): 2633 s = get_axis_size(a) 2634 assert not isinstance(s, _DataDepSize) 2635 inputs[t_descr.id, a.id] = s 2636 2637 # resolve all output axis sizes 2638 for t_descr in self.outputs: 2639 for a in t_descr.axes: 2640 assert not isinstance(a.size, ParameterizedSize) 2641 s = get_axis_size(a) 2642 outputs[t_descr.id, a.id] = s 2643 2644 return _AxisSizes(inputs=inputs, outputs=outputs) 2645 2646 @model_validator(mode="before") 2647 @classmethod 2648 def _convert(cls, data: Dict[str, Any]) -> Dict[str, Any]: 2649 if ( 2650 data.get("type") == "model" 2651 and isinstance(fv := data.get("format_version"), str) 2652 and fv.count(".") == 2 2653 ): 2654 
fv_parts = fv.split(".") 2655 if any(not p.isdigit() for p in fv_parts): 2656 return data 2657 2658 fv_tuple = tuple(map(int, fv_parts)) 2659 2660 assert cls.implemented_format_version_tuple[0:2] == (0, 5) 2661 if fv_tuple[:2] in ((0, 3), (0, 4)): 2662 m04 = _ModelDescr_v0_4.load(data) 2663 if not isinstance(m04, InvalidDescr): 2664 return _model_conv.convert_as_dict(m04) 2665 elif fv_tuple[:2] == (0, 5): 2666 # bump patch version 2667 data["format_version"] = cls.implemented_format_version 2668 2669 return data
Specification of the fields used in a bioimage.io-compliant RDF to describe AI models with pretrained weights. These fields are typically stored in a YAML file which we call a model resource description file (model RDF).
Version of the bioimage.io model description specification used.
When creating a new model, always use the latest micro/patch version described here.
The format_version is important for any consumer software to understand how to parse the fields.
bioimage.io-wide unique resource identifier assigned by bioimage.io; version unspecific.
∈📦 URL or relative path to a markdown file with additional documentation.
The recommended documentation file name is README.md. An .md suffix is mandatory.
The documentation should include a '#[#] Validation' (sub)section with details on how to quantitatively validate the model on unseen data.
Describes the input tensors expected by this model.
A human-readable name of this model. It should be no longer than 64 characters and may only contain letters, numbers, underscores, minus signs, parentheses, and spaces. We recommend choosing a name that refers to the model's task and image modality.
Describes the output tensors.
The persons that have packaged and uploaded this model.
Only required if those persons differ from the authors.
The model from which this model is derived, e.g. by fine-tuning the weights.
Custom run mode for this model: for more complex prediction procedures like test time data augmentation that currently cannot be expressed in the specification. No standard run modes are defined yet.
The dataset used to train this model
The weights for this model. Weights can be given for different formats, but should otherwise be equivalent. The available weight formats determine which consumers can use this model.
2444 @staticmethod 2445 def get_batch_size(tensor_sizes: Mapping[TensorId, Mapping[AxisId, int]]) -> int: 2446 batch_size = 1 2447 tensor_with_batchsize: Optional[TensorId] = None 2448 for tid in tensor_sizes: 2449 for aid, s in tensor_sizes[tid].items(): 2450 if aid != BATCH_AXIS_ID or s == 1 or s == batch_size: 2451 continue 2452 2453 if batch_size != 1: 2454 assert tensor_with_batchsize is not None 2455 raise ValueError( 2456 f"batch size mismatch for tensors '{tensor_with_batchsize}' ({batch_size}) and '{tid}' ({s})" 2457 ) 2458 2459 batch_size = s 2460 tensor_with_batchsize = tid 2461 2462 return batch_size
2464 def get_output_tensor_sizes( 2465 self, input_sizes: Mapping[TensorId, Mapping[AxisId, int]] 2466 ) -> Dict[TensorId, Dict[AxisId, Union[int, _DataDepSize]]]: 2467 """Returns the tensor output sizes for given **input_sizes**. 2468 Only if **input_sizes** has a valid input shape, the tensor output size is exact. 2469 Otherwise it might be larger than the actual (valid) output""" 2470 batch_size = self.get_batch_size(input_sizes) 2471 ns = self.get_ns(input_sizes) 2472 2473 tensor_sizes = self.get_tensor_sizes(ns, batch_size=batch_size) 2474 return tensor_sizes.outputs
Returns the tensor output sizes for the given input_sizes. The output sizes are exact only if input_sizes is a valid input shape; otherwise they might be larger than the actual (valid) output.
2476 def get_ns(self, input_sizes: Mapping[TensorId, Mapping[AxisId, int]]): 2477 """get parameter `n` for each parameterized axis 2478 such that the valid input size is >= the given input size""" 2479 ret: Dict[Tuple[TensorId, AxisId], ParameterizedSize_N] = {} 2480 axes = {t.id: {a.id: a for a in t.axes} for t in self.inputs} 2481 for tid in input_sizes: 2482 for aid, s in input_sizes[tid].items(): 2483 size_descr = axes[tid][aid].size 2484 if isinstance(size_descr, ParameterizedSize): 2485 ret[(tid, aid)] = size_descr.get_n(s) 2486 elif size_descr is None or isinstance(size_descr, (int, SizeReference)): 2487 pass 2488 else: 2489 assert_never(size_descr) 2490 2491 return ret
get parameter n for each parameterized axis such that the valid input size is >= the given input size
2493 def get_tensor_sizes( 2494 self, ns: Mapping[Tuple[TensorId, AxisId], ParameterizedSize_N], batch_size: int 2495 ) -> _TensorSizes: 2496 axis_sizes = self.get_axis_sizes(ns, batch_size=batch_size) 2497 return _TensorSizes( 2498 { 2499 t: { 2500 aa: axis_sizes.inputs[(tt, aa)] 2501 for tt, aa in axis_sizes.inputs 2502 if tt == t 2503 } 2504 for t in {tt for tt, _ in axis_sizes.inputs} 2505 }, 2506 { 2507 t: { 2508 aa: axis_sizes.outputs[(tt, aa)] 2509 for tt, aa in axis_sizes.outputs 2510 if tt == t 2511 } 2512 for t in {tt for tt, _ in axis_sizes.outputs} 2513 }, 2514 )
2516 def get_axis_sizes( 2517 self, 2518 ns: Mapping[Tuple[TensorId, AxisId], ParameterizedSize_N], 2519 batch_size: Optional[int] = None, 2520 *, 2521 max_input_shape: Optional[Mapping[Tuple[TensorId, AxisId], int]] = None, 2522 ) -> _AxisSizes: 2523 """Determine input and output block shape for scale factors **ns** 2524 of parameterized input sizes. 2525 2526 Args: 2527 ns: Scale factor `n` for each axis (keyed by (tensor_id, axis_id)) 2528 that is parameterized as `size = min + n * step`. 2529 batch_size: The desired size of the batch dimension. 2530 If given **batch_size** overwrites any batch size present in 2531 **max_input_shape**. Default 1. 2532 max_input_shape: Limits the derived block shapes. 2533 Each axis for which the input size, parameterized by `n`, is larger 2534 than **max_input_shape** is set to the minimal value `n_min` for which 2535 this is still true. 2536 Use this for small input samples or large values of **ns**. 2537 Or simply whenever you know the full input shape. 2538 2539 Returns: 2540 Resolved axis sizes for model inputs and outputs. 2541 """ 2542 max_input_shape = max_input_shape or {} 2543 if batch_size is None: 2544 for (_t_id, a_id), s in max_input_shape.items(): 2545 if a_id == BATCH_AXIS_ID: 2546 batch_size = s 2547 break 2548 else: 2549 batch_size = 1 2550 2551 all_axes = { 2552 t.id: {a.id: a for a in t.axes} for t in chain(self.inputs, self.outputs) 2553 } 2554 2555 inputs: Dict[Tuple[TensorId, AxisId], int] = {} 2556 outputs: Dict[Tuple[TensorId, AxisId], Union[int, _DataDepSize]] = {} 2557 2558 def get_axis_size(a: Union[InputAxis, OutputAxis]): 2559 if isinstance(a, BatchAxis): 2560 if (t_descr.id, a.id) in ns: 2561 logger.warning( 2562 "Ignoring unexpected size increment factor (n) for batch axis" 2563 + " of tensor '{}'.", 2564 t_descr.id, 2565 ) 2566 return batch_size 2567 elif isinstance(a.size, int): 2568 if (t_descr.id, a.id) in ns: 2569 logger.warning( 2570 "Ignoring unexpected size increment factor (n) for fixed size" 2571 + " axis '{}' of tensor '{}'.", 2572 a.id, 2573 t_descr.id, 2574 ) 2575 return a.size 2576 elif isinstance(a.size, ParameterizedSize): 2577 if (t_descr.id, a.id) not in ns: 2578 raise ValueError( 2579 "Size increment factor (n) missing for parametrized axis" 2580 + f" '{a.id}' of tensor '{t_descr.id}'." 
2581 ) 2582 n = ns[(t_descr.id, a.id)] 2583 s_max = max_input_shape.get((t_descr.id, a.id)) 2584 if s_max is not None: 2585 n = min(n, a.size.get_n(s_max)) 2586 2587 return a.size.get_size(n) 2588 2589 elif isinstance(a.size, SizeReference): 2590 if (t_descr.id, a.id) in ns: 2591 logger.warning( 2592 "Ignoring unexpected size increment factor (n) for axis '{}'" 2593 + " of tensor '{}' with size reference.", 2594 a.id, 2595 t_descr.id, 2596 ) 2597 assert not isinstance(a, BatchAxis) 2598 ref_axis = all_axes[a.size.tensor_id][a.size.axis_id] 2599 assert not isinstance(ref_axis, BatchAxis) 2600 ref_key = (a.size.tensor_id, a.size.axis_id) 2601 ref_size = inputs.get(ref_key, outputs.get(ref_key)) 2602 assert ref_size is not None, ref_key 2603 assert not isinstance(ref_size, _DataDepSize), ref_key 2604 return a.size.get_size( 2605 axis=a, 2606 ref_axis=ref_axis, 2607 ref_size=ref_size, 2608 ) 2609 elif isinstance(a.size, DataDependentSize): 2610 if (t_descr.id, a.id) in ns: 2611 logger.warning( 2612 "Ignoring unexpected increment factor (n) for data dependent" 2613 + " size axis '{}' of tensor '{}'.", 2614 a.id, 2615 t_descr.id, 2616 ) 2617 return _DataDepSize(a.size.min, a.size.max) 2618 else: 2619 assert_never(a.size) 2620 2621 # first resolve all , but the `SizeReference` input sizes 2622 for t_descr in self.inputs: 2623 for a in t_descr.axes: 2624 if not isinstance(a.size, SizeReference): 2625 s = get_axis_size(a) 2626 assert not isinstance(s, _DataDepSize) 2627 inputs[t_descr.id, a.id] = s 2628 2629 # resolve all other input axis sizes 2630 for t_descr in self.inputs: 2631 for a in t_descr.axes: 2632 if isinstance(a.size, SizeReference): 2633 s = get_axis_size(a) 2634 assert not isinstance(s, _DataDepSize) 2635 inputs[t_descr.id, a.id] = s 2636 2637 # resolve all output axis sizes 2638 for t_descr in self.outputs: 2639 for a in t_descr.axes: 2640 assert not isinstance(a.size, ParameterizedSize) 2641 s = get_axis_size(a) 2642 outputs[t_descr.id, a.id] = s 2643 2644 return _AxisSizes(inputs=inputs, outputs=outputs)
Determine input and output block shape for scale factors ns of parameterized input sizes.
Arguments:
- ns: Scale factor n for each axis (keyed by (tensor_id, axis_id)) that is parameterized as size = min + n * step.
- batch_size: The desired size of the batch dimension. If given, batch_size overwrites any batch size present in max_input_shape. Default 1.
- max_input_shape: Limits the derived block shapes. Each axis for which the input size, parameterized by n, is larger than max_input_shape is set to the minimal value n_min for which this is still true. Use this for small input samples or large values of ns, or simply whenever you know the full input shape.
Returns:
Resolved axis sizes for model inputs and outputs.
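A sketch of resolving concrete axis sizes for a format 0.5 model; the tensor/axis ids and the import path for TensorId and AxisId are assumptions for illustration:

from bioimageio.spec import load_model_description
from bioimageio.spec.model.v0_5 import AxisId, TensorId  # import path assumed

model = load_model_description("rdf.yaml")  # placeholder source
# pick n=3 for a hypothetical parameterized x-axis of an input tensor "raw"
axis_sizes = model.get_axis_sizes(ns={(TensorId("raw"), AxisId("x")): 3}, batch_size=1)
print(axis_sizes.inputs)   # {(tensor_id, axis_id): int, ...}
print(axis_sizes.outputs)  # ints or data-dependent sizes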
124 def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None: 125 """We need to both initialize private attributes and call the user-defined model_post_init 126 method. 127 """ 128 init_private_attributes(self, context) 129 original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
30class NotebookDescr(GenericDescrBase, title="bioimage.io notebook specification"): 31 """Bioimage.io description of a Jupyter notebook.""" 32 33 type: Literal["notebook"] = "notebook" 34 35 id: Optional[NotebookId] = None 36 """bioimage.io-wide unique resource identifier 37 assigned by bioimage.io; version **un**specific.""" 38 39 parent: Optional[NotebookId] = None 40 """The description from which this one is derived""" 41 42 source: NotebookSource 43 """The Jupyter notebook"""
Bioimage.io description of a Jupyter notebook.
bioimage.io-wide unique resource identifier assigned by bioimage.io; version unspecific.
The description from which this one is derived
The Jupyter notebook
124 def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None: 125 """We need to both initialize private attributes and call the user-defined model_post_init 126 method. 127 """ 128 init_private_attributes(self, context) 129 original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
121def save_bioimageio_package_as_folder( 122 source: Union[BioimageioYamlSource, ResourceDescr], 123 /, 124 *, 125 output_path: Union[NewPath, DirectoryPath, None] = None, 126 weights_priority_order: Optional[ # model only 127 Sequence[ 128 Literal[ 129 "keras_hdf5", 130 "onnx", 131 "pytorch_state_dict", 132 "tensorflow_js", 133 "tensorflow_saved_model_bundle", 134 "torchscript", 135 ] 136 ] 137 ] = None, 138) -> DirectoryPath: 139 """Write the content of a bioimage.io resource package to a folder. 140 141 Args: 142 source: bioimageio resource description 143 output_path: file path to write package to 144 weights_priority_order: If given only the first weights format present in the model is included. 145 If none of the prioritized weights formats is found all are included. 146 147 Returns: 148 directory path to bioimageio package folder 149 """ 150 package_content = _prepare_resource_package( 151 source, 152 weights_priority_order=weights_priority_order, 153 ) 154 if output_path is None: 155 output_path = Path(mkdtemp()) 156 else: 157 output_path = Path(output_path) 158 159 output_path.mkdir(exist_ok=True, parents=True) 160 for name, src in package_content.items(): 161 if isinstance(src, collections.abc.Mapping): 162 write_yaml(cast(YamlValue, src), output_path / name) 163 elif isinstance(src, ZipPath): 164 extracted = Path(src.root.extract(src.name, output_path)) 165 if extracted.name != src.name: 166 try: 167 shutil.move(str(extracted), output_path / src.name) 168 except Exception as e: 169 raise RuntimeError( 170 f"Failed to rename extracted file '{extracted.name}'" 171 + f" to '{src.name}'." 172 + f" (extracted from '{src.name}' in '{src.root.filename}')" 173 ) from e 174 else: 175 shutil.copy(src, output_path / name) 176 177 return output_path
Write the content of a bioimage.io resource package to a folder.
Arguments:
- source: bioimageio resource description
- output_path: file path to write package to
- weights_priority_order: If given, only the first weights format present in the model is included. If none of the prioritized weights formats is found, all are included.
Returns:
directory path to bioimageio package folder
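For example (paths are placeholders):

from pathlib import Path
from bioimageio.spec import load_description, save_bioimageio_package_as_folder

rd = load_description("rdf.yaml")  # placeholder source
folder = save_bioimageio_package_as_folder(rd, output_path=Path("unzipped_package"))
print(folder)  # folder with the bioimageio yaml file and all referenced files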
241def save_bioimageio_package_to_stream( 242 source: Union[BioimageioYamlSource, ResourceDescr], 243 /, 244 *, 245 compression: int = ZIP_DEFLATED, 246 compression_level: int = 1, 247 output_stream: Union[IO[bytes], None] = None, 248 weights_priority_order: Optional[ # model only 249 Sequence[ 250 Literal[ 251 "keras_hdf5", 252 "onnx", 253 "pytorch_state_dict", 254 "tensorflow_js", 255 "tensorflow_saved_model_bundle", 256 "torchscript", 257 ] 258 ] 259 ] = None, 260) -> IO[bytes]: 261 """Package a bioimageio resource into a stream. 262 263 Args: 264 rd: bioimageio resource description 265 compression: The numeric constant of compression method. 266 compression_level: Compression level to use when writing files to the archive. 267 See https://docs.python.org/3/library/zipfile.html#zipfile.ZipFile 268 output_stream: stream to write package to 269 weights_priority_order: If given only the first weights format present in the model is included. 270 If none of the prioritized weights formats is found all are included. 271 272 Note: this function bypasses safety checks and does not load/validate the model after writing. 273 274 Returns: 275 stream of zipped bioimageio package 276 """ 277 if output_stream is None: 278 output_stream = BytesIO() 279 280 package_content = _prepare_resource_package( 281 source, 282 weights_priority_order=weights_priority_order, 283 ) 284 285 write_zip( 286 output_stream, 287 package_content, 288 compression=compression, 289 compression_level=compression_level, 290 ) 291 292 return output_stream
Package a bioimageio resource into a stream.
Arguments:
- source: bioimageio resource description
- compression: The numeric constant of the compression method.
- compression_level: Compression level to use when writing files to the archive. See https://docs.python.org/3/library/zipfile.html#zipfile.ZipFile
- output_stream: stream to write package to
- weights_priority_order: If given, only the first weights format present in the model is included. If none of the prioritized weights formats is found, all are included.
Note: this function bypasses safety checks and does not load/validate the model after writing.
Returns:
stream of zipped bioimageio package
180def save_bioimageio_package( 181 source: Union[BioimageioYamlSource, ResourceDescr], 182 /, 183 *, 184 compression: int = ZIP_DEFLATED, 185 compression_level: int = 1, 186 output_path: Union[NewPath, FilePath, None] = None, 187 weights_priority_order: Optional[ # model only 188 Sequence[ 189 Literal[ 190 "keras_hdf5", 191 "onnx", 192 "pytorch_state_dict", 193 "tensorflow_js", 194 "tensorflow_saved_model_bundle", 195 "torchscript", 196 ] 197 ] 198 ] = None, 199) -> FilePath: 200 """Package a bioimageio resource as a zip file. 201 202 Args: 203 rd: bioimageio resource description 204 compression: The numeric constant of compression method. 205 compression_level: Compression level to use when writing files to the archive. 206 See https://docs.python.org/3/library/zipfile.html#zipfile.ZipFile 207 output_path: file path to write package to 208 weights_priority_order: If given only the first weights format present in the model is included. 209 If none of the prioritized weights formats is found all are included. 210 211 Returns: 212 path to zipped bioimageio package 213 """ 214 package_content = _prepare_resource_package( 215 source, 216 weights_priority_order=weights_priority_order, 217 ) 218 if output_path is None: 219 output_path = Path( 220 NamedTemporaryFile(suffix=".bioimageio.zip", delete=False).name 221 ) 222 else: 223 output_path = Path(output_path) 224 225 write_zip( 226 output_path, 227 package_content, 228 compression=compression, 229 compression_level=compression_level, 230 ) 231 with validation_context_var.get().replace(warning_level=ERROR): 232 if isinstance((exported := load_description(output_path)), InvalidDescr): 233 raise ValueError( 234 f"Exported package '{output_path}' is invalid:" 235 + f" {exported.validation_summary}" 236 ) 237 238 return output_path
Package a bioimageio resource as a zip file.
Arguments:
- source: bioimageio resource description or raw bioimageio YAML source
- compression: the numeric constant identifying the ZIP compression method
- compression_level: compression level to use when writing files to the archive; see https://docs.python.org/3/library/zipfile.html#zipfile.ZipFile
- output_path: file path to write the package to; if omitted, a temporary *.bioimageio.zip file is created
- weights_priority_order: if given, only the first weights format present in the model is included; if none of the prioritized weights formats is found, all are included
Returns:
path to the zipped bioimageio package
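Similarly, a sketch of exporting to a zip file on disk (paths are hypothetical); note that, per the implementation above, the exported package is re-loaded and a ValueError is raised if it turns out to be invalid:

from pathlib import Path
from bioimageio.spec import load_description, save_bioimageio_package

rd = load_description("my_model/rdf.yaml")  # hypothetical local resource description
zip_path = save_bioimageio_package(rd, output_path=Path("my_model.bioimageio.zip"))
print(zip_path)  # path to the written *.bioimageio.zip package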
118def save_bioimageio_yaml_only( 119 rd: Union[ResourceDescr, BioimageioYamlContent, InvalidDescr], 120 /, 121 file: Union[NewPath, FilePath, TextIO], 122): 123 """write the metadata of a resource description (`rd`) to `file` 124 without writing any of the referenced files in it. 125 126 Note: To save a resource description with its associated files as a package, 127 use `save_bioimageio_package` or `save_bioimageio_package_as_folder`. 128 """ 129 if isinstance(rd, ResourceDescrBase): 130 content = dump_description(rd) 131 else: 132 content = rd 133 134 write_yaml(cast(YamlValue, content), file)
write the metadata of a resource description (`rd`) to `file` without writing any of the referenced files in it.
Note: To save a resource description with its associated files as a package, use `save_bioimageio_package` or `save_bioimageio_package_as_folder`.
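A minimal sketch (the input and output file names are hypothetical):

from bioimageio.spec import load_description, save_bioimageio_yaml_only

rd = load_description("my_model/rdf.yaml")           # hypothetical local resource description
save_bioimageio_yaml_only(rd, file="rdf_only.yaml")  # writes the metadata YAML only, no referenced files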
161def validate_format( 162 data: BioimageioYamlContent, 163 /, 164 *, 165 format_version: Union[Literal["discover", "latest"], str] = DISCOVER, 166 context: Optional[ValidationContext] = None, 167) -> ValidationSummary: 168 """validate a bioimageio.yaml file (RDF)""" 169 with context or validation_context_var.get(): 170 rd = build_description(data, format_version=format_version) 171 172 assert rd.validation_summary is not None 173 return rd.validation_summary
validate a bioimageio.yaml file (RDF)
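For example, validating content that was already loaded with a YAML parser (using PyYAML here is an assumption; any loader yielding plain Python types should work):

import yaml  # PyYAML, used here only to obtain the raw rdf.yaml content
from bioimageio.spec import validate_format

with open("rdf.yaml") as f:     # hypothetical path to a bioimageio.yaml / rdf.yaml
    content = yaml.safe_load(f)

summary = validate_format(content)
print(summary.status)           # "passed" or "failed"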
19@dataclass(frozen=True) 20class ValidationContext: 21 _context_tokens: "List[Token[ValidationContext]]" = field( 22 init=False, default_factory=list 23 ) 24 25 root: Union[RootHttpUrl, AbsoluteDirectory, ZipFile] = Path() 26 """url/directory serving as base to resolve any relative file paths""" 27 28 warning_level: WarningLevel = 50 29 """raise warnings of severity `s` as validation errors if `s >= warning_level`""" 30 31 log_warnings: bool = settings.log_warnings 32 """if `True` log warnings that are not raised to the console""" 33 34 file_name: Optional[FileName] = None 35 """file name of the bioimageio Yaml file""" 36 37 perform_io_checks: bool = settings.perform_io_checks 38 """wether or not to perform validation that requires file io, 39 e.g. downloading a remote files. 40 41 Existence of local absolute file paths is still being checked.""" 42 43 known_files: Dict[str, Sha256] = field(default_factory=dict) 44 """allows to bypass download and hashing of referenced files""" 45 46 def replace( 47 self, 48 root: Optional[Union[RootHttpUrl, DirectoryPath, ZipFile]] = None, 49 warning_level: Optional[WarningLevel] = None, 50 log_warnings: Optional[bool] = None, 51 file_name: Optional[str] = None, 52 perform_io_checks: Optional[bool] = None, 53 known_files: Optional[Dict[str, Sha256]] = None, 54 ) -> "ValidationContext": 55 if known_files is None and root is not None and self.root != root: 56 # reset known files if root changes, but no new known_files are given 57 known_files = {} 58 59 return ValidationContext( 60 root=self.root if root is None else root, 61 warning_level=( 62 self.warning_level if warning_level is None else warning_level 63 ), 64 log_warnings=self.log_warnings if log_warnings is None else log_warnings, 65 file_name=self.file_name if file_name is None else file_name, 66 perform_io_checks=( 67 self.perform_io_checks 68 if perform_io_checks is None 69 else perform_io_checks 70 ), 71 known_files=self.known_files if known_files is None else known_files, 72 ) 73 74 def __enter__(self): 75 self._context_tokens.append(validation_context_var.set(self)) 76 return self 77 78 def __exit__(self, type, value, traceback): # type: ignore 79 validation_context_var.reset(self._context_tokens.pop(-1)) 80 81 @property 82 def source_name(self) -> str: 83 if self.file_name is None: 84 return "in-memory" 85 else: 86 try: 87 if isinstance(self.root, Path): 88 source = (self.root / self.file_name).absolute() 89 else: 90 parsed = urlsplit(str(self.root)) 91 path = list(parsed.path.strip("/").split("/")) + [self.file_name] 92 source = urlunsplit( 93 ( 94 parsed.scheme, 95 parsed.netloc, 96 "/".join(path), 97 parsed.query, 98 parsed.fragment, 99 ) 100 ) 101 except ValueError: 102 return self.file_name 103 else: 104 return str(source)
url/directory serving as base to resolve any relative file paths
raise warnings of severity `s` as validation errors if `s >= warning_level`
whether or not to perform validation that requires file I/O, e.g. downloading remote files.
Existence of local absolute file paths is still checked.
allows bypassing download and hashing of referenced files
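To illustrate, a sketch of validating without remote file I/O by entering a ValidationContext (the rdf.yaml path is a hypothetical example, and the active context is assumed to be picked up by load_description):

from bioimageio.spec import ValidationContext, load_description

with ValidationContext(perform_io_checks=False):  # skip downloads and hashing of referenced files
    rd = load_description("my_model/rdf.yaml")    # hypothetical local resource description

print(rd.validation_summary.status)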
46 def replace( 47 self, 48 root: Optional[Union[RootHttpUrl, DirectoryPath, ZipFile]] = None, 49 warning_level: Optional[WarningLevel] = None, 50 log_warnings: Optional[bool] = None, 51 file_name: Optional[str] = None, 52 perform_io_checks: Optional[bool] = None, 53 known_files: Optional[Dict[str, Sha256]] = None, 54 ) -> "ValidationContext": 55 if known_files is None and root is not None and self.root != root: 56 # reset known files if root changes, but no new known_files are given 57 known_files = {} 58 59 return ValidationContext( 60 root=self.root if root is None else root, 61 warning_level=( 62 self.warning_level if warning_level is None else warning_level 63 ), 64 log_warnings=self.log_warnings if log_warnings is None else log_warnings, 65 file_name=self.file_name if file_name is None else file_name, 66 perform_io_checks=( 67 self.perform_io_checks 68 if perform_io_checks is None 69 else perform_io_checks 70 ), 71 known_files=self.known_files if known_files is None else known_files, 72 )
81 @property 82 def source_name(self) -> str: 83 if self.file_name is None: 84 return "in-memory" 85 else: 86 try: 87 if isinstance(self.root, Path): 88 source = (self.root / self.file_name).absolute() 89 else: 90 parsed = urlsplit(str(self.root)) 91 path = list(parsed.path.strip("/").split("/")) + [self.file_name] 92 source = urlunsplit( 93 ( 94 parsed.scheme, 95 parsed.netloc, 96 "/".join(path), 97 parsed.query, 98 parsed.fragment, 99 ) 100 ) 101 except ValueError: 102 return self.file_name 103 else: 104 return str(source)
239class ValidationSummary(BaseModel, extra="allow"): 240 """Summarizes output of all bioimageio validations and tests 241 for one specific `ResourceDescr` instance.""" 242 243 name: str 244 source_name: str 245 type: str 246 format_version: str 247 status: Literal["passed", "failed"] 248 details: List[ValidationDetail] 249 env: Set[InstalledPackage] = Field( 250 default_factory=lambda: { 251 InstalledPackage(name="bioimageio.spec", version=VERSION) 252 } 253 ) 254 """list of selected, relevant package versions""" 255 256 conda_list: Optional[Sequence[InstalledPackage]] = None 257 """parsed output of conda list""" 258 259 @property 260 def status_icon(self): 261 if self.status == "passed": 262 return "✔️" 263 else: 264 return "❌" 265 266 @property 267 def errors(self) -> List[ErrorEntry]: 268 return list(chain.from_iterable(d.errors for d in self.details)) 269 270 @property 271 def warnings(self) -> List[WarningEntry]: 272 return list(chain.from_iterable(d.warnings for d in self.details)) 273 274 def __str__(self): 275 return f"{self.__class__.__name__}:\n" + self.format() 276 277 @staticmethod 278 def _format_md_table(rows: List[List[str]]) -> str: 279 """format `rows` as markdown table""" 280 n_cols = len(rows[0]) 281 assert all(len(row) == n_cols for row in rows) 282 col_widths = [max(max(len(row[i]) for row in rows), 3) for i in range(n_cols)] 283 284 # fix new lines in table cell 285 rows = [[line.replace("\n", "<br>") for line in r] for r in rows] 286 287 lines = [" | ".join(rows[0][i].center(col_widths[i]) for i in range(n_cols))] 288 lines.append(" | ".join("---".center(col_widths[i]) for i in range(n_cols))) 289 lines.extend( 290 [ 291 " | ".join(row[i].ljust(col_widths[i]) for i in range(n_cols)) 292 for row in rows[1:] 293 ] 294 ) 295 return "\n| " + " |\n| ".join(lines) + " |\n" 296 297 def format( 298 self, 299 hide_tracebacks: bool = False, 300 hide_source: bool = False, 301 hide_env: bool = False, 302 root_loc: Loc = (), 303 ) -> str: 304 """Format summary as Markdown string 305 306 Suitable to embed in HTML using '<br>' instead of '\n'. 
307 """ 308 info = self._format_md_table( 309 [[self.status_icon, f"{self.name.strip('.').strip()} {self.status}"]] 310 + ([] if hide_source else [["source", self.source_name]]) 311 + [ 312 ["format version", f"{self.type} {self.format_version}"], 313 ] 314 + ([] if hide_env else [[e.name, e.version] for e in self.env]) 315 ) 316 317 def format_loc(loc: Loc): 318 return "`" + (".".join(map(str, root_loc + loc)) or ".") + "`" 319 320 details = [["❓", "location", "detail"]] 321 for d in self.details: 322 details.append([d.status_icon, format_loc(d.loc), d.name]) 323 if d.context is not None: 324 details.append( 325 [ 326 "🔍", 327 "context.perform_io_checks", 328 str(d.context["perform_io_checks"]), 329 ] 330 ) 331 if d.context["perform_io_checks"]: 332 details.append(["🔍", "context.root", d.context["root"]]) 333 for kfn, sha in d.context["known_files"].items(): 334 details.append(["🔍", f"context.known_files.{kfn}", sha]) 335 336 details.append( 337 ["🔍", "context.warning_level", d.context["warning_level"]] 338 ) 339 340 if d.recommended_env is not None: 341 rec_env = StringIO() 342 json_env = d.recommended_env.model_dump( 343 mode="json", exclude_defaults=True 344 ) 345 assert is_yaml_value(json_env) 346 write_yaml(json_env, rec_env) 347 rec_env_code = rec_env.getvalue().replace("\n", "</code><br><code>") 348 details.append( 349 [ 350 "🐍", 351 format_loc(d.loc), 352 f"recommended conda env ({d.name})<br>" 353 + f"<pre><code>{rec_env_code}</code></pre>", 354 ] 355 ) 356 357 if d.conda_compare: 358 details.append( 359 [ 360 "🐍", 361 format_loc(d.loc), 362 "conda compare ({d.name}):<br>" 363 + d.conda_compare.replace("\n", "<br>"), 364 ] 365 ) 366 367 for entry in d.errors: 368 details.append( 369 [ 370 "❌", 371 format_loc(entry.loc), 372 entry.msg.replace("\n\n", "<br>").replace("\n", "<br>"), 373 ] 374 ) 375 if hide_tracebacks: 376 continue 377 378 formatted_tb_lines: List[str] = [] 379 for tb in entry.traceback: 380 if not (tb_stripped := tb.strip()): 381 continue 382 383 first_tb_line, *tb_lines = tb_stripped.split("\n") 384 if ( 385 first_tb_line.startswith('File "') 386 and '", line' in first_tb_line 387 ): 388 path, where = first_tb_line[len('File "') :].split('", line') 389 try: 390 p = Path(path) 391 except Exception: 392 file_name = path 393 else: 394 path = p.as_posix() 395 file_name = p.name 396 397 where = ", line" + where 398 first_tb_line = f'[{file_name}]({file_name} "{path}"){where}' 399 400 if tb_lines: 401 tb_rest = "<br>`" + "`<br>`".join(tb_lines) + "`" 402 else: 403 tb_rest = "" 404 405 formatted_tb_lines.append(first_tb_line + tb_rest) 406 407 details.append(["", "", "<br>".join(formatted_tb_lines)]) 408 409 for entry in d.warnings: 410 details.append(["⚠", format_loc(entry.loc), entry.msg]) 411 412 return f"{info}{self._format_md_table(details)}" 413 414 # TODO: fix bug which casuses extensive white space between the info table and details table 415 @no_type_check 416 def display(self) -> None: 417 formatted = self.format() 418 try: 419 from IPython.core.getipython import get_ipython 420 from IPython.display import Markdown, display 421 except ImportError: 422 pass 423 else: 424 if get_ipython() is not None: 425 _ = display(Markdown(formatted)) 426 return 427 428 rich_markdown = rich.markdown.Markdown(formatted) 429 console = rich.console.Console() 430 console.print(rich_markdown) 431 432 def add_detail(self, detail: ValidationDetail): 433 if detail.status == "failed": 434 self.status = "failed" 435 elif detail.status != "passed": 436 assert_never(detail.status) 437 438 
self.details.append(detail) 439 440 @field_validator("env", mode="before") 441 def _convert_dict(cls, value: List[Union[List[str], Dict[str, str]]]): 442 """convert old env value for backwards compatibility""" 443 if isinstance(value, list): 444 return [ 445 ( 446 (v["name"], v["version"], v.get("build", ""), v.get("channel", "")) 447 if isinstance(v, dict) and "name" in v and "version" in v 448 else v 449 ) 450 for v in value 451 ] 452 else: 453 return value
Summarizes output of all bioimageio validations and tests for one specific `ResourceDescr` instance.
parsed output of conda list
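For instance, a sketch of inspecting a summary returned by validate_format (content is assumed to be a previously loaded rdf.yaml dict, as in the example above):

from bioimageio.spec import validate_format

summary = validate_format(content)  # content: rdf.yaml content loaded with a YAML parser
if summary.status == "failed":
    for error in summary.errors:    # error entries collected from all validation details
        print(error.loc, error.msg)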
297 def format( 298 self, 299 hide_tracebacks: bool = False, 300 hide_source: bool = False, 301 hide_env: bool = False, 302 root_loc: Loc = (), 303 ) -> str: 304 """Format summary as Markdown string 305 306 Suitable to embed in HTML using '<br>' instead of '\n'. 307 """ 308 info = self._format_md_table( 309 [[self.status_icon, f"{self.name.strip('.').strip()} {self.status}"]] 310 + ([] if hide_source else [["source", self.source_name]]) 311 + [ 312 ["format version", f"{self.type} {self.format_version}"], 313 ] 314 + ([] if hide_env else [[e.name, e.version] for e in self.env]) 315 ) 316 317 def format_loc(loc: Loc): 318 return "`" + (".".join(map(str, root_loc + loc)) or ".") + "`" 319 320 details = [["❓", "location", "detail"]] 321 for d in self.details: 322 details.append([d.status_icon, format_loc(d.loc), d.name]) 323 if d.context is not None: 324 details.append( 325 [ 326 "🔍", 327 "context.perform_io_checks", 328 str(d.context["perform_io_checks"]), 329 ] 330 ) 331 if d.context["perform_io_checks"]: 332 details.append(["🔍", "context.root", d.context["root"]]) 333 for kfn, sha in d.context["known_files"].items(): 334 details.append(["🔍", f"context.known_files.{kfn}", sha]) 335 336 details.append( 337 ["🔍", "context.warning_level", d.context["warning_level"]] 338 ) 339 340 if d.recommended_env is not None: 341 rec_env = StringIO() 342 json_env = d.recommended_env.model_dump( 343 mode="json", exclude_defaults=True 344 ) 345 assert is_yaml_value(json_env) 346 write_yaml(json_env, rec_env) 347 rec_env_code = rec_env.getvalue().replace("\n", "</code><br><code>") 348 details.append( 349 [ 350 "🐍", 351 format_loc(d.loc), 352 f"recommended conda env ({d.name})<br>" 353 + f"<pre><code>{rec_env_code}</code></pre>", 354 ] 355 ) 356 357 if d.conda_compare: 358 details.append( 359 [ 360 "🐍", 361 format_loc(d.loc), 362 "conda compare ({d.name}):<br>" 363 + d.conda_compare.replace("\n", "<br>"), 364 ] 365 ) 366 367 for entry in d.errors: 368 details.append( 369 [ 370 "❌", 371 format_loc(entry.loc), 372 entry.msg.replace("\n\n", "<br>").replace("\n", "<br>"), 373 ] 374 ) 375 if hide_tracebacks: 376 continue 377 378 formatted_tb_lines: List[str] = [] 379 for tb in entry.traceback: 380 if not (tb_stripped := tb.strip()): 381 continue 382 383 first_tb_line, *tb_lines = tb_stripped.split("\n") 384 if ( 385 first_tb_line.startswith('File "') 386 and '", line' in first_tb_line 387 ): 388 path, where = first_tb_line[len('File "') :].split('", line') 389 try: 390 p = Path(path) 391 except Exception: 392 file_name = path 393 else: 394 path = p.as_posix() 395 file_name = p.name 396 397 where = ", line" + where 398 first_tb_line = f'[{file_name}]({file_name} "{path}"){where}' 399 400 if tb_lines: 401 tb_rest = "<br>`" + "`<br>`".join(tb_lines) + "`" 402 else: 403 tb_rest = "" 404 405 formatted_tb_lines.append(first_tb_line + tb_rest) 406 407 details.append(["", "", "<br>".join(formatted_tb_lines)]) 408 409 for entry in d.warnings: 410 details.append(["⚠", format_loc(entry.loc), entry.msg]) 411 412 return f"{info}{self._format_md_table(details)}"
Format summary as Markdown string
Suitable to embed in HTML using '<br>' instead of '\n'.
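As a sketch, rendering the report for human consumption (summary as obtained in the earlier example):

md = summary.format(hide_tracebacks=True, hide_env=True)  # Markdown without tracebacks or package table
print(md)

summary.display()  # renders via IPython in notebooks, otherwise prints via rich in the terminal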
415 @no_type_check 416 def display(self) -> None: 417 formatted = self.format() 418 try: 419 from IPython.core.getipython import get_ipython 420 from IPython.display import Markdown, display 421 except ImportError: 422 pass 423 else: 424 if get_ipython() is not None: 425 _ = display(Markdown(formatted)) 426 return 427 428 rich_markdown = rich.markdown.Markdown(formatted) 429 console = rich.console.Console() 430 console.print(rich_markdown)