File size: 4,556 Bytes
53ea588
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: BSD 2-Clause License

"""HTTP client utilities for making REST API calls."""

import asyncio
import json
import logging
from enum import Enum
from http import HTTPStatus
from typing import Any, Final

import aiohttp

logger = logging.getLogger(__name__)


class CallMethod(str, Enum):
    """Enumeration of supported HTTP methods.

    Attributes:
        POST: HTTP POST method.
        PUT: HTTP PUT method.
        GET: HTTP GET method.
    """

    POST = "post"
    PUT = "put"
    GET = "get"


DEFAULT_TIMEOUT: Final[aiohttp.ClientTimeout] = aiohttp.ClientTimeout(total=10, sock_connect=5)


class HttpClient:
    """HTTP client for making REST API calls."""

    def __init__(self) -> None:
        """Initialize the HTTP client."""
        self.session = aiohttp.ClientSession(timeout=DEFAULT_TIMEOUT)
        self.lock = asyncio.Lock()
        self.request_in_progress = False
        self.response_json: dict[str, Any] | None = None
        self.response_text: str | None = None

    async def close(self) -> None:
        """Close the HTTP client session.

        Ensures proper cleanup of resources by closing the aiohttp session.
        """
        async with self.lock:
            await self.session.close()

    async def delete(self, url: str, headers: dict[str, Any] = None):
        """Send an HTTP DELETE request.

        Args:
            url: The URL to send the DELETE request to.
            headers (optional): HTTP headers to include in the delete request

        Returns:
            bool: True if the request was successful (status code 200), False otherwise.
        """
        try:
            async with self.session.delete(url, headers=headers) as resp:
                return resp.status == HTTPStatus.OK
        except Exception as e:
            logger.warning(f"HttpClient: error deleting {url}: {e}")
            return False

    async def send_request(
        self,
        url: str,
        params: dict[str, Any],
        headers: dict[str, Any],
        payload: dict[str, Any],
        call_method: CallMethod,
        http_status_codes_to_ignore: set[HTTPStatus] | None = None,
    ) -> bool:
        """Send an HTTP request with the specified parameters.

        Args:
            url: The URL to send the request to.
            params: Query parameters to include in the request.
            headers: HTTP headers to include in the request.
            payload: The request body payload.
            call_method: The HTTP method to use (POST, PUT, GET).
            http_status_codes_to_ignore: Set of HTTP status codes to treat as success.

        Returns:
            bool: True if the request was successful, False otherwise.
        """
        async with self.lock:
            self.response_json = None
            self.response_text = None
            if self.request_in_progress:
                logger.error("Request already in progress")
            self.request_in_progress = True
            try:
                if http_status_codes_to_ignore is None:
                    http_status_codes_to_ignore = set()
                data = json.dumps(payload)
                logger.info(f"HttpClient: sending request to '{url}' params={params} data={data}")

                status_code: int = -1
                method = str(call_method.value)

                async with self.session.request(method, url, data=data, headers=headers, params=params) as resp:
                    status_code = resp.status
                    content_type = resp.headers.get("Content-Type", "")

                    if "application/json" in content_type:
                        try:
                            self.response_json = await resp.json()
                        except ValueError as exc:
                            logger.warning(f"HttpClient: error parsing JSON response: {exc}")
                    else:
                        self.response_text = await resp.text()

                if status_code != HTTPStatus.OK and status_code not in http_status_codes_to_ignore:
                    logger.warning(f"HttpClient: call to '{url}' failed with response '{self.response_text}'.")
                    return False
                else:
                    return True
            except Exception as e:
                logger.warning(f"Could not connect to API {e}")
                return False
            finally:
                self.request_in_progress = False