Coverage for src / bioimageio / spec / _internal / url.py: 91%

82 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-23 10:51 +0000

1from contextlib import nullcontext 

2from pathlib import PurePosixPath 

3from typing import Any, ClassVar, Optional, Type, Union 

4 

5import httpx 

6import pydantic 

7from loguru import logger 

8from pydantic import RootModel 

9from typing_extensions import Literal, assert_never 

10 

11from . import warning_levels 

12from ._settings import settings 

13from .field_warning import issue_warning 

14from .root_url import RootHttpUrl 

15from .validation_context import get_validation_context 

16 

17 

def _validate_url(url: Union[str, pydantic.HttpUrl]) -> pydantic.HttpUrl:
    """Validate `url`, starting with a HEAD request.

    Delegates to `_validate_url_impl` with `request_mode="head"` and the
    timeout taken from `settings.http_timeout`.
    """
    return _validate_url_impl(
        url,
        request_mode="head",
        timeout=settings.http_timeout,
    )

20 

21 

def _validate_url_impl(
    url: Union[str, pydantic.HttpUrl],
    request_mode: Literal["head", "get_stream", "get"],
    timeout: float,
) -> pydantic.HttpUrl:
    """Probe `url` with an HTTP request and return it as a `pydantic.HttpUrl`.

    URLs already present in the validation context's `known_files`, and URLs
    starting with `http(s)://example.com`, are returned without any request.

    Otherwise a request is sent according to `request_mode`. If the response
    status is not one of the accepted codes (200, 301, 302, 303, 308), the
    check is retried with the next mode: "head" -> "get_stream" -> "get".
    Only the final "get" attempt reports an unexpected status as a warning.

    Args:
        url: The URL to validate.
        request_mode: "head" and "get" send a plain request; "get_stream"
            opens a streamed GET.
        timeout: Timeout passed on to the httpx request.

    Returns:
        `pydantic.HttpUrl(url)`; the (string) URL is also recorded in
        `context.known_files`.

    Raises:
        ValueError: If httpx reports `InvalidURL` or `TooManyRedirects`.
            Any other exception raised while requesting is reported via
            `issue_warning` instead of being raised.
    """
    url = str(url)
    context = get_validation_context()
    if url in context.known_files:
        return pydantic.HttpUrl(url)

    if url.startswith(("http://example.com", "https://example.com")):
        return pydantic.HttpUrl(url)

    colab_github_prefix = "https://colab.research.google.com/github/"
    val_url = url
    if url.startswith(colab_github_prefix):
        # GET requests for colab return 200 even if the source notebook does
        # not exist. We therefore validate the URL of the notebook on GitHub
        # instead. Only the leading prefix is rewritten, so a later occurrence
        # of the same text inside the URL is left untouched.
        val_url = "https://github.com/" + url[len(colab_github_prefix) :]
    elif url.startswith("https://colab.research.google.com/"):
        # TODO: improve validation of non-github colab urls
        issue_warning(
            "colab urls currently pass even if the notebook url was not found. Cannot fully validate {value}",
            value=url,
            severity=warning_levels.INFO,
        )

    try:
        if request_mode in ("head", "get"):
            # The plain request is already complete; wrap the response so both
            # modes can be consumed by the same `with` statement below.
            request_ctxt = nullcontext(
                httpx.request(
                    request_mode.upper(),
                    val_url,
                    timeout=timeout,
                    follow_redirects=True,
                )
            )
        elif request_mode == "get_stream":
            request_ctxt = httpx.stream(
                "GET", val_url, timeout=timeout, follow_redirects=True
            )
        else:
            assert_never(request_mode)

        with request_ctxt as r:
            status_code = r.status_code
            reason = r.reason_phrase
            location = r.headers.get("location")

    except (
        httpx.InvalidURL,
        httpx.TooManyRedirects,
    ) as e:
        # Chain explicitly so the httpx error is kept as the cause.
        raise ValueError(f"Invalid URL '{url}': {e}") from e
    except httpx.RequestError as e:
        issue_warning(
            "Failed to validate URL '{value}': {error}\nrequest: {request}",
            value=url,
            msg_context={"error": str(e), "request": e.request},
        )
    except Exception as e:
        issue_warning(
            "Failed to validate URL '{value}': {error}",
            value=url,
            msg_context={"error": str(e)},
        )
    else:
        if status_code == 200:  # ok
            pass
        elif status_code in (302, 303):  # found
            pass
        elif status_code in (301, 308):  # permanent redirect
            issue_warning(
                "URL redirected ({status_code}): consider updating {value} with new"
                " location: {location}",
                value=url,
                severity=warning_levels.INFO,
                msg_context={
                    "status_code": status_code,
                    "location": location,
                },
            )
        elif request_mode == "head":
            # Unexpected status for HEAD: retry with a streamed GET.
            return _validate_url_impl(url, request_mode="get_stream", timeout=timeout)
        elif request_mode == "get_stream":
            # Unexpected status for the streamed GET: retry with a plain GET.
            return _validate_url_impl(url, request_mode="get", timeout=timeout)
        elif request_mode == "get":
            # Last attempt: report the unexpected status instead of retrying.
            issue_warning(
                "{status_code}: {reason} ({value})",
                value=url,
                severity=(
                    warning_levels.INFO
                    if status_code == 405  # may be returned due to a captcha
                    else warning_levels.WARNING
                ),
                msg_context={
                    "status_code": status_code,
                    "reason": reason,
                },
            )
        else:
            assert_never(request_mode)

    # Reached after an accepted status or after a warning was issued above;
    # the URL is recorded so later calls return early.
    context.known_files[url] = None
    return pydantic.HttpUrl(url)

129 

130 

class HttpUrl(RootHttpUrl):
    """A URL with the HTTP or HTTPS scheme."""

    root_model: ClassVar[Type[RootModel[Any]]] = RootModel[pydantic.HttpUrl]
    # Cached result of `exists()`; stays None until the first check ran.
    _exists: Optional[bool] = None

    def _after_validator(self):
        """Run the parent validator, then probe the URL if IO checks are enabled."""
        validated = super()._after_validator()
        if get_validation_context().perform_io_checks:
            _ = validated.exists()

        return validated

    def exists(self) -> bool:
        """Return True if the URL passed validation, False otherwise.

        The URL is validated at most once per instance; the outcome is
        cached in `_exists` and returned on subsequent calls.
        """
        if self._exists is not None:
            return self._exists

        ctxt = get_validation_context()
        try:
            with ctxt.replace(warning_level=warning_levels.WARNING):
                self._validated = _validate_url(self._validated)
        except Exception as e:
            # Any failure during validation counts as "does not exist".
            if ctxt.log_warnings:
                logger.info(e)

            self._exists = False
        else:
            self._exists = True

        return self._exists

    @property
    def suffix(self) -> str:
        """File suffix of the URL path; empty string if there is no path."""
        return "" if self.path is None else PurePosixPath(self.path).suffix