Coverage for bioimageio/spec/_internal/url.py: 96%

77 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-12 17:44 +0000

1from contextlib import nullcontext 

2from typing import Any, ClassVar, Optional, Type, Union 

3 

4import httpx 

5import pydantic 

6from loguru import logger 

7from pydantic import RootModel 

8from typing_extensions import Literal, assert_never 

9 

10from . import warning_levels 

11from .field_warning import issue_warning 

12from .root_url import RootHttpUrl 

13from .validation_context import get_validation_context 

14 

15 

def _validate_url(url: Union[str, pydantic.HttpUrl]) -> pydantic.HttpUrl:
    """Validate *url*, starting with a HEAD request.

    Thin entry point around `_validate_url_impl` that always begins in
    ``"head"`` request mode and uses that function's default timeout.

    Args:
        url: The URL to check, as a string or an already parsed
            `pydantic.HttpUrl`.

    Returns:
        Whatever `_validate_url_impl` returns for this URL.
    """
    return _validate_url_impl(url, request_mode="head")

18 

19 

20_KNOWN_VALID_URLS = ("https://zenodo.org/records/3446812/files/unet2d_weights.torch",) 

21"""known valid urls to bypass validation for to avoid sporadic 503 errors in tests etc.""" 

22 

23 

def _validate_url_impl(
    url: Union[str, pydantic.HttpUrl],
    request_mode: Literal["head", "get_stream", "get"],
    timeout: int = 3,
) -> pydantic.HttpUrl:
    """Check that *url* is reachable and return it as a `pydantic.HttpUrl`.

    URLs already present in the validation context's ``known_files``, URLs
    starting with ``http(s)://example.com`` and URLs listed in
    `_KNOWN_VALID_URLS` are returned without issuing a request.

    Otherwise a request is sent according to *request_mode*. If the response
    status is not one of 200, 301, 302, 303 or 308, the function retries
    itself with the next mode (``"head"`` -> ``"get_stream"`` -> ``"get"``);
    only the final ``"get"`` attempt reports the unexpected status through
    `issue_warning`.

    Args:
        url: The URL to check.
        request_mode: ``"head"`` or ``"get"`` send a plain request of that
            method; ``"get_stream"`` opens a streamed GET request.
        timeout: Timeout value passed on to httpx for each request.

    Returns:
        ``pydantic.HttpUrl(url)``.

    Raises:
        ValueError: If httpx raises `httpx.InvalidURL` or
            `httpx.TooManyRedirects`. Other request failures are reported
            through `issue_warning` instead of being re-raised here.
    """
    url = str(url)
    context = get_validation_context()
    if url in context.known_files:
        return pydantic.HttpUrl(url)

    if (
        url.startswith(("http://example.com", "https://example.com"))
        or url in _KNOWN_VALID_URLS
    ):
        return pydantic.HttpUrl(url)

    # `val_url` is the address actually requested; it differs from `url` only
    # for GitHub-backed Colab notebooks.
    val_url = url
    colab_github_prefix = "https://colab.research.google.com/github/"
    if url.startswith(colab_github_prefix):
        # GET requests for colab return 200 even if the source notebook does
        # not exist. We therefore validate the url to the notebook instead
        # (for github notebooks).
        # Only the leading prefix is swapped; the rest of the URL is kept
        # verbatim even if it happens to contain the prefix text again.
        val_url = "https://github.com/" + url[len(colab_github_prefix) :]
    elif url.startswith("https://colab.research.google.com/"):
        # TODO: improve validation of non-github colab urls
        issue_warning(
            "colab urls currently pass even if the notebook url was not found. Cannot fully validate {value}",
            value=url,
            severity=warning_levels.INFO,
        )

    try:
        if request_mode in ("head", "get"):
            # `httpx.request` returns the response directly; wrap it so both
            # branches can be consumed by the same `with` statement below.
            request_ctxt = nullcontext(
                httpx.request(
                    request_mode.upper(),
                    val_url,
                    timeout=timeout,
                    follow_redirects=True,
                )
            )
        elif request_mode == "get_stream":
            request_ctxt = httpx.stream(
                "GET", val_url, timeout=timeout, follow_redirects=True
            )
        else:
            assert_never(request_mode)

        with request_ctxt as r:
            status_code = r.status_code
            reason = r.reason_phrase
            location = r.headers.get("location")

    except (
        httpx.InvalidURL,
        httpx.TooManyRedirects,
    ) as e:
        # Chain the httpx error so the original cause stays in the traceback.
        raise ValueError(f"Invalid URL '{url}': {e}") from e
    except httpx.RequestError as e:
        issue_warning(
            "Failed to validate URL '{value}': {error}\nrequest: {request}",
            value=url,
            msg_context={"error": str(e), "request": e.request},
        )
    except Exception as e:
        issue_warning(
            "Failed to validate URL '{value}': {error}",
            value=url,
            msg_context={"error": str(e)},
        )
    else:
        if status_code == 200:  # ok
            pass
        elif status_code in (302, 303):  # found
            pass
        elif status_code in (301, 308):
            issue_warning(
                "URL redirected ({status_code}): consider updating {value} with new"
                + " location: {location}",
                value=url,
                severity=warning_levels.INFO,
                msg_context={
                    "status_code": status_code,
                    "location": location,
                },
            )
        elif request_mode == "head":
            # Unexpected status for HEAD: retry with a streamed GET.
            return _validate_url_impl(url, request_mode="get_stream", timeout=timeout)
        elif request_mode == "get_stream":
            # Unexpected status for the streamed GET: retry with a plain GET.
            return _validate_url_impl(url, request_mode="get", timeout=timeout)
        elif request_mode == "get":
            # Last attempt: report the status instead of retrying again.
            issue_warning(
                "{status_code}: {reason} ({value})",
                value=url,
                severity=(
                    warning_levels.INFO
                    if status_code == 405  # may be returned due to a captcha
                    else warning_levels.WARNING
                ),
                msg_context={
                    "status_code": status_code,
                    "reason": reason,
                },
            )
        else:
            assert_never(request_mode)

    # Reached whenever no exception propagated (including the warning-only
    # paths above): remember the URL so later checks in this context return
    # early.
    context.known_files[url] = None
    return pydantic.HttpUrl(url)

136 

137 

class HttpUrl(RootHttpUrl):
    """A URL with the HTTP or HTTPS scheme."""

    root_model: ClassVar[Type[RootModel[Any]]] = RootModel[pydantic.HttpUrl]
    # Cached result of `exists()`; stays None until the first check has run.
    _exists: Optional[bool] = None

    def _after_validator(self):
        """Run the parent's after-validator, then optionally probe the URL.

        The URL is probed via `exists()` only when the current validation
        context has ``perform_io_checks`` enabled; the probe's result is
        cached on the instance and otherwise ignored here.

        Returns:
            The instance returned by the parent's ``_after_validator``.
        """
        self = super()._after_validator()
        context = get_validation_context()
        if context.perform_io_checks:
            _ = self.exists()

        return self

    def exists(self) -> bool:
        """True if URL is available

        The check runs at most once per instance; its outcome is stored in
        ``_exists`` and returned directly on later calls. Any exception
        raised during the check marks the URL as not existing rather than
        propagating to the caller.
        """
        if self._exists is None:
            ctxt = get_validation_context()
            try:
                # NOTE(review): presumably, with warning_level=WARNING,
                # warnings of WARNING severity raise inside
                # `_validate_url` and thus count as "does not exist" --
                # confirm against `issue_warning`.
                with ctxt.replace(warning_level=warning_levels.WARNING):
                    self._validated = _validate_url(self._validated)
            except Exception as e:
                if ctxt.log_warnings:
                    logger.info(e)

                self._exists = False
            else:
                self._exists = True

        return self._exists