Coverage for bioimageio/spec/_internal/url.py: 96%

77 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-18 12:47 +0000

1from contextlib import nullcontext 

2from typing import Any, ClassVar, Optional, Type, Union 

3 

4import httpx 

5import pydantic 

6from loguru import logger 

7from pydantic import RootModel 

8from typing_extensions import Literal, assert_never 

9 

10from . import warning_levels 

11from .field_warning import issue_warning 

12from .root_url import RootHttpUrl 

13from .validation_context import get_validation_context 

14 

15 

def _validate_url(url: Union[str, pydantic.HttpUrl]) -> pydantic.HttpUrl:
    """Validate `url`, starting with the cheapest request type.

    Delegates to `_validate_url_impl` with `request_mode="head"` and that
    function's default timeout.
    """
    checked = _validate_url_impl(url, request_mode="head")
    return checked

18 

19 

20_KNOWN_VALID_URLS = ("https://zenodo.org/records/3446812/files/unet2d_weights.torch",) 

21"""known valid urls to bypass validation for to avoid sporadic 503 errors in tests etc.""" 

22 

23 

def _validate_url_impl(
    url: Union[str, pydantic.HttpUrl],
    request_mode: Literal["head", "get_stream", "get"],
    timeout: int = 3,
) -> pydantic.HttpUrl:
    """Check that `url` can be requested and return it as `pydantic.HttpUrl`.

    No request is made if `url` is already in the validation context's
    `known_files`, starts with `http(s)://example.com`, or is listed in
    `_KNOWN_VALID_URLS`.

    Otherwise a request is sent using `request_mode`. If the response status
    is not accepted, the check is retried with the next mode
    (`"head"` -> `"get_stream"` -> `"get"`); only the final `"get"` attempt
    reports the unexpected status via `issue_warning`.

    Args:
        url: The URL to check.
        request_mode: Request type of this attempt: a HEAD request, a streamed
            GET request, or a plain GET request.
        timeout: Timeout passed on to httpx for the request.

    Returns:
        `url` wrapped in `pydantic.HttpUrl`. After a request was attempted and
        no exception propagated, `url` is also recorded in
        `context.known_files` (with value `None`).

    Raises:
        ValueError: If httpx raises `InvalidURL` or `TooManyRedirects`.

    NOTE(review): all other problems go through `issue_warning`; whether that
    raises presumably depends on the active validation context -- confirm
    against `field_warning.issue_warning`.
    """
    url = str(url)
    context = get_validation_context()
    if url in context.known_files:
        return pydantic.HttpUrl(url)

    if (
        url.startswith(("http://example.com", "https://example.com"))
        or url in _KNOWN_VALID_URLS
    ):
        return pydantic.HttpUrl(url)

    # `val_url` is the URL actually requested; it only differs from `url`
    # for colab links that point to a notebook on github.
    val_url = url
    if url.startswith("https://colab.research.google.com/github/"):
        # GET requests for colab return 200 even if the source notebook does
        # not exist. We therefore validate the URL of the notebook instead
        # (for github notebooks).
        val_url = url.replace(
            "https://colab.research.google.com/github/", "https://github.com/"
        )
    elif url.startswith("https://colab.research.google.com/"):
        # TODO: improve validation of non-github colab urls
        issue_warning(
            "colab urls currently pass even if the notebook url was not found. Cannot fully validate {value}",
            value=url,
            severity=warning_levels.INFO,
        )

    try:
        if request_mode in ("head", "get"):
            # `httpx.request` returns a finished response; wrap it so both
            # request flavours can be consumed by the same `with` statement.
            request_ctxt = nullcontext(
                httpx.request(request_mode.upper(), val_url, timeout=timeout)
            )
        elif request_mode == "get_stream":
            request_ctxt = httpx.stream("GET", val_url, timeout=timeout)
        else:
            assert_never(request_mode)

        with request_ctxt as r:
            status_code = r.status_code
            reason = r.reason_phrase
            location = r.headers.get("location")

    except (
        httpx.InvalidURL,
        httpx.TooManyRedirects,
    ) as e:
        # Keep the httpx error as the cause for easier debugging.
        raise ValueError(f"Invalid URL '{url}': {e}") from e
    except httpx.RequestError as e:
        # `e.request` raises a RuntimeError if no request was attached to the
        # exception; that must not mask the actual request error.
        try:
            failed_request = e.request
        except RuntimeError:
            failed_request = None

        issue_warning(
            "Failed to validate URL '{value}': {error}\nrequest: {request}",
            value=url,
            msg_context={"error": str(e), "request": failed_request},
        )
    except Exception as e:
        issue_warning(
            "Failed to validate URL '{value}': {error}",
            value=url,
            msg_context={"error": str(e)},
        )
    else:
        if status_code == 200:  # ok
            pass
        elif status_code in (302, 303, 307):
            # Temporary redirects: the given URL remains the one to use.
            pass
        elif status_code in (301, 308):
            # Permanent redirects: suggest updating to the new location.
            issue_warning(
                "URL redirected ({status_code}): consider updating {value} with new"
                + " location: {location}",
                value=url,
                severity=warning_levels.INFO,
                msg_context={
                    "status_code": status_code,
                    "location": location,
                },
            )
        elif request_mode == "head":
            # Unexpected status for HEAD: retry with a streamed GET.
            return _validate_url_impl(url, request_mode="get_stream", timeout=timeout)
        elif request_mode == "get_stream":
            # Unexpected status for the streamed GET: retry with a plain GET.
            return _validate_url_impl(url, request_mode="get", timeout=timeout)
        elif request_mode == "get":
            # Last attempt: report the unexpected status.
            issue_warning(
                "{status_code}: {reason} ({value})",
                value=url,
                severity=(
                    warning_levels.INFO
                    if status_code == 405  # may be returned due to a captcha
                    else warning_levels.WARNING
                ),
                msg_context={
                    "status_code": status_code,
                    "reason": reason,
                },
            )
        else:
            assert_never(request_mode)

    # Reached whenever nothing above raised or returned early; remember the
    # URL so it is not requested again within this validation context.
    context.known_files[url] = None
    return pydantic.HttpUrl(url)

129 

130 

class HttpUrl(RootHttpUrl):
    """A URL with the HTTP or HTTPS scheme."""

    root_model: ClassVar[Type[RootModel[Any]]] = RootModel[pydantic.HttpUrl]
    # Cached result of `exists()`; stays `None` until the first check ran.
    _exists: Optional[bool] = None

    def _after_validator(self):
        """Run the parent hook, then check availability if IO checks are on.

        Returns whatever the parent's `_after_validator` returned.
        """
        validated = super()._after_validator()
        if get_validation_context().perform_io_checks:
            # Only the side effect (filling the `_exists` cache) matters here.
            _ = validated.exists()

        return validated

    def exists(self):
        """Return whether the URL passed validation; cached after first call.

        The URL is validated with the context's warning level replaced by
        `warning_levels.WARNING`. Any exception raised during validation marks
        the URL as not existing; the exception is logged at info level if the
        validation context has `log_warnings` set.
        """
        if self._exists is not None:
            return self._exists

        ctxt = get_validation_context()
        try:
            with ctxt.replace(warning_level=warning_levels.WARNING):
                self._validated = _validate_url(self._validated)
        except Exception as e:
            if ctxt.log_warnings:
                logger.info(e)

            available = False
        else:
            available = True

        self._exists = available
        return self._exists