Coverage for src/backoffice/index.py: 0%
114 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-12 10:26 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-12 10:26 +0000
1"""Data models and functions for indexing the bioimage.io collection"""
3import hashlib
4import json
5import shutil
6from collections import defaultdict
7from datetime import datetime
8from pathlib import Path
9from typing import Optional, Sequence
11try:
12 import httpx
13 from loguru import logger
14 from pydantic import BaseModel, Field
16 from backoffice._settings import settings
17 from backoffice.compatibility import InitialSummary
18 from backoffice.utils import (
19 get_summary,
20 get_summary_file_path,
21 yaml,
22 )
23except ImportError as e:
24 raise ImportError(
25 "Missing dependencies. "
26 "Please install `backoffice[dev]` to use backoffice.index."
27 ) from e
30from .utils_pure import get_report_path
class Node(BaseModel, frozen=True, extra="ignore"):
    """Common base class for the models defined in this module.

    Instances are immutable (``frozen=True``) and input keys that are not
    declared as fields are dropped during validation (``extra="ignore"``).
    """
class ResponseItemVersion(Node, frozen=True):
    """A single version entry of a `ResponseItem`."""

    # Version identifier; used as the `version` query parameter when the
    # rdf.yaml of this version is requested.
    version: str
    # No default is given, so the key must be present; its value may be None.
    comment: Optional[str]
    created_at: datetime
class IndexItemVersion(Node, frozen=True):
    """A single version entry of an `IndexItem`.

    Carries the same fields as `ResponseItemVersion` plus the location and
    hash of the version's rdf.yaml.
    """

    version: str
    # No default is given, so the key must be present; its value may be None.
    comment: Optional[str]
    created_at: datetime
    # URL the rdf.yaml of this version is downloaded from.
    source: str
    # Hex-encoded SHA-256 digest of the downloaded rdf.yaml bytes.
    sha256: str
class ResponseItem(Node, frozen=True):
    """One collection item as returned by the list endpoint."""

    # `create_index` splits this on the first "/" into a domain and the
    # remaining item id, so a "/" is expected to be present.
    id: str
    versions: Sequence[ResponseItemVersion]
    # Free-form type label; `create_index` counts items per distinct value.
    type: str
class IndexItem(Node, frozen=True):
    """One collection item as stored in the `Index`.

    Mirrors `ResponseItem`, but every version additionally records the
    source URL and SHA-256 of its rdf.yaml (see `IndexItemVersion`).
    """

    id: str
    versions: Sequence[IndexItemVersion]
    type: str
class Response(Node, frozen=True):
    """Response from Hypha list endpoint"""

    # Items contained in this page of the listing.
    items: list[ResponseItem]
    # Compared against the number of items collected so far to decide
    # whether another page has to be requested.
    total: int
    # presumably echo the pagination parameters of the request -- TODO confirm
    offset: int
    limit: int
class Index(Node, frozen=True):
    """Index of the collection as written to / read from ``index.json``."""

    items: list[IndexItem]
    # Number of entries in `items` (set to `len(items)` by `create_index`).
    total: int
    # Maps each item `type` to the number of items of that type.
    count_per_type: dict[str, int]
    # Defaults to the creation time of the instance; `datetime.now` is called
    # without a timezone, so the default value is a naive local timestamp.
    timestamp: datetime = Field(default_factory=datetime.now)
def load_index(path: Path = Path("index.json")) -> Index:
    """Read and validate an `Index` from a JSON file.

    Args:
        path: Location of the index file; defaults to ``index.json``
            relative to the current working directory.

    Returns:
        The `Index` parsed from the UTF-8 encoded content of `path`.
    """
    logger.info("loading index from {}", path)
    raw_json = path.read_text(encoding="utf-8")
    return Index.model_validate_json(raw_json)
def _list_collection_items(
    page_limit: int = 10000, max_pages: int = 100
) -> list[ResponseItem]:
    """Fetch the items of the bioimage.io collection from the Hypha list endpoint.

    Pages are requested one after another, each starting at the number of
    items received so far, until the reported total is reached.

    Args:
        page_limit: Value sent as the `limit` query parameter of each request.
        max_pages: Upper bound on the number of requests.

    Returns:
        All items received. The list may be incomplete if the endpoint
        returned an empty page early or `max_pages` was exhausted; both
        situations are logged as errors.

    Raises:
        httpx.HTTPStatusError: If a request returns an error status.
    """
    list_url = f"{settings.hypha_base_url}/public/services/artifact-manager/list"

    def request(offset: int) -> Response:
        r = httpx.get(
            list_url,
            params=dict(
                parent_id="bioimage-io/bioimage.io",
                offset=offset,
                pagination=True,
                limit=page_limit,
            ),
            headers=settings.get_hypha_headers(),
            timeout=settings.http_timeout,
        )
        try:
            _ = r.raise_for_status()
        except httpx.HTTPStatusError:
            # Log the raw body: an error payload is not guaranteed to be JSON,
            # and a decoding failure here would hide the HTTP error itself.
            logger.error(
                "request to {} failed with status {}: {}",
                list_url,
                r.status_code,
                r.text,
            )
            raise

        return Response.model_validate_json(r.content)

    items: list[ResponseItem] = []
    for page in range(max_pages):
        response = request(len(items))
        logger.info("Page {}: {} entries", page, len(response.items))
        items.extend(response.items)
        if response.total <= len(items):
            if response.total != len(items):
                logger.error(
                    "response.total {} != len(items) {}", response.total, len(items)
                )
            break

        if not response.items:
            # The offset cannot advance any further; asking again would only
            # repeat the very same request.
            logger.error(
                "Page {} is empty although only {} of {} items were received",
                page,
                len(items),
                response.total,
            )
            break
    else:
        logger.error(
            "Stopped after {} pages; the {} items received may be incomplete",
            max_pages,
            len(items),
        )

    return items


def create_index() -> Index:
    """Index the bioimage.io collection

    If ``index.json`` exists in the current working directory it is loaded
    and returned. Otherwise the collection is listed, the report directory
    of every item version is initialized, and the resulting index is written
    to ``index.json``.

    Returns:
        The loaded or newly created `Index`.

    Raises:
        httpx.HTTPStatusError: If listing the collection or downloading an
            rdf.yaml returns an error status.
        ValueError: If an item id does not contain a "/".
    """
    index_path = Path("index.json")
    if index_path.exists():
        index = load_index(index_path)
    else:
        index_items: list[IndexItem] = []
        for item in _list_collection_items():
            domain, item_id_wo_domain = item.id.split("/", 1)
            versions: list[IndexItemVersion] = []
            for v in item.versions:
                rdf_url = (
                    f"{settings.hypha_base_url}/{domain}/artifacts/"
                    f"{item_id_wo_domain}/files/rdf.yaml?version={v.version}"
                )
                sha256 = _initialize_report_directory(item, v, rdf_url)
                versions.append(
                    IndexItemVersion(
                        version=v.version,
                        comment=v.comment,
                        created_at=v.created_at,
                        source=rdf_url,
                        sha256=sha256,
                    )
                )

            index_items.append(
                IndexItem(id=item.id, versions=versions, type=item.type)
            )

        count_per_type = defaultdict[str, int](int)
        for index_item in index_items:
            count_per_type[index_item.type] += 1

        index = Index(
            items=index_items,
            total=len(index_items),
            count_per_type=dict(count_per_type),
        )

        # NOTE(review): the reviewed listing had lost its indentation; saving
        # is assumed to belong to this "build" branch only -- confirm.
        # TODO: use .model_dump_json once it supports 'sort_keys' argument for a potential speed gain
        json_dict = index.model_dump(mode="json")
        with index_path.open("wt", encoding="utf-8") as f:
            json.dump(json_dict, f, indent=4, sort_keys=True, ensure_ascii=False)

        logger.info("saved index to {}", index_path)

    logger.info(
        "loaded index with {} ids and {} versions",
        len(index.items),
        sum(len(item.versions) for item in index.items),
    )
    return index
def _initialize_report_directory(
    item: ResponseItem, v: ResponseItemVersion, url: str
) -> str:
    """Initialize the report directory for an item version.

    Downloads the rdf.yaml from `url` and compares its SHA-256 with the one
    recorded in the summary obtained for this item version. On a match
    nothing is written. Otherwise the report directory is recreated and a
    fresh summary with status "untested" is stored in it.

    Args:
        item: The collection item the version belongs to.
        v: The version whose report directory is initialized.
        url: Download location of the version's rdf.yaml.

    Returns:
        Hex-encoded SHA-256 of the rdf.yaml file.

    Raises:
        httpx.HTTPStatusError: If the download returns an error status.
    """
    report_path = get_report_path(item.id, v.version)
    response = httpx.get(url, follow_redirects=True, timeout=settings.http_timeout)
    _ = response.raise_for_status()
    rdf_bytes = response.content
    sha256 = hashlib.sha256(rdf_bytes).hexdigest()

    existing_sha256 = get_summary(item.id, v.version).rdf_yaml_sha256
    if existing_sha256 == sha256:
        # The stored summary already describes exactly this rdf.yaml.
        logger.info(
            "Found existing summary for {}/{} with matching RDF SHA-256: {}",
            item.id,
            v.version,
            sha256,
        )
        return sha256

    if existing_sha256:
        logger.warning(
            "Found existing summary for {}/{} with different RDF SHA-256: {} != {}. deleting and replacing...",
            item.id,
            v.version,
            existing_sha256,
            sha256,
        )
    # NOTE(review): the reviewed listing had lost its indentation; removing
    # the directory is assumed to be independent of the warning above --
    # confirm against the original file.
    if report_path.exists():
        shutil.rmtree(report_path)

    report_path.mkdir(parents=True, exist_ok=True)
    # NOTE(review): `yaml` is imported from backoffice.utils; confirm it is
    # configured as a safe loader, since the parsed bytes are downloaded.
    fresh_summary = InitialSummary(
        rdf_content=yaml.load(rdf_bytes),
        rdf_yaml_sha256=sha256,
        status="untested",
    )
    summary_file = get_summary_file_path(item.id, v.version)
    _ = summary_file.write_text(
        fresh_summary.model_dump_json(indent=4), encoding="utf-8"
    )
    logger.info("Initialized report directory {}", report_path)
    return sha256
if __name__ == "__main__":
    # Script entry point: load or create index.json in the current working
    # directory. The returned Index is bound to `_` and not used further.
    _ = create_index()