|
5 | 5 | import os |
6 | 6 | import re |
7 | 7 | import smtplib |
| 8 | +import time |
8 | 9 | from collections import defaultdict |
9 | 10 | from datetime import datetime, timedelta, timezone |
10 | 11 | from email.mime.multipart import MIMEMultipart |
11 | 12 | from email.mime.text import MIMEText |
12 | | -from typing import Union |
| 13 | +from typing import Optional, Set, Union |
13 | 14 | from urllib.parse import urljoin |
14 | 15 | from uuid import UUID |
15 | 16 |
|
|
18 | 19 | import sansjson |
19 | 20 | from bs4 import BeautifulSoup |
20 | 21 | from jsonschema.exceptions import ValidationError |
21 | | -from requests.adapters import HTTPAdapter |
22 | | -from urllib3.util.retry import Retry |
23 | 22 |
|
24 | 23 | logging.basicConfig(level=logging.INFO) |
25 | 24 | logger = logging.getLogger() |
@@ -461,25 +460,111 @@ def get_server_type(server: str) -> str: |
461 | 460 | return server |
462 | 461 |
|
463 | 462 |
|
464 | | -def create_retry_session() -> requests.Session: |
465 | | - """Creates a requests session with retry logic for HTTP/S requests.""" |
466 | | - session = requests.Session() |
467 | | - # 3 Retries with exponential backoff (of 2 seconds) for server errors |
468 | | - # we want to retry on 500, 502, 503, and 504 errors (CKAN Down) |
469 | | - retry_strategy = Retry( |
470 | | - total=3, |
471 | | - backoff_factor=3, |
472 | | - backoff_max=15, |
473 | | - status_forcelist=[500, 502, 503, 504], |
474 | | - raise_on_status=False, |
475 | | - allowed_methods={"GET", "POST", "PUT", "DELETE"}, |
476 | | - ) |
477 | | - # pass retry strategy to HTTPAdapter |
478 | | - adapter = HTTPAdapter(max_retries=retry_strategy) |
479 | | - # tell session which protocols to use the adapter with |
480 | | - session.mount("http://", adapter) |
481 | | - session.mount("https://", adapter) |
482 | | - return session |
| 463 | +class RetrySession(requests.Session): |
| 464 | + """ |
| 465 | + Session made to handle more advanced logging and adds retry logic. |
| 466 | + """ |
| 467 | + |
| 468 | + def __init__( |
| 469 | + self, |
| 470 | + status_forcelist: Optional[Set[int]] = None, |
| 471 | + max_retries: int = 3, |
| 472 | + backoff_factor: float = 4.0, |
| 473 | + ): |
| 474 | + """ |
| 475 | + Initialize the RetrySession. |
| 476 | +
|
| 477 | + status_forcelist: Set of HTTP status codes to retry on. |
| 478 | + Defaults to {404, 499, 500, 502} |
| 479 | + max_retries: Maximum number of retry attempts |
| 480 | + backoff_factor: Factor for exponential backoff delay |
| 481 | + """ |
| 482 | + super().__init__() |
| 483 | + |
| 484 | + self.status_forcelist = status_forcelist or {404, 499, 500, 502} |
| 485 | + self.max_retries = max_retries |
| 486 | + self.backoff_factor = backoff_factor |
| 487 | + |
| 488 | + def request(self, method: str, url: str, **kwargs) -> requests.Response: |
| 489 | + """ |
| 490 | + Override request method to add logging and manual retry logic. |
| 491 | +
|
| 492 | + method: HTTP method |
| 493 | + url: Request URL |
| 494 | + **kwargs: Additional arguments passed to parent request method ( |
| 495 | + needed since we override it) |
| 496 | + """ |
| 497 | + last_response = None |
| 498 | + last_exception = None |
| 499 | + for attempt in range(self.max_retries + 1): |
| 500 | + try: |
| 501 | + if attempt == 0: |
| 502 | + logger.info(f"Making initial {method.upper()} request to {url}") |
| 503 | + |
| 504 | + response = super().request(method, url, **kwargs) |
| 505 | + # If status code should trigger retry and we have attempts left |
| 506 | + if ( |
| 507 | + response.status_code in self.status_forcelist |
| 508 | + and attempt < self.max_retries |
| 509 | + ): |
| 510 | + if attempt == 0: |
| 511 | + logger.warning( |
| 512 | + f"Received status code {response.status_code} for " |
| 513 | + f"{method.upper()} {url}. Retrying..." |
| 514 | + ) |
| 515 | + else: |
| 516 | + logger.warning( |
| 517 | + f"Attempt {attempt}: Received status code " |
| 518 | + f"{response.status_code} for {method.upper()} {url}. " |
| 519 | + f"Retrying..." |
| 520 | + ) |
| 521 | + last_response = response |
| 522 | + # Calculate backoff delay |
| 523 | + # for the 3, 7, 15 countdown |
| 524 | + delay = (self.backoff_factor * (2**attempt)) - 1 |
| 525 | + time.sleep(delay) |
| 526 | + continue |
| 527 | + |
| 528 | + # Success or no more retries |
| 529 | + if response.status_code in self.status_forcelist: |
| 530 | + logger.error( |
| 531 | + f"Final attempt: Still received status code " |
| 532 | + f"{response.status_code} for {method.upper()} {url}" |
| 533 | + ) |
| 534 | + |
| 535 | + return response |
| 536 | + |
| 537 | + except requests.exceptions.RequestException as e: |
| 538 | + logger.error( |
| 539 | + f"Attempt {attempt + 1}: Request failed for " |
| 540 | + f"{method.upper()} {url}: {str(e)}" |
| 541 | + ) |
| 542 | + last_exception = e |
| 543 | + |
| 544 | + if attempt < self.max_retries: |
| 545 | + delay = self.backoff_factor * (2**attempt) |
| 546 | + time.sleep(delay) |
| 547 | + continue |
| 548 | + else: |
| 549 | + raise |
| 550 | + |
| 551 | + # This should not be reached, but just in case |
| 552 | + if last_exception: |
| 553 | + raise last_exception |
| 554 | + |
| 555 | + return last_response |
| 556 | + |
| 557 | + |
| 558 | +def create_retry_session() -> RetrySession: |
| 559 | + """ |
| 560 | + Creates our desired RetrySession with default settings. |
| 561 | + """ |
| 562 | + # Use the HARVEST_RETRY_ON_ERROR env var to determine if we should retry |
| 563 | + retry_enabled = os.getenv("HARVEST_RETRY_ON_ERROR", "true").lower() == "true" |
| 564 | + if retry_enabled: |
| 565 | + return RetrySession(max_retries=3, backoff_factor=4.0) |
| 566 | + else: |
| 567 | + return RetrySession(max_retries=0, backoff_factor=0) |
483 | 568 |
|
484 | 569 |
|
485 | 570 | def send_email_to_recipients(recipients, subject, body): |
|
0 commit comments