Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ if response.my_field is None:

### Accessing raw response data (e.g. headers)

The "raw" Response object can be accessed by prefixing `.with_raw_response.` to any HTTP method call.
The "raw" Response object can be accessed by prefixing `.with_raw_response.` to any HTTP method call, e.g.,

```py
from dataherald import Dataherald
Expand All @@ -206,6 +206,24 @@ print(database_connection.id)

These methods return an [`APIResponse`](https://github.com/Dataherald/dataherald-python/tree/main/src/dataherald/_response.py) object.

The async client returns an [`AsyncAPIResponse`](https://github.com/Dataherald/dataherald-python/tree/main/src/dataherald/_response.py) with the same structure, the only difference being `await`able methods for reading the response content.

#### `.with_streaming_response`

The above interface eagerly reads the full response body when you make the request, which may not always be what you want.

To stream the response body, use `.with_streaming_response` instead, which requires a context manager and only reads the response body once you call `.read()`, `.text()`, `.json()`, `.iter_bytes()`, `.iter_text()`, `.iter_lines()` or `.parse()`. In the async client, these are async methods.

```python
with client.database_connections.with_streaming_response.create() as response:
print(response.headers.get("X-My-Header"))

for line in response.iter_lines():
print(line)
```

The context manager is required so that the response will reliably be closed.

### Configuring the HTTP client

You can directly override the [httpx client](https://www.python-httpx.org/api/#client) to customize it for your use case, including:
Expand Down
1 change: 1 addition & 0 deletions src/dataherald/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
AsyncDataherald,
)
from ._version import __title__, __version__
from ._response import APIResponse as APIResponse, AsyncAPIResponse as AsyncAPIResponse
from ._exceptions import (
APIError,
ConflictError,
Expand Down
252 changes: 118 additions & 134 deletions src/dataherald/_base_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import os
import json
import time
import uuid
Expand Down Expand Up @@ -31,7 +30,7 @@
overload,
)
from functools import lru_cache
from typing_extensions import Literal, override
from typing_extensions import Literal, override, get_origin

import anyio
import httpx
Expand Down Expand Up @@ -61,18 +60,22 @@
AsyncTransport,
RequestOptions,
ModelBuilderProtocol,
BinaryResponseContent,
)
from ._utils import is_dict, is_given, is_mapping
from ._compat import model_copy, model_dump
from ._models import GenericModel, FinalRequestOptions, validate_type, construct_type
from ._response import APIResponse
from ._response import (
APIResponse,
BaseAPIResponse,
AsyncAPIResponse,
extract_response_type,
)
from ._constants import (
DEFAULT_LIMITS,
DEFAULT_TIMEOUT,
DEFAULT_MAX_RETRIES,
RAW_RESPONSE_HEADER,
STREAMED_RAW_RESPONSE_HEADER,
OVERRIDE_CAST_TO_HEADER,
)
from ._streaming import Stream, AsyncStream
from ._exceptions import (
Expand Down Expand Up @@ -493,28 +496,25 @@ def _serialize_multipartform(self, data: Mapping[object, object]) -> dict[str, o
serialized[key] = value
return serialized

def _process_response(
self,
*,
cast_to: Type[ResponseT],
options: FinalRequestOptions,
response: httpx.Response,
stream: bool,
stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None,
) -> ResponseT:
api_response = APIResponse(
raw=response,
client=self,
cast_to=cast_to,
stream=stream,
stream_cls=stream_cls,
options=options,
)
def _maybe_override_cast_to(self, cast_to: type[ResponseT], options: FinalRequestOptions) -> type[ResponseT]:
if not is_given(options.headers):
return cast_to

if response.request.headers.get(RAW_RESPONSE_HEADER) == "true":
return cast(ResponseT, api_response)
# make a copy of the headers so we don't mutate user-input
headers = dict(options.headers)

return api_response.parse()
# we internally support defining a temporary header to override the
# default `cast_to` type for use with `.with_raw_response` and `.with_streaming_response`
# see _response.py for implementation details
override_cast_to = headers.pop(OVERRIDE_CAST_TO_HEADER, NOT_GIVEN)
if is_given(override_cast_to):
options.headers = headers
return cast(Type[ResponseT], override_cast_to)

return cast_to

def _should_stream_response_body(self, request: httpx.Request) -> bool:
return request.headers.get(RAW_RESPONSE_HEADER) == "stream" # type: ignore[no-any-return]

def _process_response_data(
self,
Expand All @@ -540,12 +540,6 @@ def _process_response_data(
except pydantic.ValidationError as err:
raise APIResponseValidationError(response=response, body=data) from err

def _should_stream_response_body(self, *, request: httpx.Request) -> bool:
if request.headers.get(STREAMED_RAW_RESPONSE_HEADER) == "true":
return True

return False

@property
def qs(self) -> Querystring:
return Querystring()
Expand Down Expand Up @@ -610,6 +604,8 @@ def _calculate_retry_timeout(
if response_headers is not None:
retry_header = response_headers.get("retry-after")
try:
# note: the spec indicates that this should only ever be an integer
# but if someone sends a float there's no reason for us to not respect it
retry_after = float(retry_header)
except Exception:
retry_date_tuple = email.utils.parsedate_tz(retry_header)
Expand Down Expand Up @@ -873,6 +869,7 @@ def _request(
stream: bool,
stream_cls: type[_StreamT] | None,
) -> ResponseT | _StreamT:
cast_to = self._maybe_override_cast_to(cast_to, options)
self._prepare_options(options)

retries = self._remaining_retries(remaining_retries, options)
Expand Down Expand Up @@ -987,6 +984,50 @@ def _retry_request(
stream_cls=stream_cls,
)

def _process_response(
self,
*,
cast_to: Type[ResponseT],
options: FinalRequestOptions,
response: httpx.Response,
stream: bool,
stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None,
) -> ResponseT:
origin = get_origin(cast_to) or cast_to

if inspect.isclass(origin) and issubclass(origin, BaseAPIResponse):
if not issubclass(origin, APIResponse):
raise TypeError(f"API Response types must subclass {APIResponse}; Received {origin}")

response_cls = cast("type[BaseAPIResponse[Any]]", cast_to)
return cast(
ResponseT,
response_cls(
raw=response,
client=self,
cast_to=extract_response_type(response_cls),
stream=stream,
stream_cls=stream_cls,
options=options,
),
)

if cast_to == httpx.Response:
return cast(ResponseT, response)

api_response = APIResponse(
raw=response,
client=self,
cast_to=cast("type[ResponseT]", cast_to), # pyright: ignore[reportUnnecessaryCast]
stream=stream,
stream_cls=stream_cls,
options=options,
)
if bool(response.request.headers.get(RAW_RESPONSE_HEADER)):
return cast(ResponseT, api_response)

return api_response.parse()

def _request_api_list(
self,
model: Type[object],
Expand Down Expand Up @@ -1353,6 +1394,7 @@ async def _request(
stream_cls: type[_AsyncStreamT] | None,
remaining_retries: int | None,
) -> ResponseT | _AsyncStreamT:
cast_to = self._maybe_override_cast_to(cast_to, options)
await self._prepare_options(options)

retries = self._remaining_retries(remaining_retries, options)
Expand Down Expand Up @@ -1428,7 +1470,7 @@ async def _request(
log.debug("Re-raising status error")
raise self._make_status_error_from_response(err.response) from None

return self._process_response(
return await self._process_response(
cast_to=cast_to,
options=options,
response=response,
Expand Down Expand Up @@ -1465,6 +1507,50 @@ async def _retry_request(
stream_cls=stream_cls,
)

async def _process_response(
self,
*,
cast_to: Type[ResponseT],
options: FinalRequestOptions,
response: httpx.Response,
stream: bool,
stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None,
) -> ResponseT:
origin = get_origin(cast_to) or cast_to

if inspect.isclass(origin) and issubclass(origin, BaseAPIResponse):
if not issubclass(origin, AsyncAPIResponse):
raise TypeError(f"API Response types must subclass {AsyncAPIResponse}; Received {origin}")

response_cls = cast("type[BaseAPIResponse[Any]]", cast_to)
return cast(
"ResponseT",
response_cls(
raw=response,
client=self,
cast_to=extract_response_type(response_cls),
stream=stream,
stream_cls=stream_cls,
options=options,
),
)

if cast_to == httpx.Response:
return cast(ResponseT, response)

api_response = AsyncAPIResponse(
raw=response,
client=self,
cast_to=cast("type[ResponseT]", cast_to), # pyright: ignore[reportUnnecessaryCast]
stream=stream,
stream_cls=stream_cls,
options=options,
)
if bool(response.request.headers.get(RAW_RESPONSE_HEADER)):
return cast(ResponseT, api_response)

return await api_response.parse()

def _request_api_list(
self,
model: Type[_T],
Expand Down Expand Up @@ -1783,105 +1869,3 @@ def _merge_mappings(
"""
merged = {**obj1, **obj2}
return {key: value for key, value in merged.items() if not isinstance(value, Omit)}


class HttpxBinaryResponseContent(BinaryResponseContent):
response: httpx.Response

def __init__(self, response: httpx.Response) -> None:
self.response = response

@property
@override
def content(self) -> bytes:
return self.response.content

@property
@override
def text(self) -> str:
return self.response.text

@property
@override
def encoding(self) -> Optional[str]:
return self.response.encoding

@property
@override
def charset_encoding(self) -> Optional[str]:
return self.response.charset_encoding

@override
def json(self, **kwargs: Any) -> Any:
return self.response.json(**kwargs)

@override
def read(self) -> bytes:
return self.response.read()

@override
def iter_bytes(self, chunk_size: Optional[int] = None) -> Iterator[bytes]:
return self.response.iter_bytes(chunk_size)

@override
def iter_text(self, chunk_size: Optional[int] = None) -> Iterator[str]:
return self.response.iter_text(chunk_size)

@override
def iter_lines(self) -> Iterator[str]:
return self.response.iter_lines()

@override
def iter_raw(self, chunk_size: Optional[int] = None) -> Iterator[bytes]:
return self.response.iter_raw(chunk_size)

@override
def stream_to_file(
self,
file: str | os.PathLike[str],
*,
chunk_size: int | None = None,
) -> None:
with open(file, mode="wb") as f:
for data in self.response.iter_bytes(chunk_size):
f.write(data)

@override
def close(self) -> None:
return self.response.close()

@override
async def aread(self) -> bytes:
return await self.response.aread()

@override
async def aiter_bytes(self, chunk_size: Optional[int] = None) -> AsyncIterator[bytes]:
return self.response.aiter_bytes(chunk_size)

@override
async def aiter_text(self, chunk_size: Optional[int] = None) -> AsyncIterator[str]:
return self.response.aiter_text(chunk_size)

@override
async def aiter_lines(self) -> AsyncIterator[str]:
return self.response.aiter_lines()

@override
async def aiter_raw(self, chunk_size: Optional[int] = None) -> AsyncIterator[bytes]:
return self.response.aiter_raw(chunk_size)

@override
async def astream_to_file(
self,
file: str | os.PathLike[str],
*,
chunk_size: int | None = None,
) -> None:
path = anyio.Path(file)
async with await path.open(mode="wb") as f:
async for data in self.response.aiter_bytes(chunk_size):
await f.write(data)

@override
async def aclose(self) -> None:
return await self.response.aclose()
Loading