Coverage for src/backoffice/index.py: 0%

114 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-12 10:26 +0000

1"""Data models and functions for indexing the bioimage.io collection""" 

2 

3import hashlib 

4import json 

5import shutil 

6from collections import defaultdict 

7from datetime import datetime 

8from pathlib import Path 

9from typing import Optional, Sequence 

10 

11try: 

12 import httpx 

13 from loguru import logger 

14 from pydantic import BaseModel, Field 

15 

16 from backoffice._settings import settings 

17 from backoffice.compatibility import InitialSummary 

18 from backoffice.utils import ( 

19 get_summary, 

20 get_summary_file_path, 

21 yaml, 

22 ) 

23except ImportError as e: 

24 raise ImportError( 

25 "Missing dependencies. " 

26 "Please install `backoffice[dev]` to use backoffice.index." 

27 ) from e 

28 

29 

30from .utils_pure import get_report_path 

31 

32 

# Shared base class of every model in this module.
# `frozen=True` makes instances immutable after validation and
# `extra="ignore"` drops input fields that are not declared on the model.
class Node(BaseModel, frozen=True, extra="ignore"):
    pass

35 

36 

# A single version entry of a `ResponseItem`.
class ResponseItemVersion(Node, frozen=True):
    # version identifier
    version: str
    # optional comment attached to this version; must be present but may be null
    comment: Optional[str]
    # creation time of this version
    created_at: datetime

41 

42 

# A single version entry of an `IndexItem`: the fields of a
# `ResponseItemVersion` plus where its rdf.yaml was found and the file's hash.
class IndexItemVersion(Node, frozen=True):
    # version identifier
    version: str
    # optional comment attached to this version; must be present but may be null
    comment: Optional[str]
    # creation time of this version
    created_at: datetime
    # URL the rdf.yaml of this version is available at
    source: str
    # hex encoded SHA-256 digest of the rdf.yaml content
    sha256: str

49 

50 

# One listed item together with all of its versions, as contained in a `Response`.
class ResponseItem(Node, frozen=True):
    # item identifier
    id: str
    # all versions listed for this item
    versions: Sequence[ResponseItemVersion]
    # item type; used as a grouping key
    type: str

55 

56 

# One item of the `Index` together with all of its indexed versions.
class IndexItem(Node, frozen=True):
    # item identifier
    id: str
    # all indexed versions of this item
    versions: Sequence[IndexItemVersion]
    # item type; used as a grouping key
    type: str

61 

62 

class Response(Node, frozen=True):
    """Response from Hypha list endpoint"""

    # items contained in this page of the listing
    items: list[ResponseItem]
    # total count reported by the endpoint (may exceed `len(items)` of one page)
    total: int
    # offset reported for this page
    offset: int
    # limit reported for this page
    limit: int

70 

71 

# The collection index as it is written to and read from an index JSON file.
class Index(Node, frozen=True):
    # all indexed items
    items: list[IndexItem]
    # number of indexed items
    total: int
    # number of indexed items per item type
    count_per_type: dict[str, int]
    # defaults to `datetime.now()` at instantiation when not given
    # NOTE(review): `datetime.now` without a tz yields a naive local time
    timestamp: datetime = Field(default_factory=datetime.now)

77 

78 

def load_index(path: Path = Path("index.json")) -> Index:
    """Read an index JSON file and validate it as an `Index`.

    Args:
        path: Location of the UTF-8 encoded index file.

    Returns:
        The validated index.
    """
    logger.info("loading index from {}", path)
    raw_json = path.read_text(encoding="utf-8")
    return Index.model_validate_json(raw_json)

82 

83 

def create_index() -> Index:
    """Index the bioimage.io collection

    If `index.json` exists in the current working directory it is loaded and
    returned unchanged. Otherwise all items below `bioimage-io/bioimage.io`
    are listed via the Hypha artifact manager, the report directory of every
    item version is initialized, and the new index is written to `index.json`.

    Returns:
        The loaded or newly created index.

    Raises:
        httpx.HTTPError: If a request fails or is answered with an error status.
    """

    index_path = Path("index.json")
    if index_path.exists():
        index = load_index(index_path)
    else:
        items = _fetch_response_items()

        index_items: list[IndexItem] = []
        for item in items:
            # item ids are split at the first "/" into domain and remainder
            domain, item_id_wo_domain = item.id.split("/", 1)
            versions: list[IndexItemVersion] = []
            for v in item.versions:
                rdf_url = f"{settings.hypha_base_url}/{domain}/artifacts/{item_id_wo_domain}/files/rdf.yaml?version={v.version}"
                sha256 = _initialize_report_directory(item, v, rdf_url)
                versions.append(
                    IndexItemVersion(
                        version=v.version,
                        comment=v.comment,
                        created_at=v.created_at,
                        source=rdf_url,
                        sha256=sha256,
                    )
                )
            index_items.append(IndexItem(id=item.id, versions=versions, type=item.type))

        count_per_type = defaultdict[str, int](int)
        for index_item in index_items:
            count_per_type[index_item.type] += 1

        index = Index(
            items=index_items,
            total=len(index_items),
            count_per_type=dict(count_per_type),
        )

        json_dict = index.model_dump(mode="json")
        with index_path.open("wt", encoding="utf-8") as f:
            json.dump(json_dict, f, indent=4, sort_keys=True, ensure_ascii=False)
            # TODO: use .model_dump_json once it supports 'sort_keys' argument for a potential speed gain
            # _ = index_path.write_text(index.model_dump_json(indent=4), encoding="utf-8")

        logger.info("saved index to {}", index_path)

    logger.info(
        "loaded index with {} ids and {} versions",
        len(index.items),
        sum(len(item.versions) for item in index.items),
    )
    return index


def _fetch_response_items(
    page_limit: int = 10000, max_pages: int = 100
) -> list[ResponseItem]:
    """List all items below `bioimage-io/bioimage.io`, following pagination.

    Args:
        page_limit: Value of the `limit` query parameter of each request.
        max_pages: Upper bound on the number of requests.

    Returns:
        The items of all fetched pages in listing order. The listing may be
        incomplete if the endpoint misbehaves; this is logged as an error.

    Raises:
        httpx.HTTPError: If a request fails or is answered with an error status.
    """
    list_url = f"{settings.hypha_base_url}/public/services/artifact-manager/list"

    items: list[ResponseItem] = []
    for page in range(max_pages):
        r = httpx.get(
            list_url,
            params=dict(
                parent_id="bioimage-io/bioimage.io",
                offset=len(items),
                pagination=True,
                limit=page_limit,
            ),
            headers=settings.get_hypha_headers(),
            timeout=settings.http_timeout,
        )
        try:
            _ = r.raise_for_status()
        except httpx.HTTPStatusError:
            # Log the raw body: decoding it as JSON could fail for a non-JSON
            # error page and would then hide the actual HTTP error.
            logger.error("request to {} failed: {}", list_url, r.text)
            raise

        response = Response.model_validate_json(r.content)
        logger.info("Page {}: {} entries", page, len(response.items))
        items.extend(response.items)
        if response.total <= len(items):
            if response.total != len(items):
                logger.error(
                    "response.total {} != len(items) {}", response.total, len(items)
                )
            break

        if not response.items:
            # An empty page cannot advance the offset; further requests would
            # return the same empty page.
            logger.error(
                "Page {} is empty, but only {} of {} entries were listed",
                page,
                len(items),
                response.total,
            )
            break
    else:
        logger.error(
            "Stopped listing after {} pages; {} entries were listed, more may be missing",
            max_pages,
            len(items),
        )

    return items

167 

168 

def _initialize_report_directory(
    item: ResponseItem, v: ResponseItemVersion, url: str
) -> str:
    """Make sure the report directory of an item version matches its rdf.yaml.

    The file at `url` is downloaded (following redirects) and hashed. If the
    stored summary already carries the same SHA-256 nothing is changed.
    Otherwise an existing report directory is removed, recreated, and a fresh
    summary with status "untested" is written into it.

    Args:
        item: The item the version belongs to.
        v: The item version to initialize.
        url: Download URL of the version's rdf.yaml.

    Returns:
        Hex encoded SHA-256 digest of the downloaded rdf.yaml content.
    """
    report_path = get_report_path(item.id, v.version)
    response = httpx.get(url, follow_redirects=True, timeout=settings.http_timeout)
    _ = response.raise_for_status()
    rdf_bytes = response.content
    sha256 = hashlib.sha256(rdf_bytes).hexdigest()

    previous_sha256 = get_summary(item.id, v.version).rdf_yaml_sha256
    if previous_sha256 == sha256:
        # up to date: keep the existing report directory untouched
        logger.info(
            "Found existing summary for {}/{} with matching RDF SHA-256: {}",
            item.id,
            v.version,
            sha256,
        )
        return sha256

    if previous_sha256:
        logger.warning(
            "Found existing summary for {}/{} with different RDF SHA-256: {} != {}. deleting and replacing...",
            item.id,
            v.version,
            previous_sha256,
            sha256,
        )

    # start from an empty directory so nothing of an outdated report remains
    if report_path.exists():
        shutil.rmtree(report_path)

    report_path.mkdir(parents=True, exist_ok=True)
    # NOTE(review): `yaml` comes from backoffice.utils; presumably a safe
    # loader, as it parses downloaded content here -- confirm.
    initial_summary = InitialSummary(
        rdf_content=yaml.load(rdf_bytes),
        rdf_yaml_sha256=sha256,
        status="untested",
    )
    summary_path = get_summary_file_path(item.id, v.version)
    serialized = initial_summary.model_dump_json(indent=4)
    _ = summary_path.write_text(serialized, encoding="utf-8")
    logger.info("Initialized report directory {}", report_path)
    return sha256

213 

214 

# Script entry point: load or create the index; the returned value is not used.
if __name__ == "__main__":
    _ = create_index()